diff --git a/crypto/key.go b/crypto/key.go
index 4b40feb6dd9036991d0057befffde0a18dd2bc4e..b26d231ea9b43c818e39ab30f710b2b9ebf0d2af 100644
--- a/crypto/key.go
+++ b/crypto/key.go
@@ -99,7 +99,7 @@ func GenerateEKeyPair(curveName string) ([]byte, GenSharedKey, error) {
 	}
 
 	pubKey := elliptic.Marshal(curve, x, y)
-	log.Debug("GenerateEKeyPair %d", len(pubKey))
+	// log.Debug("GenerateEKeyPair %d", len(pubKey))
 
 	done := func(theirPub []byte) ([]byte, error) {
 		// Verify and unpack node's public key.
diff --git a/net/conn/multiconn.go b/net/conn/multiconn.go
index 1c4533d72fd64cd4601482ee013e33c1da35d84c..59c52b7f939df193fd113b3b33cd38141c1aa394 100644
--- a/net/conn/multiconn.go
+++ b/net/conn/multiconn.go
@@ -38,7 +38,7 @@ type MultiConn struct {
 }
 
 // NewMultiConn constructs a new connection
-func NewMultiConn(ctx context.Context, local, remote *peer.Peer, conns []Conn) (Conn, error) {
+func NewMultiConn(ctx context.Context, local, remote *peer.Peer, conns []Conn) (*MultiConn, error) {
 
 	c := &MultiConn{
 		local:  local,
@@ -53,13 +53,10 @@ func NewMultiConn(ctx context.Context, local, remote *peer.Peer, conns []Conn) (
 	// must happen before Adds / fanOut
 	c.ContextCloser = NewContextCloser(ctx, c.close)
 
-	log.Info("adding %d...", len(conns))
 	if conns != nil && len(conns) > 0 {
 		c.Add(conns...)
 	}
 
 	go c.fanOut()
-
-	log.Info("newMultiConn: %v to %v", local, remote)
 	return c, nil
 }
@@ -72,6 +69,9 @@ func (c *MultiConn) Add(conns ...Conn) {
 		log.Info("MultiConn: adding %s", c2)
 		if c.LocalPeer() != c2.LocalPeer() || c.RemotePeer() != c2.RemotePeer() {
 			log.Error("%s", c2)
+			c.Unlock() // ok to unlock (to log). panicking.
+			log.Error("%s", c)
+			c.Lock() // gotta relock to avoid lock panic from deferring.
 			panic("connection addresses mismatch")
 		}
 
@@ -102,12 +102,12 @@ func (c *MultiConn) Remove(conns ...Conn) {
 	}
 
 	// close all in parallel, but wait for all to be done closing.
-	CloseConns(conns)
+	CloseConns(conns...)
 }
 
 // CloseConns closes multiple connections in parallel, and waits for all
 // to finish closing.
-func CloseConns(conns []Conn) {
+func CloseConns(conns ...Conn) {
 	var wg sync.WaitGroup
 
 	for _, child := range conns {
@@ -204,7 +204,7 @@ func (c *MultiConn) close() error {
 	c.RUnlock()
 
 	// close underlying connections
-	CloseConns(conns)
+	CloseConns(conns...)
 
 	return nil
 }
diff --git a/net/conn/multiconn_test.go b/net/conn/multiconn_test.go
index bc0b59bf24041d0e596fc83fb99e52171e547691..bb8404a135b399a20310983e2dbf5e17df83da06 100644
--- a/net/conn/multiconn_test.go
+++ b/net/conn/multiconn_test.go
@@ -150,7 +150,7 @@ func setupMultiConns(t *testing.T, ctx context.Context) (a, b *MultiConn) {
 	p2l.Close()
 
 	log.Info("did you make multiconns?")
-	return c1.(*MultiConn), c2.(*MultiConn)
+	return c1, c2
 }
 
 func TestMulticonnSend(t *testing.T) {
diff --git a/net/swarm/conn.go b/net/swarm/conn.go
index 0aeabb273011c78cafd5fcb71e5f2006c7e6c58c..629e946a7b27abf332a0b85b58d5470b8fc1489a 100644
--- a/net/swarm/conn.go
+++ b/net/swarm/conn.go
@@ -36,7 +36,7 @@ func (s *Swarm) listen() error {
 
 // Listen for new connections on the given multiaddr
 func (s *Swarm) connListen(maddr ma.Multiaddr) error {
-	list, err := conn.Listen(s.ctx, maddr, s.local, s.peers)
+	list, err := conn.Listen(s.Context(), maddr, s.local, s.peers)
 	if err != nil {
 		return err
 	}
@@ -50,13 +50,19 @@ func (s *Swarm) connListen(maddr ma.Multiaddr) error {
 	s.listeners = append(s.listeners, list)
 
 	// Accept and handle new connections on this listener until it errors
+	// this listener is a child.
+	s.Children().Add(1)
 	go func() {
+		defer s.Children().Done()
+
 		for {
 			select {
-			case <-s.ctx.Done():
+			case <-s.Closing():
 				return
 
 			case conn := <-list.Accept():
+				// handler also a child.
+				s.Children().Add(1)
 				go s.handleIncomingConn(conn)
 			}
 		}
@@ -67,6 +73,8 @@ func (s *Swarm) connListen(maddr ma.Multiaddr) error {
 
 // Handle getting ID from this peer, handshake, and adding it into the map
 func (s *Swarm) handleIncomingConn(nconn conn.Conn) {
+	// this handler is a child. added by caller.
+	defer s.Children().Done()
 
 	// Setup the new connection
 	_, err := s.connSetup(nconn)
@@ -77,7 +85,7 @@ func (s *Swarm) handleIncomingConn(nconn conn.Conn) {
 }
 // connSetup adds the passed in connection to its peerMap and starts
-// the fanIn routine for that connection
+// the fanInSingle routine for that connection
 func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) {
 	if c == nil {
 		return nil, errors.New("Tried to start nil connection.")
 	}
@@ -93,28 +101,44 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) {
 
 	// add to conns
 	s.connsLock.Lock()
-	if c2, ok := s.conns[c.RemotePeer().Key()]; ok {
-		log.Debug("Conn already open!")
+
+	mc, ok := s.conns[c.RemotePeer().Key()]
+	if !ok {
+		// multiconn doesn't exist, make a new one.
+		conns := []conn.Conn{c}
+		mc, err := conn.NewMultiConn(s.Context(), s.local, c.RemotePeer(), conns)
+		if err != nil {
+			log.Error("error creating multiconn: %s", err)
+			c.Close()
+			return nil, err
+		}
+
+		s.conns[c.RemotePeer().Key()] = mc
 		s.connsLock.Unlock()
-		c.Close()
-		return c2, nil // not error anymore, use existing conn.
-		// return ErrAlreadyOpen
+		log.Debug("added new multiconn: %s", mc)
+	} else {
+		s.connsLock.Unlock() // unlock before adding new conn
+
+		mc.Add(c)
+		log.Debug("multiconn found: %s", mc)
 	}
-	s.conns[c.RemotePeer().Key()] = c
-	log.Debug("Added conn to map!")
-	s.connsLock.Unlock()
+
+	log.Debug("multiconn added new conn %s", c)
 
 	// kick off reader goroutine
-	go s.fanIn(c)
+	go s.fanInSingle(c)
 	return c, nil
 }
 
 // Handles the unwrapping + sending of messages to the right connection.
 func (s *Swarm) fanOut() {
+	s.Children().Add(1)
+	defer s.Children().Done()
+
 	for {
 		select {
-		case <-s.ctx.Done():
+		case <-s.Closing():
 			return // told to close.
 
 		case msg, ok := <-s.Outgoing:
@@ -127,9 +151,9 @@ func (s *Swarm) fanOut() {
 			s.connsLock.RUnlock()
 
 			if !found {
-				e := fmt.Errorf("Sent msg to peer without open conn: %v",
-					msg.Peer)
+				e := fmt.Errorf("Sent msg to peer without open conn: %v", msg.Peer())
 				s.errChan <- e
+				log.Error("%s", e)
 				continue
 			}
 
@@ -143,30 +167,37 @@ func (s *Swarm) fanOut() {
 
 // Handles the receiving + wrapping of messages, per conn.
 // Consider using reflect.Select with one goroutine instead of n.
-func (s *Swarm) fanIn(c conn.Conn) {
+func (s *Swarm) fanInSingle(c conn.Conn) {
+	s.Children().Add(1)
+	c.Children().Add(1) // child of Conn as well.
+
+	// cleanup all data associated with this child Connection.
+	defer func() {
+		// remove it from the map.
+		s.connsLock.Lock()
+		delete(s.conns, c.RemotePeer().Key())
+		s.connsLock.Unlock()
+
+		s.Children().Done()
+		c.Children().Done() // child of Conn as well.
+	}()
+
 	for {
 		select {
-		case <-s.ctx.Done():
-			// close Conn.
-			c.Close()
-			goto out
+		case <-s.Closing(): // Swarm closing
+			return
+
+		case <-c.Closing(): // Conn closing
+			return
 
 		case data, ok := <-c.In():
 			if !ok {
-				e := fmt.Errorf("Error retrieving from conn: %v", c.RemotePeer())
-				s.errChan <- e
-				goto out
+				return // channel closed.
 			}
-			// log.Debug("[peer: %s] Received message [from = %s]", s.local, c.Peer)
 			s.Incoming <- msg.New(c.RemotePeer(), data)
 		}
 	}
-
-out:
-	s.connsLock.Lock()
-	delete(s.conns, c.RemotePeer().Key())
-	s.connsLock.Unlock()
 }
 
 // Commenting out because it's platform specific
diff --git a/net/swarm/simul_test.go b/net/swarm/simul_test.go
index 81d0d014606637389388299dd3d7854000bc442c..4110ce2730d07b9c9668ddcc2e8d2e634c9ce1ad 100644
--- a/net/swarm/simul_test.go
+++ b/net/swarm/simul_test.go
@@ -32,7 +32,6 @@ func TestSimultOpen(t *testing.T) {
 		if _, err := s.Dial(cp); err != nil {
 			t.Fatal("error swarm dialing to peer", err)
 		}
-		log.Info("done?!?")
 		wg.Done()
 	}
 
diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go
index 6d6ab24fd1c01f53ed6465dc5fd697c26079148b..a81b872e0b3497c4d51895c649b672aecb948999 100644
--- a/net/swarm/swarm.go
+++ b/net/swarm/swarm.go
@@ -56,48 +56,42 @@ type Swarm struct {
 	errChan chan error
 
 	// conns are the open connections the swarm is handling.
-	conns     conn.Map
+	// these are MultiConns, which multiplex multiple separate underlying Conns.
+	conns     conn.MultiConnMap
 	connsLock sync.RWMutex
 
 	// listeners for each network address
 	listeners []conn.Listener
 
-	// cancel is an internal function used to stop the Swarm's processing.
-	cancel context.CancelFunc
-	ctx    context.Context
+	// ContextCloser
+	conn.ContextCloser
 }
 
 // NewSwarm constructs a Swarm, with a Chan.
 func NewSwarm(ctx context.Context, local *peer.Peer, ps peer.Peerstore) (*Swarm, error) {
 	s := &Swarm{
 		Pipe:    msg.NewPipe(10),
-		conns:   conn.Map{},
+		conns:   conn.MultiConnMap{},
 		local:   local,
 		peers:   ps,
 		errChan: make(chan error, 100),
 	}
 
-	s.ctx, s.cancel = context.WithCancel(ctx)
+	// ContextCloser for proper child management.
+	s.ContextCloser = conn.NewContextCloser(ctx, s.close)
+
 	go s.fanOut()
 	return s, s.listen()
 }
 
-// Close stops a swarm.
-func (s *Swarm) Close() error {
-	if s.cancel == nil {
-		return errors.New("Swarm already closed.")
-	}
-
-	// issue cancel for the context
-	s.cancel()
-
-	// set cancel to nil to prevent calling Close again, and signal to Listeners
-	s.cancel = nil
-
+// close stops a swarm. It's the underlying function called by ContextCloser
+func (s *Swarm) close() error {
 	// close listeners
 	for _, list := range s.listeners {
 		list.Close()
 	}
+	// close connections
+	conn.CloseConns(s.Connections()...)
 	return nil
 }
@@ -132,7 +126,7 @@ func (s *Swarm) Dial(peer *peer.Peer) (conn.Conn, error) {
 		Peerstore: s.peers,
 	}
 
-	c, err = d.Dial(s.ctx, "tcp", peer)
+	c, err = d.Dial(s.Context(), "tcp", peer)
 	if err != nil {
 		return nil, err
 	}
@@ -158,6 +152,19 @@ func (s *Swarm) GetConnection(pid peer.ID) conn.Conn {
 	return c
 }
 
+// Connections returns a slice of all connections.
+func (s *Swarm) Connections() []conn.Conn {
+	s.connsLock.RLock()
+
+	conns := make([]conn.Conn, 0, len(s.conns))
+	for _, c := range s.conns {
+		conns = append(conns, c)
+	}
+
+	s.connsLock.RUnlock()
+	return conns
+}
+
 // CloseConnection removes a given peer from swarm + closes the connection
 func (s *Swarm) CloseConnection(p *peer.Peer) error {
 	c := s.GetConnection(p.ID)
diff --git a/net/swarm/swarm_test.go b/net/swarm/swarm_test.go
index 0ed948e81a590c41f2999bd3680b93a43a4225f1..e05e3680744cd946a0fd421346b53058525e6c4d 100644
--- a/net/swarm/swarm_test.go
+++ b/net/swarm/swarm_test.go
@@ -85,7 +85,11 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
 	var wg sync.WaitGroup
 	connect := func(s *Swarm, dst *peer.Peer) {
 		// copy for other peer
-		cp := &peer.Peer{ID: dst.ID}
+
+		cp, err := s.peers.Get(dst.ID)
+		if err != nil {
+			cp = &peer.Peer{ID: dst.ID}
+		}
 		cp.AddAddress(dst.Addresses[0])
 
 		log.Info("SWARM TEST: %s dialing %s", s.local, dst)
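
Note on the ContextCloser pattern this change leans on: connListen registers each goroutine via s.Children().Add(1) before spawning it, the goroutine calls Done() when it exits, and every loop selects on Closing() to learn when to shut down, so closing the swarm can block until no goroutines remain. The stand-alone sketch below shows the shape of that mechanism; it assumes a WaitGroup-backed design and is illustrative only, not the actual conn.ContextCloser implementation.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// contextCloser is a hypothetical, minimal stand-in for conn.ContextCloser:
// it pairs a cancellable context with a WaitGroup so Close can block until
// every registered child goroutine has exited.
type contextCloser struct {
	ctx      context.Context
	cancel   context.CancelFunc
	children sync.WaitGroup
	closeFn  func() error // cleanup hook, like Swarm.close above
}

func newContextCloser(ctx context.Context, closeFn func() error) *contextCloser {
	ctx, cancel := context.WithCancel(ctx)
	return &contextCloser{ctx: ctx, cancel: cancel, closeFn: closeFn}
}

func (c *contextCloser) Children() *sync.WaitGroup { return &c.children }

// Closing returns a channel that is closed once Close has been called.
func (c *contextCloser) Closing() <-chan struct{} { return c.ctx.Done() }

// Close runs the cleanup hook, signals children, and waits for them.
func (c *contextCloser) Close() error {
	err := c.closeFn()
	c.cancel()
	c.children.Wait()
	return err
}

func main() {
	cc := newContextCloser(context.Background(), func() error {
		fmt.Println("cleanup hook, like Swarm.close above")
		return nil
	})

	cc.Children().Add(1) // register before spawning, as connListen does
	go func() {
		defer cc.Children().Done()
		for {
			select {
			case <-cc.Closing(): // told to shut down
				fmt.Println("child exiting")
				return
			case <-time.After(10 * time.Millisecond):
				fmt.Println("child tick")
			}
		}
	}()

	time.Sleep(25 * time.Millisecond)
	cc.Close() // blocks until the child goroutine has exited
}

Embedding the closer (conn.ContextCloser in Swarm and MultiConn, as above) is what lets those types expose Context(), Children(), Closing(), and Close() without re-implementing the lifecycle plumbing in each one.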
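Similarly, changing CloseConns from a []Conn parameter to variadic ...Conn lets callers pass either individual connections or splat a slice, as in conn.CloseConns(s.Connections()...). A minimal runnable sketch of the close-in-parallel body it wraps (fakeConn and closeAll are hypothetical names; the real code operates on conn.Conn):

package main

import (
	"fmt"
	"io"
	"sync"
)

type fakeConn string

func (f fakeConn) Close() error {
	fmt.Println("closed", string(f))
	return nil
}

// closeAll mirrors variadic CloseConns: close every connection in
// parallel, and wait for all of them to finish closing.
func closeAll(conns ...io.Closer) {
	var wg sync.WaitGroup
	for _, child := range conns {
		wg.Add(1)
		go func(child io.Closer) {
			defer wg.Done()
			child.Close() // the real code may also log or collect errors
		}(child)
	}
	wg.Wait()
}

func main() {
	conns := []io.Closer{fakeConn("a"), fakeConn("b"), fakeConn("c")}
	closeAll(conns...) // slice splat, just like CloseConns(conns...)
}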