dht.go 12.1 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"crypto/rand"
5
	"errors"
6
	"fmt"
7 8
	"sync"
	"time"
9

10
	inet "github.com/jbenet/go-ipfs/net"
11
	msg "github.com/jbenet/go-ipfs/net/message"
12
	peer "github.com/jbenet/go-ipfs/peer"
Jeromy's avatar
Jeromy committed
13
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
14
	u "github.com/jbenet/go-ipfs/util"
15

16
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
17
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
18
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
19

20
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
21 22
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24 25 26 27
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
28 29
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
30
	routingTables []*kb.RoutingTable
31

32
	// the network interface. service
33
	network inet.Network
34
	sender  inet.Sender
35

Jeromy's avatar
Jeromy committed
36 37 38 39 40
	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore
41
	dslock    sync.Mutex
42

43
	providers *ProviderManager
44

45 46
	// Signal to shutdown dht
	shutdown chan struct{}
47 48 49

	// When this peer started up
	birth time.Time
50 51 52

	//lock to make diagnostics work better
	diaglock sync.Mutex
53 54
}

Jeromy's avatar
Jeromy committed
55
// NewDHT creates a new DHT object with the given peer as the 'local' host
56
func NewDHT(p *peer.Peer, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
57
	dht := new(IpfsDHT)
Jeromy's avatar
Jeromy committed
58
	dht.network = net
59
	dht.sender = sender
Jeromy's avatar
Jeromy committed
60
	dht.datastore = dstore
61
	dht.self = p
62

63
	dht.providers = NewProviderManager(p.ID)
Jeromy's avatar
Jeromy committed
64
	dht.shutdown = make(chan struct{})
65

66 67 68 69
	dht.routingTables = make([]*kb.RoutingTable, 3)
	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
	dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
	dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
70
	dht.birth = time.Now()
Jeromy's avatar
Jeromy committed
71
	return dht
72 73
}

74
// Start up background goroutines needed by the DHT
75
func (dht *IpfsDHT) Start() {
76
	panic("the service is already started. rmv this method")
77 78
}

79
// Connect to a new peer at the given address, ping and add to the routing table
80
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
81
	maddrstr, _ := addr.String()
82
	u.DOut("Connect to new peer: %s\n", maddrstr)
83 84 85 86 87 88 89 90 91 92 93 94 95

	// TODO(jbenet,whyrusleeping)
	//
	// Connect should take in a Peer (with ID). In a sense, we shouldn't be
	// allowing connections to random multiaddrs without knowing who we're
	// speaking to (i.e. peer.ID). In terms of moving around simple addresses
	// -- instead of an (ID, Addr) pair -- we can use:
	//
	//   /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
	//
	npeer := &peer.Peer{}
	npeer.AddAddress(addr)
	err := dht.network.DialPeer(npeer)
96
	if err != nil {
97
		return nil, err
98 99
	}

Jeromy's avatar
Jeromy committed
100 101
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
102
	err = dht.Ping(npeer, time.Second*2)
Jeromy's avatar
Jeromy committed
103
	if err != nil {
104
		return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
Jeromy's avatar
Jeromy committed
105 106
	}

107 108
	dht.Update(npeer)

109
	return npeer, nil
Jeromy's avatar
Jeromy committed
110 111
}

112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
// HandleMessage implements the inet.Handler interface.
//
// It decodes the payload of mes as a DHT Message, records the sending peer
// via Update, dispatches to the handler registered for the message type and
// returns the handler's reply wrapped as a network message addressed to the
// sender. An error is returned when the message carries no data or no peer,
// cannot be decoded, has no handler for its type, when the handler fails, or
// when the reply cannot be encoded.
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.NetMessage, error) {

	mData := mes.Data()
	if mData == nil {
		return nil, errors.New("message did not include Data")
	}

	mPeer := mes.Peer()
	if mPeer == nil {
		return nil, errors.New("message did not include a Peer")
	}

	// deserialize msg
	pmes := new(Message)
	if err := proto.Unmarshal(mData, pmes); err != nil {
		return nil, fmt.Errorf("failed to decode protobuf message: %v", err)
	}

	// update the peer (on valid msgs only)
	dht.Update(mPeer)

	// Print out diagnostic
	u.DOut("[peer: %s]\nGot message type: '%s' [from = %s]\n",
		dht.self.ID.Pretty(),
		Message_MessageType_name[int32(pmes.GetType())], mPeer.ID.Pretty())

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
		return nil, errors.New("received invalid message type")
	}

	// dispatch handler.
	rpmes, err := handler(mPeer, pmes)
	if err != nil {
		return nil, err
	}

	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
		return nil, fmt.Errorf("failed to encode protobuf message: %v", err)
	}

	return rmes, nil
}

162 163 164
func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) {
	pmes, err := dht.getValueSingle(p, key, timeout, level)
	if err != nil {
165
		return nil, nil, err
166 167 168 169 170 171 172 173 174 175 176 177 178
	}

	if pmes.GetSuccess() {
		if pmes.Value == nil { // We were given provider[s]
			val, err := dht.getFromPeerList(key, timeout, pmes.GetPeers(), level)
			if err != nil {
				return nil, nil, err
			}
			return val, nil, nil
		}

		// Success! We were given the value
		return pmes.GetValue(), nil, nil
179
	}
180

181 182 183 184 185 186 187 188
	// We were given a closer node
	var peers []*peer.Peer
	for _, pb := range pmes.GetPeers() {
		if peer.ID(pb.GetId()).Equal(dht.self.ID) {
			continue
		}
		addr, err := ma.NewMultiaddr(pb.GetAddr())
		if err != nil {
189
			u.PErr("%v\n", err.Error())
190 191
			continue
		}
192

193 194
		np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr)
		if err != nil {
195
			u.PErr("%v\n", err.Error())
196
			continue
197
		}
198 199

		peers = append(peers, np)
200
	}
201
	return nil, peers, nil
202 203
}

204
// getValueSingle simply performs the get value RPC with the given parameters
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
205
func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration, level int) (*Message, error) {
206
	pmes := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207
		Type:  Message_GET_VALUE,
208 209
		Key:   string(key),
		Value: []byte{byte(level)},
210
		ID:    swarm.GenerateMessageID(),
Jeromy's avatar
Jeromy committed
211
	}
212
	responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
Jeromy's avatar
Jeromy committed
213 214

	mes := swarm.NewMessage(p, pmes.ToProtobuf())
215
	t := time.Now()
216
	dht.netChan.Outgoing <- mes
Jeromy's avatar
Jeromy committed
217 218 219 220 221

	// Wait for either the response or a timeout
	timeup := time.After(timeout)
	select {
	case <-timeup:
222
		dht.listener.Unlisten(pmes.ID)
Jeromy's avatar
Jeromy committed
223
		return nil, u.ErrTimeout
224
	case resp, ok := <-responseChan:
Jeromy's avatar
Jeromy committed
225
		if !ok {
Jeromy's avatar
Jeromy committed
226
			u.PErr("response channel closed before timeout, please investigate.\n")
Jeromy's avatar
Jeromy committed
227 228
			return nil, u.ErrTimeout
		}
229 230
		roundtrip := time.Since(t)
		resp.Peer.SetLatency(roundtrip)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
231
		pmesOut := new(Message)
232
		err := proto.Unmarshal(resp.Data, pmesOut)
Jeromy's avatar
Jeromy committed
233 234 235
		if err != nil {
			return nil, err
		}
236
		return pmesOut, nil
Jeromy's avatar
Jeromy committed
237 238 239
	}
}

240 241 242 243 244
// TODO: Im not certain on this implementation, we get a list of peers/providers
// from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
245
	peerlist []*Message_PBPeer, level int) ([]byte, error) {
246 247 248 249
	for _, pinfo := range peerlist {
		p, _ := dht.Find(peer.ID(pinfo.GetId()))
		if p == nil {
			maddr, err := ma.NewMultiaddr(pinfo.GetAddr())
Jeromy's avatar
Jeromy committed
250
			if err != nil {
251
				u.PErr("getValue error: %s\n", err)
Jeromy's avatar
Jeromy committed
252 253
				continue
			}
254

255
			p, err = dht.network.GetConnection(peer.ID(pinfo.GetId()), maddr)
Jeromy's avatar
Jeromy committed
256
			if err != nil {
257
				u.PErr("getValue error: %s\n", err)
Jeromy's avatar
Jeromy committed
258 259 260
				continue
			}
		}
261
		pmes, err := dht.getValueSingle(p, key, timeout, level)
Jeromy's avatar
Jeromy committed
262
		if err != nil {
263
			u.DErr("getFromPeers error: %s\n", err)
Jeromy's avatar
Jeromy committed
264 265
			continue
		}
266
		dht.providers.AddProvider(key, p)
Jeromy's avatar
Jeromy committed
267

268 269 270 271
		// Make sure it was a successful get
		if pmes.GetSuccess() && pmes.Value != nil {
			return pmes.GetValue(), nil
		}
Jeromy's avatar
Jeromy committed
272 273 274 275
	}
	return nil, u.ErrNotFound
}

276
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
277 278
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
279
	v, err := dht.datastore.Get(ds.NewKey(string(key)))
280 281 282 283 284 285
	if err != nil {
		return nil, err
	}
	return v.([]byte), nil
}

286
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
287 288
	return dht.datastore.Put(ds.NewKey(string(key)), value)
}
289

290
// Update TODO(chas) Document this function
291
func (dht *IpfsDHT) Update(p *peer.Peer) {
292
	for _, route := range dht.routingTables {
293
		removed := route.Update(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
294
		// Only close the connection if no tables refer to this peer
295 296
		if removed != nil {
			found := false
297
			for _, r := range dht.routingTables {
298 299 300 301 302 303
				if r.Find(removed.ID) != nil {
					found = true
					break
				}
			}
			if !found {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
304
				dht.network.CloseConnection(removed)
305 306
			}
		}
307 308
	}
}
Jeromy's avatar
Jeromy committed
309

310
// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Jeromy's avatar
Jeromy committed
311
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
312
	for _, table := range dht.routingTables {
Jeromy's avatar
Jeromy committed
313 314 315 316 317 318 319
		p := table.Find(id)
		if p != nil {
			return p, table
		}
	}
	return nil, nil
}
320

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
321
func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Duration, level int) (*Message, error) {
322
	pmes := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
323
		Type:  Message_FIND_NODE,
324
		Key:   string(id),
325
		ID:    swarm.GenerateMessageID(),
326 327 328 329
		Value: []byte{byte(level)},
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())
330
	listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
331
	t := time.Now()
332
	dht.netChan.Outgoing <- mes
333 334 335
	after := time.After(timeout)
	select {
	case <-after:
336
		dht.listener.Unlisten(pmes.ID)
337 338
		return nil, u.ErrTimeout
	case resp := <-listenChan:
339 340
		roundtrip := time.Since(t)
		resp.Peer.SetLatency(roundtrip)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
341
		pmesOut := new(Message)
342
		err := proto.Unmarshal(resp.Data, pmesOut)
343 344 345 346
		if err != nil {
			return nil, err
		}

347
		return pmesOut, nil
348 349
	}
}
350

351 352
func (dht *IpfsDHT) printTables() {
	for _, route := range dht.routingTables {
353 354 355
		route.Print()
	}
}
Jeromy's avatar
Jeromy committed
356

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
357
func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*Message, error) {
358
	pmes := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
359
		Type:  Message_GET_PROVIDERS,
Jeromy's avatar
Jeromy committed
360
		Key:   string(key),
361
		ID:    swarm.GenerateMessageID(),
Jeromy's avatar
Jeromy committed
362 363 364 365 366
		Value: []byte{byte(level)},
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

367
	listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
368
	dht.netChan.Outgoing <- mes
Jeromy's avatar
Jeromy committed
369 370 371
	after := time.After(timeout)
	select {
	case <-after:
372
		dht.listener.Unlisten(pmes.ID)
Jeromy's avatar
Jeromy committed
373 374
		return nil, u.ErrTimeout
	case resp := <-listenChan:
Jeromy's avatar
Jeromy committed
375
		u.DOut("FindProviders: got response.\n")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
376
		pmesOut := new(Message)
377
		err := proto.Unmarshal(resp.Data, pmesOut)
Jeromy's avatar
Jeromy committed
378 379 380 381
		if err != nil {
			return nil, err
		}

382
		return pmesOut, nil
Jeromy's avatar
Jeromy committed
383 384 385
	}
}

386
// TODO: Could be done async
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
387
func (dht *IpfsDHT) addPeerList(key u.Key, peers []*Message_PBPeer) []*peer.Peer {
388
	var provArr []*peer.Peer
Jeromy's avatar
Jeromy committed
389 390 391 392 393 394
	for _, prov := range peers {
		// Dont add outselves to the list
		if peer.ID(prov.GetId()).Equal(dht.self.ID) {
			continue
		}
		// Dont add someone who is already on the list
395
		p := dht.network.GetPeer(u.Key(prov.GetId()))
Jeromy's avatar
Jeromy committed
396
		if p == nil {
397
			u.DOut("given provider %s was not in our network already.\n", peer.ID(prov.GetId()).Pretty())
Jeromy's avatar
Jeromy committed
398 399
			var err error
			p, err = dht.peerFromInfo(prov)
Jeromy's avatar
Jeromy committed
400
			if err != nil {
401
				u.PErr("error connecting to new peer: %s\n", err)
Jeromy's avatar
Jeromy committed
402 403 404
				continue
			}
		}
405
		dht.providers.AddProvider(key, p)
406
		provArr = append(provArr, p)
Jeromy's avatar
Jeromy committed
407
	}
408
	return provArr
Jeromy's avatar
Jeromy committed
409
}
Jeromy's avatar
Jeromy committed
410

411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444
// nearestPeerToQuery returns the peer nearest to the message's key, taken
// from the routing table selected by the message's cluster level. It returns
// nil when the cluster level does not name an existing routing table, so a
// message carrying an out-of-range level cannot trigger an index panic.
func (dht *IpfsDHT) nearestPeerToQuery(pmes *Message) *peer.Peer {
	level := int(pmes.GetClusterLevel())
	if level < 0 || level >= len(dht.routingTables) {
		u.DOut("nearestPeerToQuery: no routing table for cluster level %d\n", level)
		return nil
	}
	cluster := dht.routingTables[level]

	key := u.Key(pmes.GetKey())
	return cluster.NearestPeer(kb.ConvertKey(key))
}

// betterPeerToQuery returns nearestPeerToQuery, but iff closer than self.
// It returns nil when there is no nearest peer, when the nearest peer is the
// local peer, or when the local peer is closer to the message's key.
func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer {
	closer := dht.nearestPeerToQuery(pmes)

	// no node? nil
	if closer == nil {
		return nil
	}

	// == to self? nil
	if closer.ID.Equal(dht.self.ID) {
		u.DOut("Attempted to return self! this shouldnt happen...\n")
		return nil
	}

	// self is closer? nil
	// The key is derived from the message here, the same way
	// nearestPeerToQuery does it.
	key := u.Key(pmes.GetKey())
	if kb.Closer(dht.self.ID, closer.ID, key) {
		return nil
	}

	// ok seems like a closer node.
	return closer
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
445
func (dht *IpfsDHT) peerFromInfo(pbp *Message_PBPeer) (*peer.Peer, error) {
Jeromy's avatar
Jeromy committed
446 447 448 449 450 451 452
	maddr, err := ma.NewMultiaddr(pbp.GetAddr())
	if err != nil {
		return nil, err
	}

	return dht.network.GetConnection(peer.ID(pbp.GetId()), maddr)
}
453 454

func (dht *IpfsDHT) loadProvidableKeys() error {
455 456 457 458
	kl, err := dht.datastore.KeyList()
	if err != nil {
		return err
	}
459 460 461 462 463 464 465 466 467 468 469 470
	for _, k := range kl {
		dht.providers.AddProvider(u.Key(k.Bytes()), dht.self)
	}
	return nil
}

// Bootstrap builds up the list of peers by requesting a random peer ID: it
// generates 16 random bytes and runs FindPeer on them with a ten second
// timeout. The result of the lookup is discarded. If the random ID cannot be
// generated the failure is logged and no lookup is made.
func (dht *IpfsDHT) Bootstrap() {
	id := make([]byte, 16)
	if _, err := rand.Read(id); err != nil {
		u.PErr("Bootstrap: failed to generate random peer ID: %s\n", err)
		return
	}
	dht.FindPeer(peer.ID(id), time.Second*10)
}