dht.go 15.5 KB
Newer Older
1 2
// Package dht implements a distributed hash table that satisfies the ipfs routing
// interface. This DHT is modeled after Kademlia with Coral and S/Kademlia modifications.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4
package dht

5
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"bytes"
7
	"crypto/rand"
8
	"errors"
9
	"fmt"
10 11
	"sync"
	"time"
12

13
	inet "github.com/jbenet/go-ipfs/net"
14
	msg "github.com/jbenet/go-ipfs/net/message"
15
	peer "github.com/jbenet/go-ipfs/peer"
16
	routing "github.com/jbenet/go-ipfs/routing"
17
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
Jeromy's avatar
Jeromy committed
18
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
19
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
21
	"github.com/jbenet/go-ipfs/util/eventlog"
22

23
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
24
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
Jeromy's avatar
Jeromy committed
25

26
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
27 28
)

29
// log is the package-level event logger, registered under the name "dht".
var log = eventlog.Logger("dht")
30

31
// doPinging controls whether NewDHT starts the background PingRoutine.
const doPinging = false
32

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33 34 35 36 37
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
38 39
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
40
	routingTables []*kb.RoutingTable
41

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42 43 44
	// the network services we need
	dialer inet.Dialer
	sender inet.Sender
45

Jeromy's avatar
Jeromy committed
46
	// Local peer (yourself)
47
	self peer.Peer
Jeromy's avatar
Jeromy committed
48

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49 50 51
	// Other peers
	peerstore peer.Peerstore

Jeromy's avatar
Jeromy committed
52 53
	// Local data
	datastore ds.Datastore
54
	dslock    sync.Mutex
55

56
	providers *ProviderManager
57

58 59
	// When this peer started up
	birth time.Time
60 61 62

	//lock to make diagnostics work better
	diaglock sync.Mutex
63

64 65 66
	// record validator funcs
	Validators map[string]ValidatorFunc

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67
	ctxc.ContextCloser
68 69
}

Jeromy's avatar
Jeromy committed
70
// NewDHT creates a new DHT object with the given peer as the 'local' host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dialer, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
72
	dht := new(IpfsDHT)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
	dht.dialer = dialer
74
	dht.sender = sender
Jeromy's avatar
Jeromy committed
75
	dht.datastore = dstore
76
	dht.self = p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
	dht.peerstore = ps
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78
	dht.ContextCloser = ctxc.NewContextCloser(ctx, nil)
79

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80 81
	dht.providers = NewProviderManager(dht.Context(), p.ID())
	dht.AddCloserChild(dht.providers)
82

83
	dht.routingTables = make([]*kb.RoutingTable, 3)
84 85 86
	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
	dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
	dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour)
87
	dht.birth = time.Now()
88

89
	dht.Validators = make(map[string]ValidatorFunc)
90
	dht.Validators["pk"] = ValidatePublicKeyRecord
91

92
	if doPinging {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93
		dht.Children().Add(1)
94 95
		go dht.PingRoutine(time.Second * 10)
	}
Jeromy's avatar
Jeromy committed
96
	return dht
97 98
}

99
// Connect to a new peer at the given address, ping and add to the routing table
100
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, error) {
101 102 103 104 105 106 107 108 109
	// TODO(jbenet,whyrusleeping)
	//
	// Connect should take in a Peer (with ID). In a sense, we shouldn't be
	// allowing connections to random multiaddrs without knowing who we're
	// speaking to (i.e. peer.ID). In terms of moving around simple addresses
	// -- instead of an (ID, Addr) pair -- we can use:
	//
	//   /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
	//
110
	err := dht.dialer.DialPeer(ctx, npeer)
111
	if err != nil {
112
		return nil, err
113 114
	}

Jeromy's avatar
Jeromy committed
115 116
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117
	err = dht.Ping(ctx, npeer)
Jeromy's avatar
Jeromy committed
118
	if err != nil {
119
		return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
Jeromy's avatar
Jeromy committed
120
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
121
	log.Event(ctx, "connect", dht.self, npeer)
Jeromy's avatar
Jeromy committed
122

123
	dht.Update(ctx, npeer)
124

125
	return npeer, nil
Jeromy's avatar
Jeromy committed
126 127
}

128
// HandleMessage implements the inet.Handler interface.
129
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.NetMessage {
130 131 132

	mData := mes.Data()
	if mData == nil {
133
		log.Error("Message contained nil data.")
134
		return nil
135 136 137 138
	}

	mPeer := mes.Peer()
	if mPeer == nil {
139
		log.Error("Message contained nil peer.")
140
		return nil
141 142 143
	}

	// deserialize msg
144
	pmes := new(pb.Message)
145 146
	err := proto.Unmarshal(mData, pmes)
	if err != nil {
147
		log.Error("Error unmarshaling data")
148
		return nil
149 150 151
	}

	// update the peer (on valid msgs only)
152
	dht.Update(ctx, mPeer)
153

154
	log.Event(ctx, "foo", dht.self, mPeer, pmes)
155 156 157 158

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
159
		log.Error("got back nil handler from handlerForMsgType")
160
		return nil
161 162 163 164 165
	}

	// dispatch handler.
	rpmes, err := handler(mPeer, pmes)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166
		log.Errorf("handle message error: %s", err)
167
		return nil
168 169
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170 171
	// if nil response, return it before serializing
	if rpmes == nil {
172
		log.Warning("Got back nil response from request.")
173
		return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174 175
	}

176 177 178
	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
179
		log.Errorf("serialze response error: %s", err)
180
		return nil
181 182
	}

183
	return rmes
184 185
}

186 187
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
188
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
189 190 191 192 193 194 195 196

	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return nil, err
	}

	start := time.Now()

197
	log.Event(ctx, "sentMessage", dht.self, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198

199 200 201 202
	rmes, err := dht.sender.SendRequest(ctx, mes)
	if err != nil {
		return nil, err
	}
203 204 205
	if rmes == nil {
		return nil, errors.New("no response to request")
	}
206 207 208 209

	rtt := time.Since(start)
	rmes.Peer().SetLatency(rtt)

210
	rpmes := new(pb.Message)
211 212 213 214 215 216 217
	if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
		return nil, err
	}

	return rpmes, nil
}

218
// putValueToNetwork stores the given key/value pair at the peer 'p'
219
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
220
	key string, rec *pb.Record) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221

222
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
223
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
224
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225 226 227
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
228

229
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
230 231 232
		return errors.New("value not put correctly")
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
233 234
}

Jeromy's avatar
Jeromy committed
235 236
// putProvider sends a message to peer 'p' saying that the local node
// can provide the value of 'key'
237
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
238

239
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
240 241

	// add self as the provider
242
	pmes.ProviderPeers = pb.PeersToPBPeers([]peer.Peer{dht.self})
243

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
244
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
245 246 247
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
248

249
	log.Debugf("%s putProvider: %s for %s", dht.self, p, u.Key(key))
250
	if rpmes.GetKey() != pmes.GetKey() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
251 252 253 254
		return errors.New("provider not added correctly")
	}

	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
255 256
}

257 258
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
	key u.Key, level int) ([]byte, []peer.Peer, error) {
259 260

	pmes, err := dht.getValueSingle(ctx, p, key, level)
261
	if err != nil {
262
		return nil, nil, err
263 264
	}

265
	if record := pmes.GetRecord(); record != nil {
266
		// Success! We were given the value
Jeromy's avatar
Jeromy committed
267
		log.Debug("getValueOrPeers: got value")
268 269 270 271

		// make sure record is still valid
		err = dht.verifyRecord(record)
		if err != nil {
Jeromy's avatar
Jeromy committed
272
			log.Error("Received invalid record!")
273 274 275
			return nil, nil, err
		}
		return record.GetValue(), nil, nil
276
	}
277

278
	// TODO decide on providers. This probably shouldn't be happening.
279 280 281 282 283
	if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
		val, err := dht.getFromPeerList(ctx, key, prv, level)
		if err != nil {
			return nil, nil, err
		}
Jeromy's avatar
Jeromy committed
284
		log.Debug("getValueOrPeers: get from providers")
285 286
		return val, nil, nil
	}
287 288

	// Perhaps we were given closer peers
289
	var peers []peer.Peer
290
	for _, pb := range pmes.GetCloserPeers() {
291
		pr, err := dht.peerFromInfo(pb)
292
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
293
			log.Error(err)
294 295
			continue
		}
296
		peers = append(peers, pr)
297
	}
298 299

	if len(peers) > 0 {
300
		log.Debug("getValueOrPeers: peers")
301 302 303
		return nil, peers, nil
	}

304 305
	log.Warning("getValueOrPeers: routing.ErrNotFound")
	return nil, nil, routing.ErrNotFound
306 307
}

308
// getValueSingle simply performs the get value RPC with the given parameters
309
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
310
	key u.Key, level int) (*pb.Message, error) {
311

312
	pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), level)
313
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
314 315
}

316 317 318 319
// TODO: Im not certain on this implementation, we get a list of peers/providers
// from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
320
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
321
	peerlist []*pb.Message_Peer, level int) ([]byte, error) {
322

323
	for _, pinfo := range peerlist {
324
		p, err := dht.ensureConnectedToPeer(ctx, pinfo)
325
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
326
			log.Errorf("getFromPeers error: %s", err)
327
			continue
Jeromy's avatar
Jeromy committed
328
		}
329 330

		pmes, err := dht.getValueSingle(ctx, p, key, level)
Jeromy's avatar
Jeromy committed
331
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
332
			log.Errorf("getFromPeers error: %s\n", err)
Jeromy's avatar
Jeromy committed
333 334 335
			continue
		}

336
		if record := pmes.GetRecord(); record != nil {
337
			// Success! We were given the value
338 339 340 341 342

			err := dht.verifyRecord(record)
			if err != nil {
				return nil, err
			}
343
			dht.providers.AddProvider(key, p)
344
			return record.GetValue(), nil
345
		}
Jeromy's avatar
Jeromy committed
346
	}
347
	return nil, routing.ErrNotFound
Jeromy's avatar
Jeromy committed
348 349
}

350
// getLocal attempts to retrieve the value from the datastore
351
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
352 353
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
Jeromy's avatar
Jeromy committed
354
	log.Debug("getLocal %s", key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
355
	v, err := dht.datastore.Get(key.DsKey())
356 357 358
	if err != nil {
		return nil, err
	}
Jeromy's avatar
Jeromy committed
359
	log.Debug("found in db")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
360 361 362

	byt, ok := v.([]byte)
	if !ok {
363
		return nil, errors.New("value stored in datastore not []byte")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
364
	}
365 366 367 368 369 370 371 372 373 374
	rec := new(pb.Record)
	err = proto.Unmarshal(byt, rec)
	if err != nil {
		return nil, err
	}

	// TODO: 'if paranoid'
	if u.Debug {
		err = dht.verifyRecord(rec)
		if err != nil {
Jeromy's avatar
Jeromy committed
375
			log.Errorf("local record verify failed: %s", err)
376 377 378 379 380
			return nil, err
		}
	}

	return rec.GetValue(), nil
381 382
}

383
// putLocal stores the key value pair in the datastore
384
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
385 386 387 388 389 390 391 392 393 394
	rec, err := dht.makePutRecord(key, value)
	if err != nil {
		return err
	}
	data, err := proto.Marshal(rec)
	if err != nil {
		return err
	}

	return dht.datastore.Put(key.DsKey(), data)
395
}
396

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
397 398
// Update signals to all routingTables to Update their last-seen status
// on the given peer.
399 400
func (dht *IpfsDHT) Update(ctx context.Context, p peer.Peer) {
	log.Event(ctx, "updatePeer", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
401
	removedCount := 0
402
	for _, route := range dht.routingTables {
403
		removed := route.Update(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
404
		// Only close the connection if no tables refer to this peer
405
		if removed != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
406
			removedCount++
407
		}
408
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
409 410 411 412 413 414 415 416

	// Only close the connection if no tables refer to this peer
	// if removedCount == len(dht.routingTables) {
	// 	dht.network.ClosePeer(p)
	// }
	// ACTUALLY, no, let's not just close the connection. it may be connected
	// due to other things. it seems that we just need connection timeouts
	// after some deadline of inactivity.
417
}
Jeromy's avatar
Jeromy committed
418

Jeromy's avatar
Jeromy committed
419
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
420
func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
421
	for _, table := range dht.routingTables {
Jeromy's avatar
Jeromy committed
422 423 424 425 426 427 428
		p := table.Find(id)
		if p != nil {
			return p, table
		}
	}
	return nil, nil
}
429

Jeromy's avatar
Jeromy committed
430
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
431 432
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
433
	return dht.sendRequest(ctx, p, pmes)
434
}
435

436 437
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), level)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
438
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
439 440
}

441
func (dht *IpfsDHT) addProviders(key u.Key, peers []*pb.Message_Peer) []peer.Peer {
442
	var provArr []peer.Peer
Jeromy's avatar
Jeromy committed
443
	for _, prov := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
444 445
		p, err := dht.peerFromInfo(prov)
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
446
			log.Errorf("error getting peer from info: %v", err)
Jeromy's avatar
Jeromy committed
447 448
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
449

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
450
		log.Debugf("%s adding provider: %s for %s", dht.self, p, key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
451

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
452
		// Dont add outselves to the list
453
		if p.ID().Equal(dht.self.ID()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
454
			continue
Jeromy's avatar
Jeromy committed
455
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
456 457

		// TODO(jbenet) ensure providers is idempotent
458
		dht.providers.AddProvider(key, p)
459
		provArr = append(provArr, p)
Jeromy's avatar
Jeromy committed
460
	}
461
	return provArr
Jeromy's avatar
Jeromy committed
462
}
Jeromy's avatar
Jeromy committed
463

464
// nearestPeersToQuery returns the routing tables closest peers.
465
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
466 467 468 469
	level := pmes.GetClusterLevel()
	cluster := dht.routingTables[level]

	key := u.Key(pmes.GetKey())
470
	closer := cluster.NearestPeers(kb.ConvertKey(key), count)
471 472 473
	return closer
}

474
// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
475
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
476
	closer := dht.nearestPeersToQuery(pmes, count)
477 478 479 480 481 482

	// no node? nil
	if closer == nil {
		return nil
	}

483 484
	// == to self? thats bad
	for _, p := range closer {
485
		if p.ID().Equal(dht.self.ID()) {
486 487 488
			log.Error("Attempted to return self! this shouldnt happen...")
			return nil
		}
489 490
	}

491
	var filtered []peer.Peer
492 493 494
	for _, p := range closer {
		// must all be closer than self
		key := u.Key(pmes.GetKey())
495
		if !kb.Closer(dht.self.ID(), p.ID(), key) {
496 497
			filtered = append(filtered, p)
		}
498 499
	}

500 501
	// ok seems like closer nodes
	return filtered
502 503
}

Jeromy's avatar
Jeromy committed
504
// getPeer searches the peerstore for a peer with the given peer ID
505
func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
506 507 508
	p, err := dht.peerstore.Get(id)
	if err != nil {
		err = fmt.Errorf("Failed to get peer from peerstore: %s", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
509
		log.Error(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
510 511 512 513 514
		return nil, err
	}
	return p, nil
}

Jeromy's avatar
Jeromy committed
515 516
// peerFromInfo returns a peer using info in the protobuf peer struct
// to lookup or create a peer
517
func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {
518 519

	id := peer.ID(pbp.GetId())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
520

521 522
	// bail out if it's ourselves
	//TODO(jbenet) not sure this should be an error _here_
523
	if id.Equal(dht.self.ID()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
524 525 526
		return nil, errors.New("found self")
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
527 528 529
	p, err := dht.getPeer(id)
	if err != nil {
		return nil, err
530 531
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
532 533 534
	maddr, err := pbp.Address()
	if err != nil {
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
535
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
536
	p.AddAddress(maddr)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
537 538 539
	return p, nil
}

540
func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_Peer) (peer.Peer, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
541 542 543
	p, err := dht.peerFromInfo(pbp)
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
544 545
	}

546
	// dial connection
547
	err = dht.dialer.DialPeer(ctx, p)
548
	return p, err
Jeromy's avatar
Jeromy committed
549
}
550

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
551
//TODO: this should be smarter about which keys it selects.
552
func (dht *IpfsDHT) loadProvidableKeys() error {
553 554 555 556
	kl, err := dht.datastore.KeyList()
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
557 558 559
	for _, dsk := range kl {
		k := u.KeyFromDsKey(dsk)
		if len(k) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
560
			log.Errorf("loadProvidableKeys error: %v", dsk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
561 562 563
		}

		dht.providers.AddProvider(k, dht.self)
564 565 566 567
	}
	return nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
568
// PingRoutine periodically pings nearest neighbors.
569
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
570 571
	defer dht.Children().Done()

572 573 574 575 576 577 578 579
	tick := time.Tick(t)
	for {
		select {
		case <-tick:
			id := make([]byte, 16)
			rand.Read(id)
			peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5)
			for _, p := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
580
				ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
581 582
				err := dht.Ping(ctx, p)
				if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
583
					log.Errorf("Ping error: %s", err)
584 585
				}
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
586
		case <-dht.Closing():
587 588 589 590 591
			return
		}
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
592
// Bootstrap builds up list of peers by requesting random peer IDs
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
593
func (dht *IpfsDHT) Bootstrap(ctx context.Context) {
594 595
	id := make([]byte, 16)
	rand.Read(id)
Jeromy's avatar
Jeromy committed
596 597 598 599
	p, err := dht.FindPeer(ctx, peer.ID(id))
	if err != nil {
		log.Error("Bootstrap peer error: %s", err)
	}
600
	err = dht.dialer.DialPeer(ctx, p)
601
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
602
		log.Errorf("Bootstrap peer error: %s", err)
603
	}
604
}