swarm.go 5.14 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2
// Package swarm implements a connection muxer with a pair of channels
// to synchronize all network communication.
3 4 5 6 7
package swarm

import (
	"errors"
	"fmt"
Jeromy's avatar
Jeromy committed
8
	"strings"
9 10 11 12 13 14
	"sync"

	conn "github.com/jbenet/go-ipfs/net/conn"
	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
16
	"github.com/jbenet/go-ipfs/util/elog"
17 18

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
19
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
20 21
)

22
// log is the package-level logger, created under the name "swarm".
var log = eventlog.Logger("swarm")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23

24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
// ErrAlreadyOpen signals that a connection to a peer is already open.
// NOTE(review): nothing in the visible part of this file returns it; Dial
// hands back the existing connection instead. Confirm it is still used.
var ErrAlreadyOpen = errors.New("Error: Connection to this peer already open.")

// ListenErr contains a set of errors mapping to each of the swarms addresses.
// Used to return multiple errors, as in listen.
type ListenErr struct {
	// Errors is the collected error list. Nil entries are allowed and are
	// skipped when the message is rendered.
	Errors []error
}

// Error implements the error interface.
//
// A nil receiver renders as "<nil error>". Otherwise every non-nil entry of
// Errors contributes one line of the form "<index>: <message>\n", where index
// is the entry's position in Errors. If there are no non-nil entries the
// result is the empty string.
func (e *ListenErr) Error() string {
	if e == nil {
		return "<nil error>"
	}

	var buf []byte
	for idx, cause := range e.Errors {
		if cause == nil {
			continue
		}
		buf = append(buf, fmt.Sprintf("%d: %s\n", idx, cause)...)
	}
	return string(buf)
}

// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
type Swarm struct {

	// local is the peer this swarm represents
53
	local peer.Peer
54

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55 56 57
	// peers is a collection of peers for swarm to use
	peers peer.Peerstore

58 59 60 61 62 63 64
	// Swarm includes a Pipe object.
	*msg.Pipe

	// errChan is the channel of errors.
	errChan chan error

	// conns are the open connections the swarm is handling.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65 66
	// these are MultiConns, which multiplex multiple separate underlying Conns.
	conns     conn.MultiConnMap
67 68 69
	connsLock sync.RWMutex

	// listeners for each network address
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
	listeners []conn.Listener
71

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72
	// ContextCloser
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
	ctxc.ContextCloser
74 75 76
}

// NewSwarm constructs a Swarm, with a Chan.
77
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, local peer.Peer, ps peer.Peerstore) (*Swarm, error) {
78 79
	s := &Swarm{
		Pipe:    msg.NewPipe(10),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80
		conns:   conn.MultiConnMap{},
81
		local:   local,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82
		peers:   ps,
83 84 85
		errChan: make(chan error, 100),
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
	// ContextCloser for proper child management.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87
	s.ContextCloser = ctxc.NewContextCloser(ctx, s.close)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
89
	s.Children().Add(1)
90
	go s.fanOut()
91
	return s, s.listen(listenAddrs)
92 93
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
94 95
// close stops a swarm. It's the underlying function called by ContextCloser
func (s *Swarm) close() error {
96 97 98 99
	// close listeners
	for _, list := range s.listeners {
		list.Close()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
100 101
	// close connections
	conn.CloseConns(s.Connections()...)
102 103 104 105 106 107 108 109 110 111 112
	return nil
}

// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
//
// For now, Dial uses only TCP. This will be extended.
113 114
func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) {
	if peer.ID().Equal(s.local.ID()) {
115 116 117 118
		return nil, errors.New("Attempted connection to self!")
	}

	// check if we already have an open connection first
119
	c := s.GetConnection(peer.ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120 121 122 123 124
	if c != nil {
		return c, nil
	}

	// check if we don't have the peer in Peerstore
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125
	peer, err := s.peers.Add(peer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126 127 128
	if err != nil {
		return nil, err
	}
129 130

	// open connection to peer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131 132 133 134 135
	d := &conn.Dialer{
		LocalPeer: s.local,
		Peerstore: s.peers,
	}

Jeromy's avatar
Jeromy committed
136
	// If we are attempting to connect to the zero addr, fail out early
Jeromy's avatar
Jeromy committed
137 138 139 140 141 142 143 144 145
	raddr := peer.NetAddress("tcp")
	if raddr == nil {
		return nil, fmt.Errorf("No remote address for network tcp")
	}

	if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
		return nil, fmt.Errorf("Attempted to connect to loopback address: %s", raddr)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146
	c, err = d.Dial(s.Context(), "tcp", peer)
147 148 149 150
	if err != nil {
		return nil, err
	}

151 152
	c, err = s.connSetup(c)
	if err != nil {
153 154 155 156 157 158 159
		c.Close()
		return nil, err
	}

	return c, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
// GetConnection returns the connection in the swarm to given peer.ID
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161
func (s *Swarm) GetConnection(pid peer.ID) conn.Conn {
162
	s.connsLock.RLock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163
	c, found := s.conns[u.Key(pid)]
164 165 166 167 168
	s.connsLock.RUnlock()

	if !found {
		return nil
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169
	return c
170 171
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
172 173 174 175 176 177 178 179 180 181 182 183 184
// Connections returns a slice of all connections.
//
// The slice is freshly allocated while holding the read lock, so callers may
// keep or modify it without touching the swarm's own map. It is non-nil and
// empty when the swarm has no connections. Order follows map iteration and is
// therefore unspecified.
func (s *Swarm) Connections() []conn.Conn {
	s.connsLock.RLock()
	defer s.connsLock.RUnlock()

	all := make([]conn.Conn, 0, len(s.conns))
	for _, mc := range s.conns {
		all = append(all, mc)
	}
	return all
}

185
// CloseConnection removes a given peer from swarm + closes the connection
186 187
func (s *Swarm) CloseConnection(p peer.Peer) error {
	c := s.GetConnection(p.ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
188
	if c == nil {
189 190 191 192
		return u.ErrNotFound
	}

	s.connsLock.Lock()
193
	delete(s.conns, u.Key(p.ID()))
194 195
	s.connsLock.Unlock()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
196
	return c.Close()
197 198 199 200 201 202 203 204 205 206 207
}

// Error reports e by sending it on the swarm's error channel (the channel
// returned by GetErrChan). The send is a plain channel send, so it blocks
// whenever the channel cannot accept another value.
func (s *Swarm) Error(e error) {
	s.errChan <- e
}

// GetErrChan returns the errors chan.
// This is the same channel Error sends on; it is returned as-is, with both
// send and receive directions available to the caller.
func (s *Swarm) GetErrChan() chan error {
	return s.errChan
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208
// GetPeerList returns a copy of the set of peers swarm is connected to.
209 210
func (s *Swarm) GetPeerList() []peer.Peer {
	var out []peer.Peer
211
	s.connsLock.RLock()
Jeromy's avatar
Jeromy committed
212
	for _, p := range s.conns {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213
		out = append(out, p.RemotePeer())
Jeromy's avatar
Jeromy committed
214
	}
215
	s.connsLock.RUnlock()
Jeromy's avatar
Jeromy committed
216 217
	return out
}