net.go 7.15 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
// Package net provides an interface for ipfs to interact with the network
// through a swarm of peer connections (see the net/swarm package).
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package net

import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6
	"fmt"

7
	ic "github.com/jbenet/go-ipfs/crypto"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8
	swarm "github.com/jbenet/go-ipfs/net/swarm"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9 10
	peer "github.com/jbenet/go-ipfs/peer"

11
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
12
	ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
13
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14 15
)

16
// stream is a package-local named type over swarm.Stream. Methods in this
// file are attached to *stream, and values are produced by converting a
// *swarm.Stream pointer.
type stream swarm.Stream
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17

18 19 20
// SwarmStream converts s back to the *swarm.Stream it is defined over.
func (s *stream) SwarmStream() *swarm.Stream {
	inner := (*swarm.Stream)(s)
	return inner
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21

22 23 24 25 26
// Conn returns the connection this stream belongs to, converted from the
// underlying *swarm.Conn to this package's *conn_ wrapper.
func (s *stream) Conn() Conn {
	return (*conn_)(s.SwarmStream().Conn())
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27

28 29 30 31
// Close closes the underlying swarm stream and returns its error, if any.
func (s *stream) Close() error {
	inner := s.SwarmStream()
	return inner.Close()
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32

33 34 35
// Read reads bytes from a stream.
func (s *stream) Read(p []byte) (n int, err error) {
	return s.SwarmStream().Read(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36 37
}

38 39 40 41
// Write writes p to the stream by delegating to the underlying swarm stream,
// returning the byte count and error it reports.
// NOTE(review): the previous comment said each call flushes; that depends on
// swarm.Stream.Write and is not visible here — confirm.
func (s *stream) Write(p []byte) (n int, err error) {
	inner := s.SwarmStream()
	return inner.Write(p)
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42

43
// conn_ is a package-local named type over swarm.Conn. Methods in this file
// are attached to *conn_, and values are produced by converting a
// *swarm.Conn pointer.
type conn_ swarm.Conn
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44

45 46 47 48
// String returns the string form of the underlying swarm connection.
func (c *conn_) String() string {
	return c.SwarmConn().String()
}

49 50 51 52
// SwarmConn converts c back to the *swarm.Conn it is defined over.
func (c *conn_) SwarmConn() *swarm.Conn {
	inner := (*swarm.Conn)(c)
	return inner
}

53
func (c *conn_) NewStreamWithProtocol(pr ProtocolID) (Stream, error) {
54
	s, err := (*swarm.Conn)(c).NewStream()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55 56 57
	if err != nil {
		return nil, err
	}
58 59 60

	ss := (*stream)(s)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61
	if err := WriteProtocolHeader(pr, ss); err != nil {
62 63 64 65 66
		ss.Close()
		return nil, err
	}

	return ss, nil
67
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68

69 70 71
// LocalMultiaddr returns the local multiaddr reported by the underlying
// swarm connection.
func (c *conn_) LocalMultiaddr() ma.Multiaddr {
	sc := c.SwarmConn()
	return sc.LocalMultiaddr()
}
72

73 74
func (c *conn_) RemoteMultiaddr() ma.Multiaddr {
	return c.SwarmConn().RemoteMultiaddr()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
75 76
}

77 78 79 80 81
// LocalPeer returns the local peer ID reported by the underlying swarm
// connection.
func (c *conn_) LocalPeer() peer.ID {
	sc := c.SwarmConn()
	return sc.LocalPeer()
}

func (c *conn_) RemotePeer() peer.ID {
82 83 84
	return c.SwarmConn().RemotePeer()
}

85 86 87 88 89 90 91 92
// LocalPrivateKey returns the local private key reported by the underlying
// swarm connection.
func (c *conn_) LocalPrivateKey() ic.PrivKey {
	sc := c.SwarmConn()
	return sc.LocalPrivateKey()
}

// RemotePublicKey returns the remote public key reported by the underlying
// swarm connection.
func (c *conn_) RemotePublicKey() ic.PubKey {
	sc := c.SwarmConn()
	return sc.RemotePublicKey()
}

93 94
// network implements the Network interface,
type network struct {
95
	local peer.ID      // local peer
96 97
	mux   Mux          // protocol multiplexing
	swarm *swarm.Swarm // peer connection multiplexing
98 99
	ps    peer.Peerstore
	ids   *IDService
100 101 102 103

	cg ctxgroup.ContextGroup // for Context closing
}

104
// NewNetwork constructs a new network and starts listening on given addresses.
105
func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.ID,
106
	peers peer.Peerstore) (Network, error) {
107 108 109 110 111 112 113 114 115

	s, err := swarm.NewSwarm(ctx, listen, local, peers)
	if err != nil {
		return nil, err
	}

	n := &network{
		local: local,
		swarm: s,
116
		mux:   Mux{Handlers: StreamHandlerMap{}},
117
		cg:    ctxgroup.WithContext(ctx),
118
		ps:    peers,
119 120
	}

121 122 123
	n.cg.SetTeardown(n.close)
	n.cg.AddChildGroup(s.CtxGroup())

124
	s.SetStreamHandler(func(s *swarm.Stream) {
125
		n.mux.Handle((*stream)(s))
126 127
	})

128 129 130 131 132
	// setup a conn handler that immediately "asks the other side about them"
	// this is ProtocolIdentify.
	n.ids = NewIDService(n)
	s.SetConnHandler(n.newConnHandler)

133 134
	return n, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135

136 137
func (n *network) newConnHandler(c *swarm.Conn) {
	cc := (*conn_)(c)
138
	n.ids.IdentifyConn(cc)
139 140
}

141 142
// DialPeer attempts to establish a connection to a given peer.
// Respects the context.
143
func (n *network) DialPeer(ctx context.Context, p peer.ID) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144
	log.Debugf("[%s] network dialing peer [%s]", n.local, p)
145 146 147 148 149 150
	sc, err := n.swarm.Dial(ctx, p)
	if err != nil {
		return err
	}

	// identify the connection before returning.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151 152 153 154 155 156 157 158 159 160 161 162 163
	done := make(chan struct{})
	go func() {
		n.ids.IdentifyConn((*conn_)(sc))
		close(done)
	}()

	// respect don contexteone
	select {
	case <-done:
	case <-ctx.Done():
		return ctx.Err()
	}

164 165
	log.Debugf("network for %s finished dialing %s", n.local, p)
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166 167
}

168 169 170 171
// Protocols returns the protocol IDs reported by the network's mux.
func (n *network) Protocols() []ProtocolID {
	ids := n.mux.Protocols()
	return ids
}

172 173 174 175 176 177 178 179 180 181
// CtxGroup exposes the ContextGroup held by this network.
func (n *network) CtxGroup() ctxgroup.ContextGroup {
	group := n.cg
	return group
}

// Swarm returns the *swarm.Swarm this network is built on.
func (n *network) Swarm() *swarm.Swarm {
	// return the field; calling n.Swarm() here would recurse without end
	// and overflow the stack.
	return n.swarm
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
182
// LocalPeer the network's LocalPeer
183
func (n *network) LocalPeer() peer.ID {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
184 185 186
	return n.swarm.LocalPeer()
}

187
// Peers returns the connected peers
188
func (n *network) Peers() []peer.ID {
189 190 191
	return n.swarm.Peers()
}

192 193 194 195 196
// Peerstore returns the peer.Peerstore this network was constructed with.
func (n *network) Peerstore() peer.Peerstore {
	store := n.ps
	return store
}

197 198 199 200 201 202 203 204 205 206
// Conns returns every connection the swarm currently reports, each wrapped
// as this package's Conn. The result is a fresh, non-nil slice.
func (n *network) Conns() []Conn {
	swarmConns := n.swarm.Connections()
	wrapped := make([]Conn, 0, len(swarmConns))
	for _, sc := range swarmConns {
		wrapped = append(wrapped, (*conn_)(sc))
	}
	return wrapped
}

207 208 209 210 211 212 213 214 215 216
// ConnsToPeer returns the connections in this Network for the given peer,
// each wrapped as this package's Conn. The result is a fresh, non-nil slice.
func (n *network) ConnsToPeer(p peer.ID) []Conn {
	swarmConns := n.swarm.ConnectionsToPeer(p)
	wrapped := make([]Conn, 0, len(swarmConns))
	for _, sc := range swarmConns {
		wrapped = append(wrapped, (*conn_)(sc))
	}
	return wrapped
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
217
// ClosePeer connection to peer
218
func (n *network) ClosePeer(p peer.ID) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219 220 221
	return n.swarm.CloseConnection(p)
}

222 223 224 225 226 227 228 229 230 231
// close is the actual teardown: it closes the swarm. NewNetwork registers
// it as the teardown function of the network's ContextGroup.
func (n *network) close() error {
	err := n.swarm.Close()
	return err
}

// Close closes the network's ContextGroup and returns its error.
// NOTE(review): presumably this triggers the registered teardown (close) —
// confirm against ctxgroup.
func (n *network) Close() error {
	err := n.cg.Close()
	return err
}

232 233 234 235 236
// BandwidthTotals returns the total amount of bandwidth transferred
func (n *network) BandwidthTotals() (in uint64, out uint64) {
	// need to implement this. probably best to do it in swarm this time.
	// need a "metrics" object
	return 0, 0
Jeromy's avatar
Jeromy committed
237 238
}

239
// ListenAddresses returns a list of addresses at which this network listens.
240
func (n *network) ListenAddresses() []ma.Multiaddr {
241 242 243 244 245 246
	return n.swarm.ListenAddresses()
}

// InterfaceListenAddresses returns a list of addresses at which this network
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
247 248
func (n *network) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
	return swarm.InterfaceListenAddresses(n.swarm)
249
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
250 251

// Connectedness returns a state signaling connection capabilities
Brian Tiger Chow's avatar
Brian Tiger Chow committed
252
// For now only returns Connected || NotConnected. Expand into more later.
253
func (n *network) Connectedness(p peer.ID) Connectedness {
254
	c := n.swarm.ConnectionsToPeer(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
255
	if c != nil && len(c) > 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
256 257 258 259
		return Connected
	}
	return NotConnected
}
260

261 262 263
// NewStream returns a new stream to given peer p.
// If there is no connection to p, attempts to create one.
// If ProtocolID is "", writes no header.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
264 265 266
func (n *network) NewStream(pr ProtocolID, p peer.ID) (Stream, error) {
	log.Debugf("[%s] network opening stream to peer [%s]: %s", n.local, p, pr)
	s, err := n.swarm.NewStreamWithPeer(p)
267 268 269 270 271 272
	if err != nil {
		return nil, err
	}

	ss := (*stream)(s)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
273
	if err := WriteProtocolHeader(pr, ss); err != nil {
274 275 276 277 278 279 280
		ss.Close()
		return nil, err
	}

	return ss, nil
}

281 282 283 284 285
// SetHandler registers h as the stream handler for protocol p on the
// Network's Muxer.
// NOTE(review): documented as threadsafe; that guarantee would come from
// Mux.SetHandler, which is not visible here — confirm.
func (n *network) SetHandler(p ProtocolID, h StreamHandler) {
	mux := &n.mux
	mux.SetHandler(p, h)
}
286

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
287 288 289 290
// String renders the network as "<Network ID>", where ID is the local peer
// formatted with %s.
func (n *network) String() string {
	id := n.LocalPeer()
	return fmt.Sprintf("<Network %s>", id)
}

291 292 293 294
// IdentifyProtocol returns the IDService that NewNetwork created for this
// network.
func (n *network) IdentifyProtocol() *IDService {
	svc := n.ids
	return svc
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
295
func WriteProtocolHeader(pr ProtocolID, s Stream) error {
296 297 298 299 300 301 302
	if pr != "" { // only write proper protocol headers
		if err := WriteLengthPrefix(s, string(pr)); err != nil {
			return err
		}
	}
	return nil
}