swarm.go 5.04 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2
// Package swarm implements a connection muxer with a pair of channels
// to synchronize all network communication.
3 4 5 6 7 8 9 10 11 12 13
package swarm

import (
	"errors"
	"fmt"
	"sync"

	conn "github.com/jbenet/go-ipfs/net/conn"
	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
15
	"github.com/jbenet/go-ipfs/util/eventlog"
16 17

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
18
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
19 20
)

21
// log is the package-level event logger, created under the name "swarm".
var log = eventlog.Logger("swarm")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22

23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
// ErrAlreadyOpen is a sentinel error signalling that a connection to a peer
// is already open.
//
// NOTE(review): nothing visible in this part of the file returns it; Dial
// hands back the existing connection instead — confirm where it is used.
var ErrAlreadyOpen = errors.New("Error: Connection to this peer already open.")

// ListenErr bundles several errors into one, with one slot per swarm address.
// It is used to report multiple failures at once, as in listen. A nil slot
// is treated as "no error" and is skipped when formatting.
type ListenErr struct {
	Errors []error
}

// Error implements the error interface. Each non-nil entry of Errors is
// rendered on its own line as "<index>: <message>"; a nil receiver yields
// "<nil error>".
func (e *ListenErr) Error() string {
	if e == nil {
		return "<nil error>"
	}

	var buf []byte
	for idx, sub := range e.Errors {
		if sub == nil {
			continue
		}
		buf = append(buf, fmt.Sprintf("%d: %s\n", idx, sub)...)
	}
	return string(buf)
}

// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
type Swarm struct {

	// local is the peer this swarm represents
52
	local peer.Peer
53

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54 55 56
	// peers is a collection of peers for swarm to use
	peers peer.Peerstore

57 58 59 60 61 62 63
	// Swarm includes a Pipe object.
	*msg.Pipe

	// errChan is the channel of errors.
	errChan chan error

	// conns are the open connections the swarm is handling.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64 65
	// these are MultiConns, which multiplex multiple separate underlying Conns.
	conns     conn.MultiConnMap
66 67 68
	connsLock sync.RWMutex

	// listeners for each network address
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69
	listeners []conn.Listener
70

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
	// ContextCloser
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72
	ctxc.ContextCloser
73 74 75
}

// NewSwarm constructs a Swarm, with a Chan.
76
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, local peer.Peer, ps peer.Peerstore) (*Swarm, error) {
77 78
	s := &Swarm{
		Pipe:    msg.NewPipe(10),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79
		conns:   conn.MultiConnMap{},
80
		local:   local,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81
		peers:   ps,
82 83 84
		errChan: make(chan error, 100),
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
85
	// ContextCloser for proper child management.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
	s.ContextCloser = ctxc.NewContextCloser(ctx, s.close)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88
	s.Children().Add(1)
89
	go s.fanOut()
90
	return s, s.listen(listenAddrs)
91 92
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93 94
// close stops a swarm. It's the underlying function called by ContextCloser
func (s *Swarm) close() error {
95 96 97 98
	// close listeners
	for _, list := range s.listeners {
		list.Close()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99 100
	// close connections
	conn.CloseConns(s.Connections()...)
101 102 103 104 105 106 107 108 109 110 111
	return nil
}

// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
//
// For now, Dial uses only TCP. This will be extended.
112 113
func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) {
	if peer.ID().Equal(s.local.ID()) {
114 115 116 117
		return nil, errors.New("Attempted connection to self!")
	}

	// check if we already have an open connection first
118
	c := s.GetConnection(peer.ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119 120 121 122 123
	if c != nil {
		return c, nil
	}

	// check if we don't have the peer in Peerstore
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
124
	peer, err := s.peers.Add(peer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125 126 127
	if err != nil {
		return nil, err
	}
128 129

	// open connection to peer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130 131 132 133 134
	d := &conn.Dialer{
		LocalPeer: s.local,
		Peerstore: s.peers,
	}

135 136 137 138 139 140 141 142 143
	// try to connect to one of the peer's known addresses.
	// for simplicity, we do this sequentially.
	// A future commit will do this asynchronously.
	for _, addr := range peer.Addresses() {
		c, err = d.DialAddr(s.Context(), addr, peer)
		if err == nil {
			break
		}
	}
144 145 146 147
	if err != nil {
		return nil, err
	}

148 149
	c, err = s.connSetup(c)
	if err != nil {
150 151 152 153 154 155 156
		c.Close()
		return nil, err
	}

	return c, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
// GetConnection returns the connection in the swarm to given peer.ID
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
func (s *Swarm) GetConnection(pid peer.ID) conn.Conn {
159
	s.connsLock.RLock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
	c, found := s.conns[u.Key(pid)]
161 162 163 164 165
	s.connsLock.RUnlock()

	if !found {
		return nil
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166
	return c
167 168
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169 170 171 172 173 174 175 176 177 178 179 180 181
// Connections returns a freshly allocated slice holding every connection
// currently tracked by the swarm. The snapshot is taken under the read lock
// on conns; the slice itself is owned by the caller.
func (s *Swarm) Connections() []conn.Conn {
	s.connsLock.RLock()
	defer s.connsLock.RUnlock()

	snapshot := make([]conn.Conn, 0, len(s.conns))
	for _, mc := range s.conns {
		snapshot = append(snapshot, mc)
	}
	return snapshot
}

182
// CloseConnection removes a given peer from swarm + closes the connection
183 184
func (s *Swarm) CloseConnection(p peer.Peer) error {
	c := s.GetConnection(p.ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185
	if c == nil {
186 187 188 189
		return u.ErrNotFound
	}

	s.connsLock.Lock()
190
	delete(s.conns, u.Key(p.ID()))
191 192
	s.connsLock.Unlock()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193
	return c.Close()
194 195 196 197 198 199 200 201 202 203 204
}

// Error sends e on the swarm's error channel (the one returned by
// GetErrChan). It is a plain channel send, so it blocks while the channel's
// buffer is full and nothing is receiving.
func (s *Swarm) Error(e error) {
	s.errChan <- e
}

// GetErrChan returns the swarm's error channel, the same channel that Error
// sends on. The channel itself is returned, not a copy or a receive-only view.
func (s *Swarm) GetErrChan() chan error {
	return s.errChan
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
205
// GetPeerList returns a copy of the set of peers swarm is connected to.
206 207
func (s *Swarm) GetPeerList() []peer.Peer {
	var out []peer.Peer
208
	s.connsLock.RLock()
Jeromy's avatar
Jeromy committed
209
	for _, p := range s.conns {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
210
		out = append(out, p.RemotePeer())
Jeromy's avatar
Jeromy committed
211
	}
212
	s.connsLock.RUnlock()
Jeromy's avatar
Jeromy committed
213 214
	return out
}