From d6e8e55f00a6b9986b1688c19f7a3fb48b2ec05a Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet <juan@benet.ai> Date: Sun, 14 Sep 2014 01:03:55 -0700 Subject: [PATCH] rmv old swarm --- swarm/interface.go | 20 -- swarm/mes_listener.go | 123 --------- swarm/mes_listener_test.go | 32 --- swarm/mes_wrapper.pb.go | 85 ------- swarm/mes_wrapper.proto | 12 - swarm/swarm.go | 507 ------------------------------------- swarm/swarm_test.go | 136 ---------- swarm/wrapper.go | 24 -- 8 files changed, 939 deletions(-) delete mode 100644 swarm/interface.go delete mode 100644 swarm/mes_listener.go delete mode 100644 swarm/mes_listener_test.go delete mode 100644 swarm/mes_wrapper.pb.go delete mode 100644 swarm/mes_wrapper.proto delete mode 100644 swarm/swarm.go delete mode 100644 swarm/swarm_test.go delete mode 100644 swarm/wrapper.go diff --git a/swarm/interface.go b/swarm/interface.go deleted file mode 100644 index 3d506df7..00000000 --- a/swarm/interface.go +++ /dev/null @@ -1,20 +0,0 @@ -package swarm - -import ( - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" - - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" -) - -type Network interface { - GetPeer(u.Key) *peer.Peer - Listen() error - ConnectNew(*ma.Multiaddr) (*peer.Peer, error) - GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) - Error(error) - GetErrChan() chan error - GetChannel(PBWrapper_MessageType) *Chan - Close() - CloseConnection(*peer.Peer) error -} diff --git a/swarm/mes_listener.go b/swarm/mes_listener.go deleted file mode 100644 index 97cabe81..00000000 --- a/swarm/mes_listener.go +++ /dev/null @@ -1,123 +0,0 @@ -package swarm - -import ( - crand "crypto/rand" - "sync" - "time" - - u "github.com/jbenet/go-ipfs/util" -) - -type MessageListener struct { - listeners map[string]*listenInfo - haltchan chan struct{} - unlist chan string - nlist chan *listenInfo - send chan *respMes -} - -// GenerateMessageID creates and returns a new message ID -func GenerateMessageID() string { - buf := make([]byte, 16) - crand.Read(buf) - return string(buf) -} - -// The listen info struct holds information about a message that is being waited for -type listenInfo struct { - // Responses matching the listen ID will be sent through resp - resp chan *Message - - // count is the number of responses to listen for - count int - - // eol is the time at which this listener will expire - eol time.Time - - // sendlock is used to prevent conditions where we try to send on the resp - // channel as its being closed by a timeout in another thread - sendLock sync.Mutex - - closed bool - - id string -} - -func NewMessageListener() *MessageListener { - ml := new(MessageListener) - ml.haltchan = make(chan struct{}) - ml.listeners = make(map[string]*listenInfo) - ml.nlist = make(chan *listenInfo, 16) - ml.send = make(chan *respMes, 16) - ml.unlist = make(chan string, 16) - go ml.run() - return ml -} - -func (ml *MessageListener) Listen(id string, count int, timeout time.Duration) <-chan *Message { - li := new(listenInfo) - li.count = count - li.eol = time.Now().Add(timeout) - li.resp = make(chan *Message, count) - li.id = id - ml.nlist <- li - return li.resp -} - -func (ml *MessageListener) Unlisten(id string) { - ml.unlist <- id -} - -type respMes struct { - id string - mes *Message -} - -func (ml *MessageListener) Respond(id string, mes *Message) { - ml.send <- &respMes{ - id: id, - mes: mes, - } -} - -func (ml *MessageListener) Halt() { - ml.haltchan <- struct{}{} -} - -func (ml *MessageListener) run() { - for { - select { - case <-ml.haltchan: - return - case id := <-ml.unlist: - trg, ok := ml.listeners[id] - if !ok { - continue - } - close(trg.resp) - delete(ml.listeners, id) - case li := <-ml.nlist: - ml.listeners[li.id] = li - case s := <-ml.send: - trg, ok := ml.listeners[s.id] - if !ok { - u.DOut("Send with no listener.") - continue - } - - if time.Now().After(trg.eol) { - close(trg.resp) - delete(ml.listeners, s.id) - continue - } - - trg.resp <- s.mes - trg.count-- - - if trg.count == 0 { - close(trg.resp) - delete(ml.listeners, s.id) - } - } - } -} diff --git a/swarm/mes_listener_test.go b/swarm/mes_listener_test.go deleted file mode 100644 index 566011aa..00000000 --- a/swarm/mes_listener_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package swarm - -import ( - "testing" - "time" - - peer "github.com/jbenet/go-ipfs/peer" -) - -// Ensure that the Message Listeners basic functionality works -func TestMessageListener(t *testing.T) { - ml := NewMessageListener() - a := GenerateMessageID() - resp := ml.Listen(a, 1, time.Minute) - - pmes := new(PBWrapper) - pmes.Message = []byte("Hello") - pmes.Type = new(PBWrapper_MessageType) - mes := NewMessage(new(peer.Peer), pmes) - - go ml.Respond(a, mes) - - del := time.After(time.Millisecond * 100) - select { - case get := <-resp: - if string(get.Data) != string(mes.Data) { - t.Fatal("Something got really messed up") - } - case <-del: - t.Fatal("Waiting on message response timed out.") - } -} diff --git a/swarm/mes_wrapper.pb.go b/swarm/mes_wrapper.pb.go deleted file mode 100644 index f218a448..00000000 --- a/swarm/mes_wrapper.pb.go +++ /dev/null @@ -1,85 +0,0 @@ -// Code generated by protoc-gen-go. -// source: mes_wrapper.proto -// DO NOT EDIT! - -/* -Package swarm is a generated protocol buffer package. - -It is generated from these files: - mes_wrapper.proto - -It has these top-level messages: - PBWrapper -*/ -package swarm - -import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" -import math "math" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = math.Inf - -type PBWrapper_MessageType int32 - -const ( - PBWrapper_TEST PBWrapper_MessageType = 0 - PBWrapper_DHT_MESSAGE PBWrapper_MessageType = 1 - PBWrapper_BITSWAP PBWrapper_MessageType = 2 -) - -var PBWrapper_MessageType_name = map[int32]string{ - 0: "TEST", - 1: "DHT_MESSAGE", - 2: "BITSWAP", -} -var PBWrapper_MessageType_value = map[string]int32{ - "TEST": 0, - "DHT_MESSAGE": 1, - "BITSWAP": 2, -} - -func (x PBWrapper_MessageType) Enum() *PBWrapper_MessageType { - p := new(PBWrapper_MessageType) - *p = x - return p -} -func (x PBWrapper_MessageType) String() string { - return proto.EnumName(PBWrapper_MessageType_name, int32(x)) -} -func (x *PBWrapper_MessageType) UnmarshalJSON(data []byte) error { - value, err := proto.UnmarshalJSONEnum(PBWrapper_MessageType_value, data, "PBWrapper_MessageType") - if err != nil { - return err - } - *x = PBWrapper_MessageType(value) - return nil -} - -type PBWrapper struct { - Type *PBWrapper_MessageType `protobuf:"varint,1,req,enum=swarm.PBWrapper_MessageType" json:"Type,omitempty"` - Message []byte `protobuf:"bytes,2,req" json:"Message,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *PBWrapper) Reset() { *m = PBWrapper{} } -func (m *PBWrapper) String() string { return proto.CompactTextString(m) } -func (*PBWrapper) ProtoMessage() {} - -func (m *PBWrapper) GetType() PBWrapper_MessageType { - if m != nil && m.Type != nil { - return *m.Type - } - return PBWrapper_TEST -} - -func (m *PBWrapper) GetMessage() []byte { - if m != nil { - return m.Message - } - return nil -} - -func init() { - proto.RegisterEnum("swarm.PBWrapper_MessageType", PBWrapper_MessageType_name, PBWrapper_MessageType_value) -} diff --git a/swarm/mes_wrapper.proto b/swarm/mes_wrapper.proto deleted file mode 100644 index ab72232f..00000000 --- a/swarm/mes_wrapper.proto +++ /dev/null @@ -1,12 +0,0 @@ -package swarm; - -message PBWrapper { - enum MessageType { - TEST = 0; - DHT_MESSAGE = 1; - BITSWAP = 2; - } - - required MessageType Type = 1; - required bytes Message = 2; -} diff --git a/swarm/swarm.go b/swarm/swarm.go deleted file mode 100644 index 0a3d13bf..00000000 --- a/swarm/swarm.go +++ /dev/null @@ -1,507 +0,0 @@ -package swarm - -import ( - "errors" - "fmt" - "net" - "sync" - - proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - ident "github.com/jbenet/go-ipfs/identify" - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" -) - -var ErrAlreadyOpen = errors.New("Error: Connection to this peer already open.") - -// Message represents a packet of information sent to or received from a -// particular Peer. -type Message struct { - // To or from, depending on direction. - Peer *peer.Peer - - // Opaque data - Data []byte -} - -// Cleaner looking helper function to make a new message struct -func NewMessage(p *peer.Peer, data proto.Message) *Message { - bytes, err := proto.Marshal(data) - if err != nil { - u.PErr("%v\n", err.Error()) - return nil - } - return &Message{ - Peer: p, - Data: bytes, - } -} - -// Chan is a swarm channel, which provides duplex communication and errors. -type Chan struct { - Outgoing chan *Message - Incoming chan *Message - Errors chan error - Close chan bool -} - -// NewChan constructs a Chan instance, with given buffer size bufsize. -func NewChan(bufsize int) *Chan { - return &Chan{ - Outgoing: make(chan *Message, bufsize), - Incoming: make(chan *Message, bufsize), - Errors: make(chan error, bufsize), - Close: make(chan bool, bufsize), - } -} - -// Contains a set of errors mapping to each of the swarms addresses -// that were listened on -type SwarmListenErr struct { - Errors []error -} - -func (se *SwarmListenErr) Error() string { - if se == nil { - return "<nil error>" - } - var out string - for i, v := range se.Errors { - if v != nil { - out += fmt.Sprintf("%d: %s\n", i, v) - } - } - return out -} - -// Swarm is a connection muxer, allowing connections to other peers to -// be opened and closed, while still using the same Chan for all -// communication. The Chan sends/receives Messages, which note the -// destination or source Peer. -type Swarm struct { - Chan *Chan - conns ConnMap - connsLock sync.RWMutex - - filterChans map[PBWrapper_MessageType]*Chan - toFilter chan *Message - newFilters chan *newFilterInfo - - local *peer.Peer - listeners []net.Listener - haltroute chan struct{} -} - -// NewSwarm constructs a Swarm, with a Chan. -func NewSwarm(local *peer.Peer) *Swarm { - s := &Swarm{ - Chan: NewChan(10), - conns: ConnMap{}, - local: local, - filterChans: make(map[PBWrapper_MessageType]*Chan), - toFilter: make(chan *Message, 32), - newFilters: make(chan *newFilterInfo), - haltroute: make(chan struct{}), - } - go s.routeMessages() - go s.fanOut() - return s -} - -// Open listeners for each network the swarm should listen on -func (s *Swarm) Listen() error { - var ret_err *SwarmListenErr - for i, addr := range s.local.Addresses { - err := s.connListen(addr) - if err != nil { - if ret_err == nil { - ret_err = new(SwarmListenErr) - ret_err.Errors = make([]error, len(s.local.Addresses)) - } - ret_err.Errors[i] = err - u.PErr("Failed to listen on: %s [%s]", addr, err) - } - } - if ret_err == nil { - return nil - } - return ret_err -} - -// Listen for new connections on the given multiaddr -func (s *Swarm) connListen(maddr *ma.Multiaddr) error { - netstr, addr, err := maddr.DialArgs() - if err != nil { - return err - } - - list, err := net.Listen(netstr, addr) - if err != nil { - return err - } - - // NOTE: this may require a lock around it later. currently, only run on setup - s.listeners = append(s.listeners, list) - - // Accept and handle new connections on this listener until it errors - go func() { - for { - nconn, err := list.Accept() - if err != nil { - e := fmt.Errorf("Failed to accept connection: %s - %s [%s]", - netstr, addr, err) - go func() { s.Chan.Errors <- e }() - return - } - go s.handleNewConn(nconn) - } - }() - - return nil -} - -// Handle getting ID from this peer and adding it into the map -func (s *Swarm) handleNewConn(nconn net.Conn) { - p := new(peer.Peer) - - conn := &Conn{ - Peer: p, - Addr: nil, - Conn: nconn, - } - newConnChans(conn) - - sin, sout, err := ident.Handshake(s.local, p, conn.Incoming.MsgChan, conn.Outgoing.MsgChan) - if err != nil { - u.PErr("%v\n", err.Error()) - conn.Close() - return - } - - // Get address to contact remote peer from - addr := <-sin - maddr, err := ma.NewMultiaddr(string(addr)) - if err != nil { - u.PErr("Got invalid address from peer.") - s.Error(err) - return - } - p.AddAddress(maddr) - - conn.secIn = sin - conn.secOut = sout - - err = s.StartConn(conn) - if err != nil { - s.Error(err) - } -} - -// Close closes a swarm. -func (s *Swarm) Close() { - s.connsLock.RLock() - l := len(s.conns) - s.connsLock.RUnlock() - - for i := 0; i < l; i++ { - s.Chan.Close <- true // fan ins - } - s.Chan.Close <- true // fan out - s.Chan.Close <- true // listener - - for _, list := range s.listeners { - list.Close() - } - - s.haltroute <- struct{}{} - - for _, filter := range s.filterChans { - filter.Close <- true - } -} - -// Dial connects to a peer. -// -// The idea is that the client of Swarm does not need to know what network -// the connection will happen over. Swarm can use whichever it choses. -// This allows us to use various transport protocols, do NAT traversal/relay, -// etc. to achive connection. -// -// For now, Dial uses only TCP. This will be extended. -func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error, bool) { - k := peer.Key() - - // check if we already have an open connection first - s.connsLock.RLock() - conn, found := s.conns[k] - s.connsLock.RUnlock() - if found { - return conn, nil, true - } - - // open connection to peer - conn, err := Dial("tcp", peer) - if err != nil { - return nil, err, false - } - - return conn, nil, false -} - -// StartConn adds the passed in connection to its peerMap and starts -// the fanIn routine for that connection -func (s *Swarm) StartConn(conn *Conn) error { - if conn == nil { - return errors.New("Tried to start nil connection.") - } - - u.DOut("Starting connection: %s\n", conn.Peer.Key().Pretty()) - // add to conns - s.connsLock.Lock() - if _, ok := s.conns[conn.Peer.Key()]; ok { - s.connsLock.Unlock() - return ErrAlreadyOpen - } - s.conns[conn.Peer.Key()] = conn - s.connsLock.Unlock() - - // kick off reader goroutine - go s.fanIn(conn) - return nil -} - -// Handles the unwrapping + sending of messages to the right connection. -func (s *Swarm) fanOut() { - for { - select { - case <-s.Chan.Close: - return // told to close. - case msg, ok := <-s.Chan.Outgoing: - if !ok { - return - } - - if len(msg.Data) > MaxMessageSize { - s.Error(fmt.Errorf("Exceeded max message size! (tried to send len = %d)", len(msg.Data))) - } - - s.connsLock.RLock() - conn, found := s.conns[msg.Peer.Key()] - s.connsLock.RUnlock() - - if !found { - e := fmt.Errorf("Sent msg to peer without open conn: %v", - msg.Peer) - s.Chan.Errors <- e - continue - } - - // queue it in the connection's buffer - conn.secOut <- msg.Data - } - } -} - -// Handles the receiving + wrapping of messages, per conn. -// Consider using reflect.Select with one goroutine instead of n. -func (s *Swarm) fanIn(conn *Conn) { - for { - select { - case <-s.Chan.Close: - // close Conn. - conn.Close() - goto out - - case <-conn.Closed: - goto out - - case data, ok := <-conn.secIn: - if !ok { - e := fmt.Errorf("Error retrieving from conn: %v", conn.Peer.Key().Pretty()) - s.Chan.Errors <- e - goto out - } - - msg := &Message{Peer: conn.Peer, Data: data} - s.toFilter <- msg - } - } -out: - - s.connsLock.Lock() - delete(s.conns, conn.Peer.Key()) - s.connsLock.Unlock() -} - -type newFilterInfo struct { - Type PBWrapper_MessageType - resp chan *Chan -} - -func (s *Swarm) routeMessages() { - for { - select { - case mes, ok := <-s.toFilter: - if !ok { - return - } - wrapper, err := Unwrap(mes.Data) - if err != nil { - u.PErr("error in route messages: %s\n", err) - } - - ch, ok := s.filterChans[PBWrapper_MessageType(wrapper.GetType())] - if !ok { - u.PErr("Received message with invalid type: %d\n", wrapper.GetType()) - continue - } - - mes.Data = wrapper.GetMessage() - ch.Incoming <- mes - case gchan := <-s.newFilters: - nch, ok := s.filterChans[gchan.Type] - if !ok { - nch = NewChan(16) - s.filterChans[gchan.Type] = nch - go s.muxChan(nch, gchan.Type) - } - gchan.resp <- nch - case <-s.haltroute: - return - } - } -} - -func (s *Swarm) muxChan(ch *Chan, typ PBWrapper_MessageType) { - for { - select { - case <-ch.Close: - return - case mes := <-ch.Outgoing: - data, err := Wrap(mes.Data, typ) - if err != nil { - u.PErr("muxChan error: %s\n", err) - continue - } - mes.Data = data - s.Chan.Outgoing <- mes - } - } -} - -// GetPeer returns the peer in the swarm with given key id. -func (s *Swarm) GetPeer(key u.Key) *peer.Peer { - s.connsLock.RLock() - defer s.connsLock.RUnlock() - conn, found := s.conns[key] - if !found { - return nil - } - return conn.Peer -} - -// GetConnection will check if we are already connected to the peer in question -// and only open a new connection if we arent already -func (s *Swarm) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) { - p := &peer.Peer{ - ID: id, - Addresses: []*ma.Multiaddr{addr}, - } - - if id.Equal(s.local.ID) { - return nil, errors.New("Attempted connection to self!") - } - - conn, err, reused := s.Dial(p) - if err != nil { - return nil, err - } - - if reused { - return p, nil - } - - err = s.handleDialedCon(conn) - return conn.Peer, err -} - -// Handle performing a handshake on a new connection and ensuring proper forward communication -func (s *Swarm) handleDialedCon(conn *Conn) error { - sin, sout, err := ident.Handshake(s.local, conn.Peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan) - if err != nil { - return err - } - - // Send node an address that you can be reached on - myaddr := s.local.NetAddress("tcp") - mastr, err := myaddr.String() - if err != nil { - return errors.New("No local address to send to peer.") - } - - sout <- []byte(mastr) - - conn.secIn = sin - conn.secOut = sout - - s.StartConn(conn) - - return nil -} - -// ConnectNew is for connecting to a peer when you dont know their ID, -// Should only be used when you are sure that you arent already connected to peer in question -func (s *Swarm) ConnectNew(addr *ma.Multiaddr) (*peer.Peer, error) { - if addr == nil { - return nil, errors.New("nil Multiaddr passed to swarm.Connect()") - } - npeer := new(peer.Peer) - npeer.AddAddress(addr) - - conn, err := Dial("tcp", npeer) - if err != nil { - return nil, err - } - - err = s.handleDialedCon(conn) - return npeer, err -} - -// CloseConnection removes a given peer from swarm + closes the connection -func (s *Swarm) CloseConnection(p *peer.Peer) error { - u.DOut("Dropping peer: [%s]\n", p.ID.Pretty()) - s.connsLock.RLock() - conn, found := s.conns[u.Key(p.ID)] - s.connsLock.RUnlock() - if !found { - return u.ErrNotFound - } - - s.connsLock.Lock() - delete(s.conns, u.Key(p.ID)) - s.connsLock.Unlock() - - return conn.Close() -} - -func (s *Swarm) Error(e error) { - s.Chan.Errors <- e -} - -func (s *Swarm) GetErrChan() chan error { - return s.Chan.Errors -} - -func (s *Swarm) GetChannel(typ PBWrapper_MessageType) *Chan { - nfi := &newFilterInfo{ - Type: typ, - resp: make(chan *Chan), - } - s.newFilters <- nfi - - return <-nfi.resp -} - -// Temporary to ensure that the Swarm always matches the Network interface as we are changing it -var _ Network = &Swarm{} diff --git a/swarm/swarm_test.go b/swarm/swarm_test.go deleted file mode 100644 index e8a7af50..00000000 --- a/swarm/swarm_test.go +++ /dev/null @@ -1,136 +0,0 @@ -package swarm - -import ( - "fmt" - "net" - "testing" - - msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio" - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" -) - -func pingListen(listener *net.TCPListener, peer *peer.Peer) { - for { - c, err := listener.Accept() - if err == nil { - fmt.Println("accepted") - go pong(c, peer) - } - } -} - -func pong(c net.Conn, peer *peer.Peer) { - mrw := msgio.NewReadWriter(c) - for { - data := make([]byte, 1024) - n, err := mrw.ReadMsg(data) - if err != nil { - fmt.Printf("error %v\n", err) - return - } - b, err := Unwrap(data[:n]) - if err != nil { - fmt.Printf("error %v\n", err) - return - } - if string(b.GetMessage()) != "ping" { - fmt.Printf("error: didn't receive ping: '%v'\n", b.GetMessage()) - return - } - - data, err = Wrap([]byte("pong"), PBWrapper_TEST) - if err != nil { - fmt.Printf("error %v\n", err) - return - } - err = mrw.WriteMsg(data) - if err != nil { - fmt.Printf("error %v\n", err) - return - } - } -} - -func TestSwarm(t *testing.T) { - - swarm := NewSwarm(nil) - var peers []*peer.Peer - var listeners []net.Listener - peerNames := map[string]string{ - "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a30": "/ip4/127.0.0.1/tcp/1234", - "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31": "/ip4/127.0.0.1/tcp/2345", - "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32": "/ip4/127.0.0.1/tcp/3456", - "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33": "/ip4/127.0.0.1/tcp/4567", - } - - recv := swarm.GetChannel(PBWrapper_TEST) - for k, n := range peerNames { - peer, err := setupPeer(k, n) - if err != nil { - t.Fatal("error setting up peer", err) - } - a := peer.NetAddress("tcp") - if a == nil { - t.Fatal("error setting up peer (addr is nil)", peer) - } - n, h, err := a.DialArgs() - if err != nil { - t.Fatal("error getting dial args from addr") - } - listener, err := net.Listen(n, h) - if err != nil { - t.Fatal("error setting up listener", err) - } - go pingListen(listener.(*net.TCPListener), peer) - - conn, err, _ := swarm.Dial(peer) - if err != nil { - t.Fatal("error swarm dialing to peer", err) - } - - //Since we arent doing a handshake, set up 'secure' channels - conn.secIn = conn.Incoming.MsgChan - conn.secOut = conn.Outgoing.MsgChan - - swarm.StartConn(conn) - // ok done, add it. - peers = append(peers, peer) - listeners = append(listeners, listener) - } - - MsgNum := 1000 - for k := 0; k < MsgNum; k++ { - for _, p := range peers { - recv.Outgoing <- &Message{Peer: p, Data: []byte("ping")} - } - } - - got := map[u.Key]int{} - - for k := 0; k < (MsgNum * len(peers)); k++ { - msg := <-recv.Incoming - if string(msg.Data) != "pong" { - t.Error("unexpected conn output", msg.Data) - } - - n, _ := got[msg.Peer.Key()] - got[msg.Peer.Key()] = n + 1 - } - - if len(peers) != len(got) { - t.Error("got less messages than sent") - } - - for p, n := range got { - if n != MsgNum { - t.Error("peer did not get all msgs", p, n, "/", MsgNum) - } - } - - fmt.Println("closing") - swarm.Close() - for _, listener := range listeners { - listener.(*net.TCPListener).Close() - } -} diff --git a/swarm/wrapper.go b/swarm/wrapper.go deleted file mode 100644 index 469620e8..00000000 --- a/swarm/wrapper.go +++ /dev/null @@ -1,24 +0,0 @@ -package swarm - -import "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" - -func Wrap(data []byte, typ PBWrapper_MessageType) ([]byte, error) { - wrapper := new(PBWrapper) - wrapper.Message = data - wrapper.Type = &typ - b, err := proto.Marshal(wrapper) - if err != nil { - return nil, err - } - return b, nil -} - -func Unwrap(data []byte) (*PBWrapper, error) { - mes := new(PBWrapper) - err := proto.Unmarshal(data, mes) - if err != nil { - return nil, err - } - - return mes, nil -} -- GitLab