package swarm import ( "errors" "fmt" "sync" conn "github.com/jbenet/go-ipfs/net/conn" msg "github.com/jbenet/go-ipfs/net/message" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ) var log = u.Logger("swarm") // ErrAlreadyOpen signals that a connection to a peer is already open. var ErrAlreadyOpen = errors.New("Error: Connection to this peer already open.") // ListenErr contains a set of errors mapping to each of the swarms addresses. // Used to return multiple errors, as in listen. type ListenErr struct { Errors []error } func (e *ListenErr) Error() string { if e == nil { return "" } var out string for i, v := range e.Errors { if v != nil { out += fmt.Sprintf("%d: %s\n", i, v) } } return out } // Swarm is a connection muxer, allowing connections to other peers to // be opened and closed, while still using the same Chan for all // communication. The Chan sends/receives Messages, which note the // destination or source Peer. type Swarm struct { // local is the peer this swarm represents local *peer.Peer // peers is a collection of peers for swarm to use peers peer.Peerstore // Swarm includes a Pipe object. *msg.Pipe // errChan is the channel of errors. errChan chan error // conns are the open connections the swarm is handling. conns conn.Map connsLock sync.RWMutex // listeners for each network address listeners []conn.Listener // cancel is an internal function used to stop the Swarm's processing. cancel context.CancelFunc ctx context.Context } // NewSwarm constructs a Swarm, with a Chan. func NewSwarm(ctx context.Context, local *peer.Peer, ps peer.Peerstore) (*Swarm, error) { s := &Swarm{ Pipe: msg.NewPipe(10), conns: conn.Map{}, local: local, peers: ps, errChan: make(chan error, 100), } s.ctx, s.cancel = context.WithCancel(ctx) go s.fanOut() return s, s.listen() } // Close stops a swarm. func (s *Swarm) Close() error { if s.cancel == nil { return errors.New("Swarm already closed.") } // issue cancel for the context s.cancel() // set cancel to nil to prevent calling Close again, and signal to Listeners s.cancel = nil // close listeners for _, list := range s.listeners { list.Close() } return nil } // Dial connects to a peer. // // The idea is that the client of Swarm does not need to know what network // the connection will happen over. Swarm can use whichever it choses. // This allows us to use various transport protocols, do NAT traversal/relay, // etc. to achive connection. // // For now, Dial uses only TCP. This will be extended. func (s *Swarm) Dial(peer *peer.Peer) (conn.Conn, error) { if peer.ID.Equal(s.local.ID) { return nil, errors.New("Attempted connection to self!") } // check if we already have an open connection first c := s.GetConnection(peer.ID) if c != nil { return c, nil } // check if we don't have the peer in Peerstore err := s.peers.Put(peer) if err != nil { return nil, err } // open connection to peer d := &conn.Dialer{ LocalPeer: s.local, Peerstore: s.peers, } c, err = d.Dial(s.ctx, "tcp", peer) if err != nil { return nil, err } c, err = s.connSetup(c) if err != nil { c.Close() return nil, err } return c, nil } // GetConnection returns the connection in the swarm to given peer.ID func (s *Swarm) GetConnection(pid peer.ID) conn.Conn { s.connsLock.RLock() c, found := s.conns[u.Key(pid)] s.connsLock.RUnlock() if !found { return nil } return c } // CloseConnection removes a given peer from swarm + closes the connection func (s *Swarm) CloseConnection(p *peer.Peer) error { c := s.GetConnection(p.ID) if c == nil { return u.ErrNotFound } s.connsLock.Lock() delete(s.conns, u.Key(p.ID)) s.connsLock.Unlock() return c.Close() } func (s *Swarm) Error(e error) { s.errChan <- e } // GetErrChan returns the errors chan. func (s *Swarm) GetErrChan() chan error { return s.errChan } // GetPeerList returns a copy of the set of peers swarm is connected to. func (s *Swarm) GetPeerList() []*peer.Peer { var out []*peer.Peer s.connsLock.RLock() for _, p := range s.conns { out = append(out, p.RemotePeer()) } s.connsLock.RUnlock() return out } // Temporary to ensure that the Swarm always matches the Network interface as we are changing it // var _ Network = &Swarm{}