diff --git a/net/conn/conn.go b/net/conn/conn.go index a6e1db36b74049d7029f54226d43f4649cd1840f..1fae4991f47f97b1bfe60cb21c1f186c1b380384 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -1,4 +1,4 @@ -package swarm +package conn import ( "fmt" @@ -32,6 +32,32 @@ type Conn struct { // Map maps Keys (Peer.IDs) to Connections. type Map map[u.Key]*Conn +// NewConn constructs a new connection +func NewConn(peer *peer.Peer, addr *ma.Multiaddr, nconn net.Conn) (*Conn, error) { + conn := &Conn{ + Peer: peer, + Addr: addr, + Conn: nconn, + } + + if err := conn.newChans(); err != nil { + return nil, err + } + + return conn, nil +} + +// NewNetConn constructs a new connection with given net.Conn +func NewNetConn(nconn net.Conn) (*Conn, error) { + + addr, err := ma.FromNetAddr(nconn.RemoteAddr()) + if err != nil { + return nil, err + } + + return NewConn(new(peer.Peer), addr, nconn) +} + // Dial connects to a particular peer, over a given network // Example: Dial("udp", peer) func Dial(network string, peer *peer.Peer) (*Conn, error) { @@ -50,18 +76,11 @@ func Dial(network string, peer *peer.Peer) (*Conn, error) { return nil, err } - conn := &Conn{ - Peer: peer, - Addr: addr, - Conn: nconn, - } - - newConnChans(conn) - return conn, nil + return NewConn(peer, addr, nconn) } // Construct new channels for given Conn. -func newConnChans(c *Conn) error { +func (c *Conn) newChans() error { if c.Outgoing != nil || c.Incoming != nil { return fmt.Errorf("Conn already initialized") } @@ -77,18 +96,18 @@ func newConnChans(c *Conn) error { } // Close closes the connection, and associated channels. -func (s *Conn) Close() error { +func (c *Conn) Close() error { u.DOut("Closing Conn.\n") - if s.Conn == nil { + if c.Conn == nil { return fmt.Errorf("Already closed") // already closed } // closing net connection - err := s.Conn.Close() - s.Conn = nil + err := c.Conn.Close() + c.Conn = nil // closing channels - s.Incoming.Close() - s.Outgoing.Close() - s.Closed <- true + c.Incoming.Close() + c.Outgoing.Close() + c.Closed <- true return err } diff --git a/net/conn/conn_test.go b/net/conn/conn_test.go index 47b03475c01c4b5b1f25694ef2516eb29dde1547..219004be8fbd0a2796d4c910c35431ea4c51b12c 100644 --- a/net/conn/conn_test.go +++ b/net/conn/conn_test.go @@ -1,4 +1,4 @@ -package swarm +package conn import ( "net" diff --git a/net/swarm/conn.go b/net/swarm/conn.go new file mode 100644 index 0000000000000000000000000000000000000000..655a46c99159f67d055ed29d8b02d45265e8a9b9 --- /dev/null +++ b/net/swarm/conn.go @@ -0,0 +1,69 @@ +package swarm + +import ( + "errors" + "fmt" + "net" + + ident "github.com/jbenet/go-ipfs/identify" + conn "github.com/jbenet/go-ipfs/net/conn" + + u "github.com/jbenet/go-ipfs/util" +) + +// Handle getting ID from this peer, handshake, and adding it into the map +func (s *Swarm) handleIncomingConn(nconn net.Conn) { + + c, err := conn.NewNetConn(nconn) + if err != nil { + s.errChan <- err + return + } + + //TODO(jbenet) the peer might potentially already be in the global PeerBook. + // maybe use the handshake to populate peer. + c.Peer.AddAddress(c.Addr) + + // Setup the new connection + err = s.connSetup(c) + if err != nil && err != ErrAlreadyOpen { + s.errChan <- err + c.Close() + } +} + +// connSetup adds the passed in connection to its peerMap and starts +// the fanIn routine for that connection +func (s *Swarm) connSetup(c *conn.Conn) error { + if c == nil { + return errors.New("Tried to start nil connection.") + } + + u.DOut("Starting connection: %s\n", c.Peer.Key().Pretty()) + + // handshake + if err := s.connHandshake(c); err != nil { + return fmt.Errorf("Conn handshake error: %v", err) + } + + // add to conns + s.connsLock.Lock() + if _, ok := s.conns[c.Peer.Key()]; ok { + s.connsLock.Unlock() + return ErrAlreadyOpen + } + s.conns[c.Peer.Key()] = c + s.connsLock.Unlock() + + // kick off reader goroutine + go s.fanIn(c) + return nil +} + +// connHandshake runs the handshake with the remote connection. +func (s *Swarm) connHandshake(c *conn.Conn) error { + + //TODO(jbenet) this Handshake stuff should be moved elsewhere. + // needs cleanup. needs context. use msg.Pipe. + return ident.Handshake(s.local, c.Peer, c.Incoming.MsgChan, c.Outgoing.MsgChan) +} diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go new file mode 100644 index 0000000000000000000000000000000000000000..104d51a2f965e8e222d7dfbb7b07aa5f8f06c64e --- /dev/null +++ b/net/swarm/swarm.go @@ -0,0 +1,335 @@ +package swarm + +import ( + "errors" + "fmt" + "net" + "sync" + + conn "github.com/jbenet/go-ipfs/net/conn" + msg "github.com/jbenet/go-ipfs/net/message" + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" +) + +// ErrAlreadyOpen signals that a connection to a peer is already open. +var ErrAlreadyOpen = errors.New("Error: Connection to this peer already open.") + +// ListenErr contains a set of errors mapping to each of the swarms addresses. +// Used to return multiple errors, as in listen. +type ListenErr struct { + Errors []error +} + +func (e *ListenErr) Error() string { + if e == nil { + return "<nil error>" + } + var out string + for i, v := range e.Errors { + if v != nil { + out += fmt.Sprintf("%d: %s\n", i, v) + } + } + return out +} + +// Swarm is a connection muxer, allowing connections to other peers to +// be opened and closed, while still using the same Chan for all +// communication. The Chan sends/receives Messages, which note the +// destination or source Peer. +type Swarm struct { + + // local is the peer this swarm represents + local *peer.Peer + + // Swarm includes a Pipe object. + *msg.Pipe + + // errChan is the channel of errors. + errChan chan error + + // conns are the open connections the swarm is handling. + conns conn.Map + connsLock sync.RWMutex + + // listeners for each network address + listeners []net.Listener + + // cancel is an internal function used to stop the Swarm's processing. + cancel context.CancelFunc + ctx context.Context +} + +// NewSwarm constructs a Swarm, with a Chan. +func NewSwarm(ctx context.Context, local *peer.Peer) (*Swarm, error) { + s := &Swarm{ + Pipe: msg.NewPipe(10), + conns: conn.Map{}, + local: local, + errChan: make(chan error, 100), + } + + s.ctx, s.cancel = context.WithCancel(ctx) + go s.fanOut() + return s, s.listen() +} + +// Open listeners for each network the swarm should listen on +func (s *Swarm) listen() error { + hasErr := false + retErr := &ListenErr{ + Errors: make([]error, len(s.local.Addresses)), + } + + // listen on every address + for i, addr := range s.local.Addresses { + err := s.connListen(addr) + if err != nil { + hasErr = true + retErr.Errors[i] = err + u.PErr("Failed to listen on: %s [%s]", addr, err) + } + } + + if hasErr { + return retErr + } + return nil +} + +// Listen for new connections on the given multiaddr +func (s *Swarm) connListen(maddr *ma.Multiaddr) error { + netstr, addr, err := maddr.DialArgs() + if err != nil { + return err + } + + list, err := net.Listen(netstr, addr) + if err != nil { + return err + } + + // NOTE: this may require a lock around it later. currently, only run on setup + s.listeners = append(s.listeners, list) + + // Accept and handle new connections on this listener until it errors + go func() { + for { + nconn, err := list.Accept() + if err != nil { + e := fmt.Errorf("Failed to accept connection: %s - %s [%s]", + netstr, addr, err) + s.errChan <- e + + // if cancel is nil, we're closed. + if s.cancel == nil { + return + } + } else { + go s.handleIncomingConn(nconn) + } + } + }() + + return nil +} + +// Close stops a swarm. +func (s *Swarm) Close() error { + if s.cancel == nil { + return errors.New("Swarm already closed.") + } + + // issue cancel for the context + s.cancel() + + // set cancel to nil to prevent calling Close again, and signal to Listeners + s.cancel = nil + + // close listeners + for _, list := range s.listeners { + list.Close() + } + return nil +} + +// Dial connects to a peer. +// +// The idea is that the client of Swarm does not need to know what network +// the connection will happen over. Swarm can use whichever it choses. +// This allows us to use various transport protocols, do NAT traversal/relay, +// etc. to achive connection. +// +// For now, Dial uses only TCP. This will be extended. +func (s *Swarm) Dial(peer *peer.Peer) (*conn.Conn, error) { + if peer.ID.Equal(s.local.ID) { + return nil, errors.New("Attempted connection to self!") + } + + k := peer.Key() + + // check if we already have an open connection first + s.connsLock.RLock() + c, found := s.conns[k] + s.connsLock.RUnlock() + if found { + return c, nil + } + + // open connection to peer + c, err := conn.Dial("tcp", peer) + if err != nil { + return nil, err + } + + if err := s.connSetup(c); err != nil { + c.Close() + return nil, err + } + + return c, nil +} + +// DialAddr is for connecting to a peer when you know their addr but not their ID. +// Should only be used when sure that not connected to peer in question +// TODO(jbenet) merge with Dial? need way to patch back. +func (s *Swarm) DialAddr(addr *ma.Multiaddr) (*conn.Conn, error) { + if addr == nil { + return nil, errors.New("addr must be a non-nil Multiaddr") + } + + npeer := new(peer.Peer) + npeer.AddAddress(addr) + + c, err := conn.Dial("tcp", npeer) + if err != nil { + return nil, err + } + + if err := s.connSetup(c); err != nil { + c.Close() + return nil, err + } + + return c, err +} + +// Handles the unwrapping + sending of messages to the right connection. +func (s *Swarm) fanOut() { + for { + select { + case <-s.ctx.Done(): + return // told to close. + + case msg, ok := <-s.Outgoing: + if !ok { + return + } + + s.connsLock.RLock() + conn, found := s.conns[msg.Peer.Key()] + s.connsLock.RUnlock() + + if !found { + e := fmt.Errorf("Sent msg to peer without open conn: %v", + msg.Peer) + s.errChan <- e + continue + } + + // queue it in the connection's buffer + conn.Outgoing.MsgChan <- msg.Data + } + } +} + +// Handles the receiving + wrapping of messages, per conn. +// Consider using reflect.Select with one goroutine instead of n. +func (s *Swarm) fanIn(c *conn.Conn) { + for { + select { + case <-s.ctx.Done(): + // close Conn. + c.Close() + goto out + + case <-c.Closed: + goto out + + case data, ok := <-c.Incoming.MsgChan: + if !ok { + e := fmt.Errorf("Error retrieving from conn: %v", c.Peer.Key().Pretty()) + s.errChan <- e + goto out + } + + msg := &msg.Message{Peer: c.Peer, Data: data} + s.Incoming <- msg + } + } + +out: + s.connsLock.Lock() + delete(s.conns, c.Peer.Key()) + s.connsLock.Unlock() +} + +// GetPeer returns the peer in the swarm with given key id. +func (s *Swarm) GetPeer(key u.Key) *peer.Peer { + s.connsLock.RLock() + conn, found := s.conns[key] + s.connsLock.RUnlock() + + if !found { + return nil + } + return conn.Peer +} + +// GetConnection will check if we are already connected to the peer in question +// and only open a new connection if we arent already +func (s *Swarm) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) { + p := &peer.Peer{ + ID: id, + Addresses: []*ma.Multiaddr{addr}, + } + + c, err := s.Dial(p) + if err != nil { + return nil, err + } + + return c.Peer, nil +} + +// CloseConnection removes a given peer from swarm + closes the connection +func (s *Swarm) CloseConnection(p *peer.Peer) error { + s.connsLock.RLock() + conn, found := s.conns[u.Key(p.ID)] + s.connsLock.RUnlock() + if !found { + return u.ErrNotFound + } + + s.connsLock.Lock() + delete(s.conns, u.Key(p.ID)) + s.connsLock.Unlock() + + return conn.Close() +} + +func (s *Swarm) Error(e error) { + s.errChan <- e +} + +// GetErrChan returns the errors chan. +func (s *Swarm) GetErrChan() chan error { + return s.errChan +} + +// Temporary to ensure that the Swarm always matches the Network interface as we are changing it +// var _ Network = &Swarm{} diff --git a/net/swarm/swarm_test.go b/net/swarm/swarm_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2c9417e433ce9da48e866588a7932a2da4ee1bec --- /dev/null +++ b/net/swarm/swarm_test.go @@ -0,0 +1,154 @@ +package swarm + +import ( + "fmt" + "net" + "testing" + + msg "github.com/jbenet/go-ipfs/net/message" + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" +) + +func pingListen(t *testing.T, listener *net.TCPListener, peer *peer.Peer) { + for { + c, err := listener.Accept() + if err == nil { + fmt.Println("accepted") + go pong(t, c, peer) + } + } +} + +func pong(t *testing.T, c net.Conn, peer *peer.Peer) { + mrw := msgio.NewReadWriter(c) + for { + data := make([]byte, 1024) + n, err := mrw.ReadMsg(data) + if err != nil { + fmt.Printf("error %v\n", err) + return + } + d := string(data[:n]) + if d != "ping" { + t.Errorf("error: didn't receive ping: '%v'\n", d) + return + } + err = mrw.WriteMsg([]byte("pong")) + if err != nil { + fmt.Printf("error %v\n", err) + return + } + } +} + +func setupPeer(id string, addr string) (*peer.Peer, error) { + tcp, err := ma.NewMultiaddr(addr) + if err != nil { + return nil, err + } + + mh, err := mh.FromHexString(id) + if err != nil { + return nil, err + } + + p := &peer.Peer{ID: peer.ID(mh)} + p.AddAddress(tcp) + return p, nil +} + +func TestSwarm(t *testing.T) { + + local, err := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a30", + "/ip4/127.0.0.1/tcp/1234") + if err != nil { + t.Fatal("error setting up peer", err) + } + + swarm, err := NewSwarm(context.Background(), local) + if err != nil { + t.Error(err) + } + var peers []*peer.Peer + var listeners []net.Listener + peerNames := map[string]string{ + "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a30": "/ip4/127.0.0.1/tcp/1234", + "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31": "/ip4/127.0.0.1/tcp/2345", + "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32": "/ip4/127.0.0.1/tcp/3456", + "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33": "/ip4/127.0.0.1/tcp/4567", + } + + for k, n := range peerNames { + peer, err := setupPeer(k, n) + if err != nil { + t.Fatal("error setting up peer", err) + } + a := peer.NetAddress("tcp") + if a == nil { + t.Fatal("error setting up peer (addr is nil)", peer) + } + n, h, err := a.DialArgs() + if err != nil { + t.Fatal("error getting dial args from addr") + } + listener, err := net.Listen(n, h) + if err != nil { + t.Fatal("error setting up listener", err) + } + go pingListen(t, listener.(*net.TCPListener), peer) + + u.POut("wat?\n") + _, err = swarm.Dial(peer) + if err != nil { + t.Fatal("error swarm dialing to peer", err) + } + + u.POut("wut?\n") + // ok done, add it. + peers = append(peers, peer) + listeners = append(listeners, listener) + } + + MsgNum := 1000 + for k := 0; k < MsgNum; k++ { + for _, p := range peers { + swarm.Outgoing <- &msg.Message{Peer: p, Data: []byte("ping")} + u.POut("sending ping to %v\n", p) + } + } + + got := map[u.Key]int{} + + for k := 0; k < (MsgNum * len(peers)); k++ { + u.POut("listening for ping...") + msg := <-swarm.Incoming + if string(msg.Data) != "pong" { + t.Error("unexpected conn output", msg.Data) + } + + n, _ := got[msg.Peer.Key()] + got[msg.Peer.Key()] = n + 1 + } + + if len(peers) != len(got) { + t.Error("got less messages than sent") + } + + for p, n := range got { + if n != MsgNum { + t.Error("peer did not get all msgs", p, n, "/", MsgNum) + } + } + + fmt.Println("closing") + swarm.Close() + for _, listener := range listeners { + listener.(*net.TCPListener).Close() + } +}