// Package swarm implements Swarm, a connection muxer that opens and closes
// connections to other peers while sharing one message Pipe for all traffic.
package swarm

import (
	"errors"
	"fmt"
	"net"
	"sync"

	conn "github.com/jbenet/go-ipfs/net/conn"
	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)

// ErrAlreadyOpen signals that a connection to a peer is already open.
// NOTE(review): nothing in this part of the file returns it — Dial hands back
// the existing connection instead — so confirm where it is actually used.
var ErrAlreadyOpen = errors.New("Error: Connection to this peer already open.")

// ListenErr bundles several errors into one value, with one slot for each of
// the swarm's addresses. It exists so that an operation such as listen can
// report more than one failure at once.
type ListenErr struct {
	// Errors holds the individual errors; nil slots are skipped by Error.
	Errors []error
}

// Error implements the error interface. Every non-nil entry is rendered on
// its own line as "<index>: <message>". A nil receiver yields "<nil error>",
// and a ListenErr with no non-nil entries yields the empty string.
func (e *ListenErr) Error() string {
	if e == nil {
		return "<nil error>"
	}

	text := ""
	for idx := 0; idx < len(e.Errors); idx++ {
		cause := e.Errors[idx]
		if cause == nil {
			continue
		}
		text = text + fmt.Sprintf("%d: %s\n", idx, cause)
	}
	return text
}

// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
type Swarm struct {

	// local is the peer this swarm represents
	local *peer.Peer

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49 50 51
	// peers is a collection of peers for swarm to use
	peers peer.Peerstore

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
	// Swarm includes a Pipe object.
	*msg.Pipe

	// errChan is the channel of errors.
	errChan chan error

	// conns are the open connections the swarm is handling.
	conns     conn.Map
	connsLock sync.RWMutex

	// listeners for each network address
	listeners []net.Listener

	// cancel is an internal function used to stop the Swarm's processing.
	cancel context.CancelFunc
	ctx    context.Context
}

// NewSwarm constructs a Swarm, with a Chan.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
func NewSwarm(ctx context.Context, local *peer.Peer, ps peer.Peerstore) (*Swarm, error) {
72 73 74 75
	s := &Swarm{
		Pipe:    msg.NewPipe(10),
		conns:   conn.Map{},
		local:   local,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
76
		peers:   ps,
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
		errChan: make(chan error, 100),
	}

	s.ctx, s.cancel = context.WithCancel(ctx)
	go s.fanOut()
	return s, s.listen()
}

// Close stops a swarm: it cancels the swarm's context and closes every
// listener. Calling Close on a swarm whose cancel function has already been
// cleared returns an error instead.
func (s *Swarm) Close() error {
	stop := s.cancel
	if stop == nil {
		return errors.New("Swarm already closed.")
	}

	// Cancel the context, then clear the field so that a later Close is
	// rejected and so that listeners can tell the swarm is shutting down.
	stop()
	s.cancel = nil

	// Shut down the listeners; their Close errors are not reported.
	for i := range s.listeners {
		s.listeners[i].Close()
	}
	return nil
}

// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
//
// For now, Dial uses only TCP. This will be extended.
func (s *Swarm) Dial(peer *peer.Peer) (*conn.Conn, error) {
	if peer.ID.Equal(s.local.ID) {
		return nil, errors.New("Attempted connection to self!")
	}

	// check if we already have an open connection first
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118
	c := s.GetConnection(peer.ID)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119 120 121 122 123 124 125 126 127
	if c != nil {
		return c, nil
	}

	// check if we don't have the peer in Peerstore
	err := s.peers.Put(peer)
	if err != nil {
		return nil, err
	}
128 129

	// open connection to peer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130
	c, err = conn.Dial("tcp", peer)
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
	if err != nil {
		return nil, err
	}

	if err := s.connSetup(c); err != nil {
		c.Close()
		return nil, err
	}

	return c, nil
}

// DialAddr connects to a peer whose address is known but whose ID is not.
// It dials over TCP using a fresh Peer that carries only addr, then runs
// connSetup on the new connection, closing it if setup fails.
//
// It does not look for an existing connection, so it should only be used when
// sure that the swarm is not already connected to the peer in question.
// A nil addr is rejected with an error.
// TODO(jbenet) merge with Dial? need way to patch back.
func (s *Swarm) DialAddr(addr *ma.Multiaddr) (*conn.Conn, error) {
	if addr == nil {
		return nil, errors.New("addr must be a non-nil Multiaddr")
	}

	// Placeholder peer: no ID yet, just the address to dial.
	target := &peer.Peer{}
	target.AddAddress(addr)

	c, err := conn.Dial("tcp", target)
	if err != nil {
		return nil, err
	}

	if setupErr := s.connSetup(c); setupErr != nil {
		c.Close()
		return nil, setupErr
	}
	return c, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167 168
// GetConnection returns the connection in the swarm to given peer.ID
func (s *Swarm) GetConnection(pid peer.ID) *conn.Conn {
169
	s.connsLock.RLock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170
	c, found := s.conns[u.Key(pid)]
171 172 173 174 175
	s.connsLock.RUnlock()

	if !found {
		return nil
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
176
	return c
177 178 179 180
}

// CloseConnection removes a given peer from swarm + closes the connection
func (s *Swarm) CloseConnection(p *peer.Peer) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181 182
	c := s.GetConnection(p.ID)
	if c == nil {
183 184 185 186 187 188 189
		return u.ErrNotFound
	}

	s.connsLock.Lock()
	delete(s.conns, u.Key(p.ID))
	s.connsLock.Unlock()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
190
	return c.Close()
191 192 193 194 195 196 197 198 199 200 201 202 203
}

// Error reports e on the swarm's error channel. The send blocks while the
// channel's buffer is full, until a receiver makes room.
func (s *Swarm) Error(e error) {
	sink := s.errChan
	sink <- e
}

// GetErrChan exposes the channel that Error sends the swarm's errors on.
func (s *Swarm) GetErrChan() chan error {
	errs := s.errChan
	return errs
}

// Temporary to ensure that the Swarm always matches the Network interface as we are changing it
// var _ Network = &Swarm{}