// Package swarm implements a connection muxer with a pair of channels
// to synchronize all network communication.
package swarm

import (
	"errors"
	"fmt"
	"sync"

	conn "github.com/jbenet/go-ipfs/net/conn"
	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
	"github.com/jbenet/go-ipfs/util/eventlog"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)

21
var log = eventlog.Logger("swarm")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22

23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
// ErrAlreadyOpen signals that a connection to a peer is already open.
var ErrAlreadyOpen = errors.New("Error: Connection to this peer already open.")

// ListenErr aggregates a set of errors, one slot per swarm address.
// It lets a single error value carry multiple failures, as in listen.
type ListenErr struct {
	Errors []error
}

// Error implements the error interface. Every non-nil entry of Errors is
// rendered on its own line as "<index>: <message>"; nil entries are skipped.
// Calling Error on a nil *ListenErr yields "<nil error>".
func (e *ListenErr) Error() string {
	if e == nil {
		return "<nil error>"
	}

	var buf []byte
	for idx, sub := range e.Errors {
		if sub == nil {
			continue
		}
		buf = append(buf, fmt.Sprintf("%d: %s\n", idx, sub)...)
	}
	return string(buf)
}

// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
type Swarm struct {

	// local is the peer this swarm represents
52
	local peer.Peer
53

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54 55 56
	// peers is a collection of peers for swarm to use
	peers peer.Peerstore

57 58 59 60 61 62 63
	// Swarm includes a Pipe object.
	*msg.Pipe

	// errChan is the channel of errors.
	errChan chan error

	// conns are the open connections the swarm is handling.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64 65
	// these are MultiConns, which multiplex multiple separate underlying Conns.
	conns     conn.MultiConnMap
66 67 68
	connsLock sync.RWMutex

	// listeners for each network address
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69
	listeners []conn.Listener
70

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
	// ContextCloser
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72
	ctxc.ContextCloser
73 74 75
}

// NewSwarm constructs a Swarm, with a Chan.
76
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, local peer.Peer, ps peer.Peerstore) (*Swarm, error) {
77 78
	s := &Swarm{
		Pipe:    msg.NewPipe(10),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79
		conns:   conn.MultiConnMap{},
80
		local:   local,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81
		peers:   ps,
82 83 84
		errChan: make(chan error, 100),
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
85
	// ContextCloser for proper child management.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
	s.ContextCloser = ctxc.NewContextCloser(ctx, s.close)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88
	s.Children().Add(1)
89
	go s.fanOut()
90
	return s, s.listen(listenAddrs)
91 92
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93 94
// close stops a swarm. It's the underlying function called by ContextCloser
func (s *Swarm) close() error {
95 96 97 98
	// close listeners
	for _, list := range s.listeners {
		list.Close()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99 100
	// close connections
	conn.CloseConns(s.Connections()...)
101 102 103 104 105 106 107 108 109 110 111
	return nil
}

// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
//
// For now, Dial uses only TCP. This will be extended.
112 113
func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) {
	if peer.ID().Equal(s.local.ID()) {
114 115 116 117
		return nil, errors.New("Attempted connection to self!")
	}

	// check if we already have an open connection first
118
	c := s.GetConnection(peer.ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119 120 121 122 123
	if c != nil {
		return c, nil
	}

	// check if we don't have the peer in Peerstore
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
124
	peer, err := s.peers.Add(peer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125 126 127
	if err != nil {
		return nil, err
	}
128 129

	// open connection to peer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130 131 132 133 134
	d := &conn.Dialer{
		LocalPeer: s.local,
		Peerstore: s.peers,
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135
	c, err = d.Dial(s.Context(), "tcp", peer)
136 137 138 139
	if err != nil {
		return nil, err
	}

140 141
	c, err = s.connSetup(c)
	if err != nil {
142 143 144 145 146 147 148
		c.Close()
		return nil, err
	}

	return c, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
149
// GetConnection returns the connection in the swarm to given peer.ID
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150
func (s *Swarm) GetConnection(pid peer.ID) conn.Conn {
151
	s.connsLock.RLock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152
	c, found := s.conns[u.Key(pid)]
153 154 155 156 157
	s.connsLock.RUnlock()

	if !found {
		return nil
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
	return c
159 160
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161 162 163 164 165 166 167 168 169 170 171 172 173
// Connections returns a slice of all connections.
func (s *Swarm) Connections() []conn.Conn {
	s.connsLock.RLock()

	conns := make([]conn.Conn, 0, len(s.conns))
	for _, c := range s.conns {
		conns = append(conns, c)
	}

	s.connsLock.RUnlock()
	return conns
}

174
// CloseConnection removes a given peer from swarm + closes the connection
175 176
func (s *Swarm) CloseConnection(p peer.Peer) error {
	c := s.GetConnection(p.ID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
177
	if c == nil {
178 179 180 181
		return u.ErrNotFound
	}

	s.connsLock.Lock()
182
	delete(s.conns, u.Key(p.ID()))
183 184
	s.connsLock.Unlock()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185
	return c.Close()
186 187 188 189 190 191 192 193 194 195 196
}

// Error sends e on the swarm's error channel. This is a plain channel send,
// so it blocks while the channel cannot accept another value.
func (s *Swarm) Error(e error) {
	s.errChan <- e
}

// GetErrChan returns the errors chan, i.e. the same channel that Error
// sends on.
func (s *Swarm) GetErrChan() chan error {
	return s.errChan
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
197
// GetPeerList returns a copy of the set of peers swarm is connected to.
198 199
func (s *Swarm) GetPeerList() []peer.Peer {
	var out []peer.Peer
200
	s.connsLock.RLock()
Jeromy's avatar
Jeromy committed
201
	for _, p := range s.conns {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202
		out = append(out, p.RemotePeer())
Jeromy's avatar
Jeromy committed
203
	}
204
	s.connsLock.RUnlock()
Jeromy's avatar
Jeromy committed
205 206
	return out
}