Commit b427df39 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Merge pull request #518 from jbenet/gc-conns

gc closed conns
parents 04b938b8 84a2a591
...@@ -136,7 +136,7 @@ ...@@ -136,7 +136,7 @@
}, },
{ {
"ImportPath": "github.com/jbenet/go-peerstream", "ImportPath": "github.com/jbenet/go-peerstream",
"Rev": "eab3056e47ecbd1bb32b8c8512fe46fc856f0387" "Rev": "55792f89d00cf62166668ded3288536cbe6a72cc"
}, },
{ {
"ImportPath": "github.com/jbenet/go-random", "ImportPath": "github.com/jbenet/go-random",
......
...@@ -111,8 +111,8 @@ func (c *Conn) Close() error { ...@@ -111,8 +111,8 @@ func (c *Conn) Close() error {
} }
// close underlying connection // close underlying connection
c.netConn.Close() c.swarm.removeConn(c)
return c.swarm.removeConn(c) return c.pstConn.Close()
} }
// ConnsWithGroup narrows down a set of connections to those in a given group. // ConnsWithGroup narrows down a set of connections to those in a given group.
...@@ -234,10 +234,9 @@ func (s *Swarm) removeStream(stream *Stream) error { ...@@ -234,10 +234,9 @@ func (s *Swarm) removeStream(stream *Stream) error {
return stream.pstStream.Close() return stream.pstStream.Close()
} }
func (s *Swarm) removeConn(conn *Conn) error { func (s *Swarm) removeConn(conn *Conn) {
// remove from our maps // remove from our maps
s.connLock.Lock() s.connLock.Lock()
delete(s.conns, conn) delete(s.conns, conn)
s.connLock.Unlock() s.connLock.Unlock()
return nil
} }
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"errors" "errors"
"net" "net"
"sync" "sync"
"time"
pst "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" pst "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
) )
...@@ -11,6 +12,9 @@ import ( ...@@ -11,6 +12,9 @@ import (
// fd is a (file) descriptor, unix style // fd is a (file) descriptor, unix style
type fd uint32 type fd uint32
// GarbageCollectTimeout governs the periodic connection closer.
var GarbageCollectTimeout = 5 * time.Second
type Swarm struct { type Swarm struct {
// the transport we'll use. // the transport we'll use.
transport pst.Transport transport pst.Transport
...@@ -33,10 +37,12 @@ type Swarm struct { ...@@ -33,10 +37,12 @@ type Swarm struct {
connHandler ConnHandler // receives Conns intiated remotely connHandler ConnHandler // receives Conns intiated remotely
streamHandler StreamHandler // receives Streams initiated remotely streamHandler StreamHandler // receives Streams initiated remotely
selectConn SelectConn // default SelectConn function selectConn SelectConn // default SelectConn function
closed chan struct{}
} }
func NewSwarm(t pst.Transport) *Swarm { func NewSwarm(t pst.Transport) *Swarm {
return &Swarm{ s := &Swarm{
transport: t, transport: t,
streams: make(map[*Stream]struct{}), streams: make(map[*Stream]struct{}),
conns: make(map[*Conn]struct{}), conns: make(map[*Conn]struct{}),
...@@ -44,7 +50,10 @@ func NewSwarm(t pst.Transport) *Swarm { ...@@ -44,7 +50,10 @@ func NewSwarm(t pst.Transport) *Swarm {
selectConn: SelectRandomConn, selectConn: SelectRandomConn,
streamHandler: NoOpStreamHandler, streamHandler: NoOpStreamHandler,
connHandler: NoOpConnHandler, connHandler: NoOpConnHandler,
closed: make(chan struct{}),
} }
go s.connGarbageCollect()
return s
} }
// SetStreamHandler assigns the stream handler in the swarm. // SetStreamHandler assigns the stream handler in the swarm.
...@@ -122,7 +131,16 @@ func (s *Swarm) Conns() []*Conn { ...@@ -122,7 +131,16 @@ func (s *Swarm) Conns() []*Conn {
conns = append(conns, c) conns = append(conns, c)
} }
s.connLock.RUnlock() s.connLock.RUnlock()
return conns
open := make([]*Conn, 0, len(conns))
for _, c := range conns {
if c.pstConn.IsClosed() {
c.Close()
} else {
open = append(open, c)
}
}
return open
} }
// Listeners returns all the listeners associated with this Swarm. // Listeners returns all the listeners associated with this Swarm.
...@@ -225,6 +243,11 @@ func (s *Swarm) NewStreamWithConn(conn *Conn) (*Stream, error) { ...@@ -225,6 +243,11 @@ func (s *Swarm) NewStreamWithConn(conn *Conn) (*Stream, error) {
return nil, errors.New("connection not associated with swarm") return nil, errors.New("connection not associated with swarm")
} }
if conn.pstConn.IsClosed() {
go conn.Close()
return nil, errors.New("conn is closed")
}
s.connLock.RLock() s.connLock.RLock()
if _, found := s.conns[conn]; !found { if _, found := s.conns[conn]; !found {
s.connLock.RUnlock() s.connLock.RUnlock()
...@@ -251,6 +274,46 @@ func (s *Swarm) StreamsWithGroup(g Group) []*Stream { ...@@ -251,6 +274,46 @@ func (s *Swarm) StreamsWithGroup(g Group) []*Stream {
// Close shuts down the Swarm, and it's listeners. // Close shuts down the Swarm, and it's listeners.
func (s *Swarm) Close() error { func (s *Swarm) Close() error {
// shut down TODO // automatically close everything new we get.
s.SetConnHandler(func(c *Conn) { c.Close() })
s.SetStreamHandler(func(s *Stream) { s.Close() })
var wgl sync.WaitGroup
for _, l := range s.Listeners() {
wgl.Add(1)
go func() {
l.Close()
wgl.Done()
}()
}
wgl.Wait()
var wgc sync.WaitGroup
for _, c := range s.Conns() {
wgc.Add(1)
go func() {
c.Close()
wgc.Done()
}()
}
wgc.Wait()
return nil return nil
} }
// connGarbageCollect periodically sweeps conns to make sure
// they're still alive. if any are closed, remvoes them.
func (s *Swarm) connGarbageCollect() {
for {
select {
case <-s.closed:
return
case <-time.After(GarbageCollectTimeout):
}
for _, c := range s.Conns() {
if c.pstConn.IsClosed() {
go c.Close()
}
}
}
}
...@@ -31,6 +31,8 @@ func (s *stream) Close() error { ...@@ -31,6 +31,8 @@ func (s *stream) Close() error {
// Conn is a connection to a remote peer. // Conn is a connection to a remote peer.
type conn struct { type conn struct {
ms muxado.Session ms muxado.Session
closed chan struct{}
} }
func (c *conn) muxadoSession() muxado.Session { func (c *conn) muxadoSession() muxado.Session {
...@@ -41,6 +43,15 @@ func (c *conn) Close() error { ...@@ -41,6 +43,15 @@ func (c *conn) Close() error {
return c.ms.Close() return c.ms.Close()
} }
func (c *conn) IsClosed() bool {
select {
case <-c.closed:
return true
default:
return false
}
}
// OpenStream creates a new stream. // OpenStream creates a new stream.
func (c *conn) OpenStream() (pst.Stream, error) { func (c *conn) OpenStream() (pst.Stream, error) {
s, err := c.ms.Open() s, err := c.ms.Open()
...@@ -76,5 +87,10 @@ func (t transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) { ...@@ -76,5 +87,10 @@ func (t transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) {
} else { } else {
s = muxado.Client(nc) s = muxado.Client(nc)
} }
return &conn{ms: s}, nil cl := make(chan struct{})
go func() {
s.Wait()
close(cl)
}()
return &conn{ms: s, closed: cl}, nil
} }
...@@ -20,6 +20,10 @@ type StreamHandler func(Stream) ...@@ -20,6 +20,10 @@ type StreamHandler func(Stream)
type Conn interface { type Conn interface {
io.Closer io.Closer
// IsClosed returns whether a connection is fully closed, so it can
// be garbage collected.
IsClosed() bool
// OpenStream creates a new stream. // OpenStream creates a new stream.
OpenStream() (Stream, error) OpenStream() (Stream, error)
......
...@@ -39,6 +39,10 @@ func (c *conn) Close() error { ...@@ -39,6 +39,10 @@ func (c *conn) Close() error {
return c.yamuxSession().Close() return c.yamuxSession().Close()
} }
func (c *conn) IsClosed() bool {
return c.yamuxSession().IsClosed()
}
// OpenStream creates a new stream. // OpenStream creates a new stream.
func (c *conn) OpenStream() (pst.Stream, error) { func (c *conn) OpenStream() (pst.Stream, error) {
s, err := c.yamuxSession().OpenStream() s, err := c.yamuxSession().OpenStream()
......
...@@ -58,11 +58,7 @@ func newSecureConn(ctx context.Context, sk ic.PrivKey, insecure Conn) (Conn, err ...@@ -58,11 +58,7 @@ func newSecureConn(ctx context.Context, sk ic.PrivKey, insecure Conn) (Conn, err
} }
func (c *secureConn) Close() error { func (c *secureConn) Close() error {
if err := c.secure.Close(); err != nil { return c.secure.Close()
c.insecure.Close()
return err
}
return c.insecure.Close()
} }
// ID is an identifier unique to this connection. // ID is an identifier unique to this connection.
......
// Package reconnect tests connect -> disconnect -> reconnect works
package reconnect
package reconnect
import (
crand "crypto/rand"
"io"
"math/rand"
"sync"
"testing"
"time"
host "github.com/jbenet/go-ipfs/p2p/host"
inet "github.com/jbenet/go-ipfs/p2p/net"
swarm "github.com/jbenet/go-ipfs/p2p/net/swarm"
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
testutil "github.com/jbenet/go-ipfs/p2p/test/util"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
)
func init() {
// change the garbage collect timeout for testing.
ps.GarbageCollectTimeout = 10 * time.Millisecond
}
var log = eventlog.Logger("reconnect")
func EchoStreamHandler(stream inet.Stream) {
c := stream.Conn()
log.Debugf("%s echoing %s", c.LocalPeer(), c.RemotePeer())
go func() {
defer stream.Close()
io.Copy(stream, stream)
}()
}
type sendChans struct {
send chan struct{}
sent chan struct{}
read chan struct{}
close_ chan struct{}
closed chan struct{}
}
func newSendChans() sendChans {
return sendChans{
send: make(chan struct{}),
sent: make(chan struct{}),
read: make(chan struct{}),
close_: make(chan struct{}),
closed: make(chan struct{}),
}
}
func newSender() (chan sendChans, func(s inet.Stream)) {
scc := make(chan sendChans)
return scc, func(s inet.Stream) {
sc := newSendChans()
scc <- sc
defer func() {
s.Close()
sc.closed <- struct{}{}
}()
buf := make([]byte, 65536)
buf2 := make([]byte, 65536)
crand.Read(buf)
for {
select {
case <-sc.close_:
return
case <-sc.send:
}
// send a randomly sized subchunk
from := rand.Intn(len(buf) / 2)
to := rand.Intn(len(buf) / 2)
sendbuf := buf[from : from+to]
log.Debugf("sender sending %d bytes", len(sendbuf))
n, err := s.Write(sendbuf)
if err != nil {
log.Debug("sender error. exiting:", err)
return
}
log.Debugf("sender wrote %d bytes", n)
sc.sent <- struct{}{}
if n, err = io.ReadFull(s, buf2[:len(sendbuf)]); err != nil {
log.Debug("sender error. failed to read:", err)
return
}
log.Debugf("sender read %d bytes", n)
sc.read <- struct{}{}
}
}
}
// TestReconnect tests whether hosts are able to disconnect and reconnect.
func TestReconnect2(t *testing.T) {
ctx := context.Background()
h1 := testutil.GenHostSwarm(t, ctx)
h2 := testutil.GenHostSwarm(t, ctx)
hosts := []host.Host{h1, h2}
h1.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
h2.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
rounds := 10
if testing.Short() {
rounds = 4
}
for i := 0; i < rounds; i++ {
log.Debugf("TestReconnect: %d/%d\n", i, rounds)
SubtestConnSendDisc(t, hosts)
}
}
// TestReconnect tests whether hosts are able to disconnect and reconnect.
func TestReconnect5(t *testing.T) {
ctx := context.Background()
h1 := testutil.GenHostSwarm(t, ctx)
h2 := testutil.GenHostSwarm(t, ctx)
h3 := testutil.GenHostSwarm(t, ctx)
h4 := testutil.GenHostSwarm(t, ctx)
h5 := testutil.GenHostSwarm(t, ctx)
hosts := []host.Host{h1, h2, h3, h4, h5}
h1.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
h2.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
h3.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
h4.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
h5.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
rounds := 10
if testing.Short() {
rounds = 4
}
for i := 0; i < rounds; i++ {
log.Debugf("TestReconnect: %d/%d\n", i, rounds)
SubtestConnSendDisc(t, hosts)
}
}
func SubtestConnSendDisc(t *testing.T, hosts []host.Host) {
ctx := context.Background()
numStreams := 3 * len(hosts)
numMsgs := 10
if testing.Short() {
numStreams = 5 * len(hosts)
numMsgs = 4
}
ss, sF := newSender()
for _, h1 := range hosts {
for _, h2 := range hosts {
if h1.ID() >= h2.ID() {
continue
}
h2pi := h2.Peerstore().PeerInfo(h2.ID())
log.Debugf("dialing %s", h2pi.Addrs)
if err := h1.Connect(ctx, h2pi); err != nil {
t.Fatalf("Failed to connect:", err)
}
}
}
var wg sync.WaitGroup
for i := 0; i < numStreams; i++ {
h1 := hosts[i%len(hosts)]
h2 := hosts[(i+1)%len(hosts)]
s, err := h1.NewStream(protocol.TestingID, h2.ID())
if err != nil {
t.Error(err)
}
wg.Add(1)
go func(j int) {
defer wg.Done()
go sF(s)
log.Debugf("getting handle %d", i)
sc := <-ss // wait to get handle.
log.Debugf("spawning worker %d", i)
for i := 0; i < numMsgs; i++ {
sc.send <- struct{}{}
<-sc.sent
log.Debugf("%d sent %d", j, i)
<-sc.read
log.Debugf("%d read %d", j, i)
}
sc.close_ <- struct{}{}
<-sc.closed
log.Debugf("closed %d", j)
}(i)
}
wg.Wait()
for i, h1 := range hosts {
log.Debugf("host %d has %d conns", i, len(h1.Network().Conns()))
}
for _, h1 := range hosts {
// close connection
cs := h1.Network().Conns()
for _, c := range cs {
sc := c.(*swarm.Conn)
if sc.LocalPeer() > sc.RemotePeer() {
continue // only close it on one side.
}
log.Debugf("closing: %s", sc.RawConn())
sc.Close()
}
}
<-time.After(20 * time.Millisecond)
for i, h := range hosts {
if len(h.Network().Conns()) > 0 {
t.Fatalf("host %d %s has %d conns! not zero.", i, h.ID(), len(h.Network().Conns()))
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment