swarm.go 4.41 KB
Newer Older
1 2 3
package swarm

import (
4 5
	"fmt"
	peer "github.com/jbenet/go-ipfs/peer"
Jeromy's avatar
Jeromy committed
6
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	ma "github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
8
	"net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	"sync"
10 11
)

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
12 13
// Message represents a packet of information sent to or received from a
// particular Peer.
14 15 16 17 18 19 20 21
type Message struct {
	// To or from, depending on direction.
	Peer *peer.Peer

	// Opaque data
	Data []byte
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
22
// Chan is a swam channel, which provides duplex communication and errors.
23 24 25 26 27 28 29
type Chan struct {
	Outgoing chan Message
	Incoming chan Message
	Errors   chan error
	Close    chan bool
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
30
// NewChan constructs a Chan instance, with given buffer size bufsize.
31 32 33 34 35 36 37 38 39
func NewChan(bufsize int) *Chan {
	return &Chan{
		Outgoing: make(chan Message, bufsize),
		Incoming: make(chan Message, bufsize),
		Errors:   make(chan error),
		Close:    make(chan bool, bufsize),
	}
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
40 41 42 43
// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
44
type Swarm struct {
45 46 47
	Chan      *Chan
	conns     ConnMap
	connsLock sync.RWMutex
Jeromy's avatar
Jeromy committed
48

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49
	local *peer.Peer
50 51
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
52
// NewSwarm constructs a Swarm, with a Chan.
Jeromy's avatar
Jeromy committed
53
func NewSwarm(local *peer.Peer) *Swarm {
54 55 56
	s := &Swarm{
		Chan:  NewChan(10),
		conns: ConnMap{},
Jeromy's avatar
Jeromy committed
57
		local: local,
58 59 60 61 62
	}
	go s.fanOut()
	return s
}

Jeromy's avatar
Jeromy committed
63 64
// Open listeners for each network the swarm should listen on
func (s *Swarm) Listen() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65
	for _, addr := range s.local.Addresses {
Jeromy's avatar
Jeromy committed
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
		err := s.connListen(addr)
		if err != nil {
			u.PErr("Failed to listen on: %s [%s]", addr, err)
		}
	}
}

// Listen for new connections on the given multiaddr
func (s *Swarm) connListen(maddr *ma.Multiaddr) error {
	netstr, addr, err := maddr.DialArgs()
	if err != nil {
		return err
	}

	list, err := net.Listen(netstr, addr)
	if err != nil {
		return err
	}

	// Accept and handle new connections on this listener until it errors
	go func() {
		for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88
			nconn, err := list.Accept()
Jeromy's avatar
Jeromy committed
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
			if err != nil {
				u.PErr("Failed to accept connection: %s - %s", netstr, addr)
				return
			}
			go s.handleNewConn(nconn)
		}
	}()

	return nil
}

// Handle getting ID from this peer and adding it into the map
func (s *Swarm) handleNewConn(nconn net.Conn) {
	panic("Not yet implemented!")
}

// Close closes a swarm.
106 107 108 109 110 111 112 113 114 115 116 117
func (s *Swarm) Close() {
	s.connsLock.RLock()
	l := len(s.conns)
	s.connsLock.RUnlock()

	for i := 0; i < l; i++ {
		s.Chan.Close <- true // fan ins
	}
	s.Chan.Close <- true // fan out
	s.Chan.Close <- true // listener
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
118 119 120 121 122 123 124 125
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
//
// For now, Dial uses only TCP. This will be extended.
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) {
	k := peer.Key()

	// check if we already have an open connection first
	s.connsLock.RLock()
	conn, found := s.conns[k]
	s.connsLock.RUnlock()
	if found {
		return conn, nil
	}

	// open connection to peer
	conn, err := Dial("tcp", peer)
	if err != nil {
		return nil, err
	}

	// add to conns
	s.connsLock.Lock()
	s.conns[k] = conn
	s.connsLock.Unlock()

	// kick off reader goroutine
	go s.fanIn(conn)
	return conn, nil
}

// Handles the unwrapping + sending of messages to the right connection.
func (s *Swarm) fanOut() {
	for {
		select {
		case <-s.Chan.Close:
			return // told to close.
		case msg, ok := <-s.Chan.Outgoing:
			if !ok {
				return
			}

			s.connsLock.RLock()
			conn, found := s.conns[msg.Peer.Key()]
			s.connsLock.RUnlock()
			if !found {
				e := fmt.Errorf("Sent msg to peer without open conn: %v", msg.Peer)
				s.Chan.Errors <- e
			}

			// queue it in the connection's buffer
			conn.Outgoing.MsgChan <- msg.Data
		}
	}
}

// Handles the receiving + wrapping of messages, per conn.
// Consider using reflect.Select with one goroutine instead of n.
func (s *Swarm) fanIn(conn *Conn) {
Loop:
	for {
		select {
		case <-s.Chan.Close:
			// close Conn.
			conn.Close()
			break Loop

		case <-conn.Closed:
			break Loop

		case data, ok := <-conn.Incoming.MsgChan:
			if !ok {
				e := fmt.Errorf("Error retrieving from conn: %v", conn)
				s.Chan.Errors <- e
				break Loop
			}

			// wrap it for consumers.
			msg := Message{Peer: conn.Peer, Data: data}
			s.Chan.Incoming <- msg
		}
	}

	s.connsLock.Lock()
	delete(s.conns, conn.Peer.Key())
	s.connsLock.Unlock()
}