swarm.go 6.39 KB
Newer Older
1 2 3
package swarm

import (
4
	"fmt"
5 6 7
	"net"
	"sync"

8
	peer "github.com/jbenet/go-ipfs/peer"
Jeromy's avatar
Jeromy committed
9
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	ma "github.com/jbenet/go-multiaddr"
11
	ident "github.com/jbenet/go-ipfs/identify"
12
	proto "code.google.com/p/goprotobuf/proto"
13 14
)

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
15 16
// Message represents a packet of information sent to or received from a
// particular Peer.
17 18 19 20 21 22 23 24
type Message struct {
	// To or from, depending on direction.
	Peer *peer.Peer

	// Opaque data
	Data []byte
}

25
// Cleaner looking helper function to make a new message struct
26 27 28 29 30
func NewMessage(p *peer.Peer, data proto.Message) *Message {
	bytes,err := proto.Marshal(data)
	if err != nil {
		panic(err)
	}
31 32
	return &Message{
		Peer: p,
33
		Data: bytes,
34 35 36
	}
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
37
// Chan is a swam channel, which provides duplex communication and errors.
38
type Chan struct {
39 40
	Outgoing chan *Message
	Incoming chan *Message
41 42 43 44
	Errors   chan error
	Close    chan bool
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
45
// NewChan constructs a Chan instance, with given buffer size bufsize.
46 47
func NewChan(bufsize int) *Chan {
	return &Chan{
48 49
		Outgoing: make(chan *Message, bufsize),
		Incoming: make(chan *Message, bufsize),
50
		Errors:   make(chan error, bufsize),
51 52 53 54
		Close:    make(chan bool, bufsize),
	}
}

55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
// Contains a set of errors mapping to each of the swarms addresses
// that were listened on
type SwarmListenErr struct {
	Errors []error
}

func (se *SwarmListenErr) Error() string {
	if se == nil {
		return "<nil error>"
	}
	var out string
	for i,v := range se.Errors {
		if v != nil {
			out += fmt.Sprintf("%d: %s\n", i, v)
		}
	}
	return out
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
74 75 76 77
// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
78
type Swarm struct {
79 80 81
	Chan      *Chan
	conns     ConnMap
	connsLock sync.RWMutex
Jeromy's avatar
Jeromy committed
82

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83
	local *peer.Peer
Jeromy's avatar
Jeromy committed
84
	listeners []net.Listener
85 86
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
87
// NewSwarm constructs a Swarm, with a Chan.
Jeromy's avatar
Jeromy committed
88
func NewSwarm(local *peer.Peer) *Swarm {
89 90 91
	s := &Swarm{
		Chan:  NewChan(10),
		conns: ConnMap{},
Jeromy's avatar
Jeromy committed
92
		local: local,
93 94 95 96 97
	}
	go s.fanOut()
	return s
}

Jeromy's avatar
Jeromy committed
98
// Open listeners for each network the swarm should listen on
99 100 101
func (s *Swarm) Listen() error {
	var ret_err *SwarmListenErr
	for i, addr := range s.local.Addresses {
Jeromy's avatar
Jeromy committed
102 103
		err := s.connListen(addr)
		if err != nil {
104
			if ret_err == nil {
105 106 107 108
				ret_err = new(SwarmListenErr)
				ret_err.Errors = make([]error, len(s.local.Addresses))
			}
			ret_err.Errors[i] = err
Jeromy's avatar
Jeromy committed
109 110 111
			u.PErr("Failed to listen on: %s [%s]", addr, err)
		}
	}
112 113 114 115
	if ret_err == nil {
		return nil
	}
	return ret_err
Jeromy's avatar
Jeromy committed
116 117 118 119 120 121 122 123 124 125 126 127 128 129
}

// Listen for new connections on the given multiaddr
func (s *Swarm) connListen(maddr *ma.Multiaddr) error {
	netstr, addr, err := maddr.DialArgs()
	if err != nil {
		return err
	}

	list, err := net.Listen(netstr, addr)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
130 131 132
	// NOTE: this may require a lock around it later. currently, only run on setup
	s.listeners = append(s.listeners, list)

Jeromy's avatar
Jeromy committed
133 134 135
	// Accept and handle new connections on this listener until it errors
	go func() {
		for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136
			nconn, err := list.Accept()
Jeromy's avatar
Jeromy committed
137
			if err != nil {
138 139
				e := fmt.Errorf("Failed to accept connection: %s - %s [%s]",
					netstr, addr, err)
140
				go func() {s.Chan.Errors <- e}()
Jeromy's avatar
Jeromy committed
141 142 143 144 145 146 147 148 149 150 151
				return
			}
			go s.handleNewConn(nconn)
		}
	}()

	return nil
}

// Handle getting ID from this peer and adding it into the map
func (s *Swarm) handleNewConn(nconn net.Conn) {
152
	p := new(peer.Peer)
153 154 155

	conn := &Conn{
		Peer: p,
156
		Addr: nil,
157 158 159 160
		Conn: nconn,
	}
	newConnChans(conn)

161 162 163 164 165
	err := ident.Handshake(s.local, p, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
	if err != nil {
		panic(err)
	}

Jeromy's avatar
Jeromy committed
166 167 168 169 170 171 172 173
	// Get address to contact remote peer from
	addr := <-conn.Incoming.MsgChan
	maddr, err := ma.NewMultiaddr(string(addr))
	if err != nil {
		u.PErr("Got invalid address from peer.")
	}
	p.AddAddress(maddr)

174
	s.StartConn(conn)
Jeromy's avatar
Jeromy committed
175 176 177
}

// Close closes a swarm.
178 179 180 181 182 183 184 185 186 187
func (s *Swarm) Close() {
	s.connsLock.RLock()
	l := len(s.conns)
	s.connsLock.RUnlock()

	for i := 0; i < l; i++ {
		s.Chan.Close <- true // fan ins
	}
	s.Chan.Close <- true // fan out
	s.Chan.Close <- true // listener
Jeromy's avatar
Jeromy committed
188 189 190 191

	for _,list := range s.listeners {
		list.Close()
	}
192 193
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
194 195 196 197 198 199 200 201
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
//
// For now, Dial uses only TCP. This will be extended.
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) {
	k := peer.Key()

	// check if we already have an open connection first
	s.connsLock.RLock()
	conn, found := s.conns[k]
	s.connsLock.RUnlock()
	if found {
		return conn, nil
	}

	// open connection to peer
	conn, err := Dial("tcp", peer)
	if err != nil {
		return nil, err
	}

219
	s.StartConn(conn)
220 221 222
	return conn, nil
}

223 224 225 226 227
func (s *Swarm) StartConn(conn *Conn) {
	if conn == nil {
		panic("tried to start nil Conn!")
	}

228
	u.DOut("Starting connection: %s", conn.Peer.Key().Pretty())
229 230
	// add to conns
	s.connsLock.Lock()
231
	s.conns[conn.Peer.Key()] = conn
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
	s.connsLock.Unlock()

	// kick off reader goroutine
	go s.fanIn(conn)
}

// Handles the unwrapping + sending of messages to the right connection.
func (s *Swarm) fanOut() {
	for {
		select {
		case <-s.Chan.Close:
			return // told to close.
		case msg, ok := <-s.Chan.Outgoing:
			if !ok {
				return
			}
Jeromy's avatar
Jeromy committed
248
			//u.DOut("fanOut: outgoing message for: '%s'", msg.Peer.Key().Pretty())
249 250 251 252

			s.connsLock.RLock()
			conn, found := s.conns[msg.Peer.Key()]
			s.connsLock.RUnlock()
253

254
			if !found {
255 256
				e := fmt.Errorf("Sent msg to peer without open conn: %v",
					msg.Peer)
257
				s.Chan.Errors <- e
258
				continue
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
			}

			// queue it in the connection's buffer
			conn.Outgoing.MsgChan <- msg.Data
		}
	}
}

// Handles the receiving + wrapping of messages, per conn.
// Consider using reflect.Select with one goroutine instead of n.
func (s *Swarm) fanIn(conn *Conn) {
	for {
		select {
		case <-s.Chan.Close:
			// close Conn.
			conn.Close()
275
			goto out
276 277

		case <-conn.Closed:
278
			goto out
279 280 281

		case data, ok := <-conn.Incoming.MsgChan:
			if !ok {
282
				e := fmt.Errorf("Error retrieving from conn: %v", conn.Peer.Key().Pretty())
283
				s.Chan.Errors <- e
284
				goto out
285 286 287
			}

			// wrap it for consumers.
288
			msg := &Message{Peer: conn.Peer, Data: data}
289 290 291
			s.Chan.Incoming <- msg
		}
	}
292
out:
293 294 295 296 297

	s.connsLock.Lock()
	delete(s.conns, conn.Peer.Key())
	s.connsLock.Unlock()
}
298

299 300 301 302 303 304
func (s *Swarm) Find(key u.Key) *peer.Peer {
	conn, found := s.conns[key]
	if !found {
		return nil
	}
	return conn.Peer
305
}