Commit c88a4b2c authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

peerstream update (gc conns)

parent 04b938b8
......@@ -136,7 +136,7 @@
},
{
"ImportPath": "github.com/jbenet/go-peerstream",
"Rev": "eab3056e47ecbd1bb32b8c8512fe46fc856f0387"
"Rev": "55792f89d00cf62166668ded3288536cbe6a72cc"
},
{
"ImportPath": "github.com/jbenet/go-random",
......
......@@ -111,8 +111,8 @@ func (c *Conn) Close() error {
}
// close underlying connection
c.netConn.Close()
return c.swarm.removeConn(c)
c.swarm.removeConn(c)
return c.pstConn.Close()
}
// ConnsWithGroup narrows down a set of connections to those in a given group.
......@@ -234,10 +234,9 @@ func (s *Swarm) removeStream(stream *Stream) error {
return stream.pstStream.Close()
}
func (s *Swarm) removeConn(conn *Conn) error {
func (s *Swarm) removeConn(conn *Conn) {
// remove from our maps
s.connLock.Lock()
delete(s.conns, conn)
s.connLock.Unlock()
return nil
}
......@@ -4,6 +4,7 @@ import (
"errors"
"net"
"sync"
"time"
pst "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
)
......@@ -11,6 +12,9 @@ import (
// fd is a (file) descriptor, unix style
type fd uint32
// GarbageCollectTimeout governs the periodic connection closer.
var GarbageCollectTimeout = 5 * time.Second
type Swarm struct {
// the transport we'll use.
transport pst.Transport
......@@ -33,10 +37,12 @@ type Swarm struct {
connHandler ConnHandler // receives Conns intiated remotely
streamHandler StreamHandler // receives Streams initiated remotely
selectConn SelectConn // default SelectConn function
closed chan struct{}
}
func NewSwarm(t pst.Transport) *Swarm {
return &Swarm{
s := &Swarm{
transport: t,
streams: make(map[*Stream]struct{}),
conns: make(map[*Conn]struct{}),
......@@ -44,7 +50,10 @@ func NewSwarm(t pst.Transport) *Swarm {
selectConn: SelectRandomConn,
streamHandler: NoOpStreamHandler,
connHandler: NoOpConnHandler,
closed: make(chan struct{}),
}
go s.connGarbageCollect()
return s
}
// SetStreamHandler assigns the stream handler in the swarm.
......@@ -122,7 +131,16 @@ func (s *Swarm) Conns() []*Conn {
conns = append(conns, c)
}
s.connLock.RUnlock()
return conns
open := make([]*Conn, 0, len(conns))
for _, c := range conns {
if c.pstConn.IsClosed() {
c.Close()
} else {
open = append(open, c)
}
}
return open
}
// Listeners returns all the listeners associated with this Swarm.
......@@ -225,6 +243,11 @@ func (s *Swarm) NewStreamWithConn(conn *Conn) (*Stream, error) {
return nil, errors.New("connection not associated with swarm")
}
if conn.pstConn.IsClosed() {
go conn.Close()
return nil, errors.New("conn is closed")
}
s.connLock.RLock()
if _, found := s.conns[conn]; !found {
s.connLock.RUnlock()
......@@ -251,6 +274,46 @@ func (s *Swarm) StreamsWithGroup(g Group) []*Stream {
// Close shuts down the Swarm, and it's listeners.
func (s *Swarm) Close() error {
// shut down TODO
// automatically close everything new we get.
s.SetConnHandler(func(c *Conn) { c.Close() })
s.SetStreamHandler(func(s *Stream) { s.Close() })
var wgl sync.WaitGroup
for _, l := range s.Listeners() {
wgl.Add(1)
go func() {
l.Close()
wgl.Done()
}()
}
wgl.Wait()
var wgc sync.WaitGroup
for _, c := range s.Conns() {
wgc.Add(1)
go func() {
c.Close()
wgc.Done()
}()
}
wgc.Wait()
return nil
}
// connGarbageCollect periodically sweeps conns to make sure
// they're still alive. if any are closed, remvoes them.
func (s *Swarm) connGarbageCollect() {
for {
select {
case <-s.closed:
return
case <-time.After(GarbageCollectTimeout):
}
for _, c := range s.Conns() {
if c.pstConn.IsClosed() {
go c.Close()
}
}
}
}
......@@ -31,6 +31,8 @@ func (s *stream) Close() error {
// Conn is a connection to a remote peer.
type conn struct {
ms muxado.Session
closed chan struct{}
}
func (c *conn) muxadoSession() muxado.Session {
......@@ -41,6 +43,15 @@ func (c *conn) Close() error {
return c.ms.Close()
}
func (c *conn) IsClosed() bool {
select {
case <-c.closed:
return true
default:
return false
}
}
// OpenStream creates a new stream.
func (c *conn) OpenStream() (pst.Stream, error) {
s, err := c.ms.Open()
......@@ -76,5 +87,10 @@ func (t transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) {
} else {
s = muxado.Client(nc)
}
return &conn{ms: s}, nil
cl := make(chan struct{})
go func() {
s.Wait()
close(cl)
}()
return &conn{ms: s, closed: cl}, nil
}
......@@ -20,6 +20,10 @@ type StreamHandler func(Stream)
type Conn interface {
io.Closer
// IsClosed returns whether a connection is fully closed, so it can
// be garbage collected.
IsClosed() bool
// OpenStream creates a new stream.
OpenStream() (Stream, error)
......
......@@ -39,6 +39,10 @@ func (c *conn) Close() error {
return c.yamuxSession().Close()
}
func (c *conn) IsClosed() bool {
return c.yamuxSession().IsClosed()
}
// OpenStream creates a new stream.
func (c *conn) OpenStream() (pst.Stream, error) {
s, err := c.yamuxSession().OpenStream()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment