swarm.go 5.35 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2
// Package swarm implements a connection muxer with a pair of channels
// to synchronize all network communication.
package swarm

import (
	"errors"
	"fmt"
	"sync"

	conn "github.com/jbenet/go-ipfs/net/conn"
	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
15
	"github.com/jbenet/go-ipfs/util/eventlog"
16 17

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
18
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
19 20
)

21
// log is the package-level event logger, registered under the name "swarm".
var log = eventlog.Logger("swarm")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22

23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
// ErrAlreadyOpen signals that a connection to a peer is already open.
// NOTE(review): nothing in the visible part of this file returns it; Dial
// hands back the existing connection instead. Confirm it is still used.
var ErrAlreadyOpen = errors.New("Error: Connection to this peer already open.")

// ListenErr bundles several errors into a single error value, so that an
// operation such as listen can report one failure per swarm address.
// Entries may be nil; Error skips those.
type ListenErr struct {
	Errors []error
}

// Error renders each non-nil entry of Errors on its own line, formatted as
// "<index>: <message>". A nil receiver yields "<nil error>"; a value with no
// non-nil entries yields the empty string.
func (e *ListenErr) Error() string {
	if e == nil {
		return "<nil error>"
	}

	text := ""
	for idx := range e.Errors {
		cur := e.Errors[idx]
		if cur == nil {
			continue
		}
		text += fmt.Sprintf("%d: %s\n", idx, cur)
	}
	return text
}

// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
type Swarm struct {

	// local is the peer this swarm represents
52
	local peer.Peer
53

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54 55 56
	// peers is a collection of peers for swarm to use
	peers peer.Peerstore

57 58 59 60 61 62 63
	// Swarm includes a Pipe object.
	*msg.Pipe

	// errChan is the channel of errors.
	errChan chan error

	// conns are the open connections the swarm is handling.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64 65
	// these are MultiConns, which multiplex multiple separate underlying Conns.
	conns     conn.MultiConnMap
66 67 68
	connsLock sync.RWMutex

	// listeners for each network address
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69
	listeners []conn.Listener
70

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
	// ContextCloser
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72
	ctxc.ContextCloser
73 74 75
}

// NewSwarm constructs a Swarm, with a Chan.
76
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, local peer.Peer, ps peer.Peerstore) (*Swarm, error) {
77 78
	s := &Swarm{
		Pipe:    msg.NewPipe(10),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79
		conns:   conn.MultiConnMap{},
80
		local:   local,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81
		peers:   ps,
82 83 84
		errChan: make(chan error, 100),
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
85
	// ContextCloser for proper child management.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
	s.ContextCloser = ctxc.NewContextCloser(ctx, s.close)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88
	s.Children().Add(1)
89
	go s.fanOut()
90
	return s, s.listen(listenAddrs)
91 92
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93 94
// close stops a swarm. It's the underlying function called by ContextCloser
func (s *Swarm) close() error {
95 96 97 98
	// close listeners
	for _, list := range s.listeners {
		list.Close()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99 100
	// close connections
	conn.CloseConns(s.Connections()...)
101 102 103 104 105 106 107 108 109 110 111
	return nil
}

// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
//
// For now, Dial uses only TCP. This will be extended.
112 113
func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) {
	if peer.ID().Equal(s.local.ID()) {
114 115 116 117
		return nil, errors.New("Attempted connection to self!")
	}

	// check if we already have an open connection first
118
	c := s.GetConnection(peer.ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119 120 121 122 123
	if c != nil {
		return c, nil
	}

	// check if we don't have the peer in Peerstore
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
124
	peer, err := s.peers.Add(peer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125 126 127
	if err != nil {
		return nil, err
	}
128 129

	// open connection to peer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130 131 132 133 134
	d := &conn.Dialer{
		LocalPeer: s.local,
		Peerstore: s.peers,
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
135 136 137
	if len(peer.Addresses()) == 0 {
		return nil, errors.New("peer has no addresses")
	}
138 139 140 141 142 143 144 145 146
	// try to connect to one of the peer's known addresses.
	// for simplicity, we do this sequentially.
	// A future commit will do this asynchronously.
	for _, addr := range peer.Addresses() {
		c, err = d.DialAddr(s.Context(), addr, peer)
		if err == nil {
			break
		}
	}
147 148 149 150
	if err != nil {
		return nil, err
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
151
	c2, err := s.connSetup(c)
152
	if err != nil {
153 154 155 156
		c.Close()
		return nil, err
	}

157 158
	// TODO replace the TODO ctx with a context passed in from caller
	log.Event(context.TODO(), "dial", peer)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
159
	return c2, nil
160 161
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162
// GetConnection returns the connection in the swarm to given peer.ID
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163
func (s *Swarm) GetConnection(pid peer.ID) conn.Conn {
164
	s.connsLock.RLock()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
165
	defer s.connsLock.RUnlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166
	c, found := s.conns[u.Key(pid)]
167 168 169 170

	if !found {
		return nil
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
171
	return c
172 173
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174 175 176 177 178 179 180 181 182 183 184 185 186
// Connections returns a newly allocated slice holding every connection
// currently tracked by the swarm. The read lock is held while copying.
func (s *Swarm) Connections() []conn.Conn {
	s.connsLock.RLock()
	defer s.connsLock.RUnlock()

	all := make([]conn.Conn, 0, len(s.conns))
	for key := range s.conns {
		all = append(all, s.conns[key])
	}
	return all
}

187
// CloseConnection removes a given peer from swarm + closes the connection
188 189
func (s *Swarm) CloseConnection(p peer.Peer) error {
	c := s.GetConnection(p.ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
190
	if c == nil {
191 192 193 194
		return u.ErrNotFound
	}

	s.connsLock.Lock()
195
	delete(s.conns, u.Key(p.ID()))
196 197
	s.connsLock.Unlock()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198
	return c.Close()
199 200 201 202 203 204 205 206 207 208 209
}

// Error reports e by sending it on the swarm's error channel. This is a
// plain channel send, so it blocks while the channel's buffer is full.
func (s *Swarm) Error(e error) {
	s.errChan <- e
}

// GetErrChan returns the errors chan. This is the swarm's own channel (the
// one Error sends on), not a copy, and it is returned bidirectional.
func (s *Swarm) GetErrChan() chan error {
	return s.errChan
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
210
// GetPeerList returns a copy of the set of peers swarm is connected to.
211 212
func (s *Swarm) GetPeerList() []peer.Peer {
	var out []peer.Peer
213
	s.connsLock.RLock()
Jeromy's avatar
Jeromy committed
214
	for _, p := range s.conns {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
215
		out = append(out, p.RemotePeer())
Jeromy's avatar
Jeromy committed
216
	}
217
	s.connsLock.RUnlock()
Jeromy's avatar
Jeromy committed
218 219
	return out
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220 221 222 223 224

// LocalPeer returns the local peer the swarm is associated with, i.e. the
// peer passed to NewSwarm as local.
func (s *Swarm) LocalPeer() peer.Peer {
	return s.local
}