swarm.go 5.1 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2
// package swarm implements a connection muxer with a pair of channels
// to synchronize all network communication.
3 4 5 6 7
package swarm

import (
	"errors"
	"fmt"
Jeromy's avatar
Jeromy committed
8
	"strings"
9 10 11 12 13 14
	"sync"

	conn "github.com/jbenet/go-ipfs/net/conn"
	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
16 17

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
18
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
19 20
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21 22
var log = u.Logger("swarm")

23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
// ErrAlreadyOpen signals that a connection to a peer is already open.
var ErrAlreadyOpen = errors.New("Error: Connection to this peer already open.")

// ListenErr contains a set of errors mapping to each of the swarms addresses.
// Used to return multiple errors, as in listen.
type ListenErr struct {
	Errors []error
}

func (e *ListenErr) Error() string {
	if e == nil {
		return "<nil error>"
	}
	var out string
	for i, v := range e.Errors {
		if v != nil {
			out += fmt.Sprintf("%d: %s\n", i, v)
		}
	}
	return out
}

// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
type Swarm struct {

	// local is the peer this swarm represents
52
	local peer.Peer
53

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54 55 56
	// peers is a collection of peers for swarm to use
	peers peer.Peerstore

57 58 59 60 61 62 63
	// Swarm includes a Pipe object.
	*msg.Pipe

	// errChan is the channel of errors.
	errChan chan error

	// conns are the open connections the swarm is handling.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64 65
	// these are MultiConns, which multiplex multiple separate underlying Conns.
	conns     conn.MultiConnMap
66 67 68
	connsLock sync.RWMutex

	// listeners for each network address
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69
	listeners []conn.Listener
70

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
	// ContextCloser
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72
	ctxc.ContextCloser
73 74 75
}

// NewSwarm constructs a Swarm, with a Chan.
76
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, local peer.Peer, ps peer.Peerstore) (*Swarm, error) {
77 78
	s := &Swarm{
		Pipe:    msg.NewPipe(10),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79
		conns:   conn.MultiConnMap{},
80
		local:   local,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81
		peers:   ps,
82 83 84
		errChan: make(chan error, 100),
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
85
	// ContextCloser for proper child management.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
	s.ContextCloser = ctxc.NewContextCloser(ctx, s.close)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88
	s.Children().Add(1)
89
	go s.fanOut()
90
	return s, s.listen(listenAddrs)
91 92
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93 94
// close stops a swarm. It's the underlying function called by ContextCloser
func (s *Swarm) close() error {
95 96 97 98
	// close listeners
	for _, list := range s.listeners {
		list.Close()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99 100
	// close connections
	conn.CloseConns(s.Connections()...)
101 102 103 104 105 106 107 108 109 110 111
	return nil
}

// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
//
// For now, Dial uses only TCP. This will be extended.
112 113
func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) {
	if peer.ID().Equal(s.local.ID()) {
114 115 116 117
		return nil, errors.New("Attempted connection to self!")
	}

	// check if we already have an open connection first
118
	c := s.GetConnection(peer.ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119 120 121 122 123
	if c != nil {
		return c, nil
	}

	// check if we don't have the peer in Peerstore
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
124
	peer, err := s.peers.Add(peer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125 126 127
	if err != nil {
		return nil, err
	}
128 129

	// open connection to peer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130 131 132 133 134
	d := &conn.Dialer{
		LocalPeer: s.local,
		Peerstore: s.peers,
	}

Jeromy's avatar
Jeromy committed
135
	// If we are attempting to connect to the zero addr, fail out early
Jeromy's avatar
Jeromy committed
136 137 138 139 140 141 142 143 144
	raddr := peer.NetAddress("tcp")
	if raddr == nil {
		return nil, fmt.Errorf("No remote address for network tcp")
	}

	if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
		return nil, fmt.Errorf("Attempted to connect to loopback address: %s", raddr)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
145
	c, err = d.Dial(s.Context(), "tcp", peer)
146 147 148 149
	if err != nil {
		return nil, err
	}

150 151
	c, err = s.connSetup(c)
	if err != nil {
152 153 154 155 156 157 158
		c.Close()
		return nil, err
	}

	return c, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
// GetConnection returns the connection in the swarm to given peer.ID
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
func (s *Swarm) GetConnection(pid peer.ID) conn.Conn {
161
	s.connsLock.RLock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162
	c, found := s.conns[u.Key(pid)]
163 164 165 166 167
	s.connsLock.RUnlock()

	if !found {
		return nil
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
168
	return c
169 170
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
171 172 173 174 175 176 177 178 179 180 181 182 183
// Connections returns a slice of all connections.
func (s *Swarm) Connections() []conn.Conn {
	s.connsLock.RLock()

	conns := make([]conn.Conn, 0, len(s.conns))
	for _, c := range s.conns {
		conns = append(conns, c)
	}

	s.connsLock.RUnlock()
	return conns
}

184
// CloseConnection removes a given peer from swarm + closes the connection
185 186
func (s *Swarm) CloseConnection(p peer.Peer) error {
	c := s.GetConnection(p.ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187
	if c == nil {
188 189 190 191
		return u.ErrNotFound
	}

	s.connsLock.Lock()
192
	delete(s.conns, u.Key(p.ID()))
193 194
	s.connsLock.Unlock()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
195
	return c.Close()
196 197 198 199 200 201 202 203 204 205 206
}

func (s *Swarm) Error(e error) {
	s.errChan <- e
}

// GetErrChan returns the errors chan.
func (s *Swarm) GetErrChan() chan error {
	return s.errChan
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207
// GetPeerList returns a copy of the set of peers swarm is connected to.
208 209
func (s *Swarm) GetPeerList() []peer.Peer {
	var out []peer.Peer
210
	s.connsLock.RLock()
Jeromy's avatar
Jeromy committed
211
	for _, p := range s.conns {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212
		out = append(out, p.RemotePeer())
Jeromy's avatar
Jeromy committed
213
	}
214
	s.connsLock.RUnlock()
Jeromy's avatar
Jeromy committed
215 216
	return out
}