dht.go 14.7 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
// Package dht implements a distributed hash table that satisfies the ipfs routing
2
// interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4
package dht

5
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"bytes"
7
	"crypto/rand"
8
	"errors"
9
	"fmt"
10 11
	"sync"
	"time"
12

13
	inet "github.com/jbenet/go-ipfs/net"
14
	msg "github.com/jbenet/go-ipfs/net/message"
15
	peer "github.com/jbenet/go-ipfs/peer"
16
	routing "github.com/jbenet/go-ipfs/routing"
17
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
Jeromy's avatar
Jeromy committed
18
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
19
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
21
	"github.com/jbenet/go-ipfs/util/eventlog"
22

23
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
24
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
Jeromy's avatar
Jeromy committed
25

26
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
27 28
)

29
var log = eventlog.Logger("dht")
30

31
const doPinging = false
32

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33 34 35 36 37
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
38 39
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
40
	routingTables []*kb.RoutingTable
41

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42 43 44
	// the network services we need
	dialer inet.Dialer
	sender inet.Sender
45

Jeromy's avatar
Jeromy committed
46
	// Local peer (yourself)
47
	self peer.Peer
Jeromy's avatar
Jeromy committed
48

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49 50 51
	// Other peers
	peerstore peer.Peerstore

Jeromy's avatar
Jeromy committed
52 53
	// Local data
	datastore ds.Datastore
54
	dslock    sync.Mutex
55

56
	providers *ProviderManager
57

58 59
	// When this peer started up
	birth time.Time
60 61 62

	//lock to make diagnostics work better
	diaglock sync.Mutex
63

64 65 66
	// record validator funcs
	Validators map[string]ValidatorFunc

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67
	ctxc.ContextCloser
68 69
}

Jeromy's avatar
Jeromy committed
70
// NewDHT creates a new DHT object with the given peer as the 'local' host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dialer, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
72
	dht := new(IpfsDHT)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
	dht.dialer = dialer
74
	dht.sender = sender
Jeromy's avatar
Jeromy committed
75
	dht.datastore = dstore
76
	dht.self = p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
	dht.peerstore = ps
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78
	dht.ContextCloser = ctxc.NewContextCloser(ctx, nil)
79

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80 81
	dht.providers = NewProviderManager(dht.Context(), p.ID())
	dht.AddCloserChild(dht.providers)
82

83
	dht.routingTables = make([]*kb.RoutingTable, 3)
84 85 86
	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
	dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
	dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour)
87
	dht.birth = time.Now()
88

89
	dht.Validators = make(map[string]ValidatorFunc)
90
	dht.Validators["pk"] = ValidatePublicKeyRecord
91

92
	if doPinging {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93
		dht.Children().Add(1)
94 95
		go dht.PingRoutine(time.Second * 10)
	}
Jeromy's avatar
Jeromy committed
96
	return dht
97 98
}

99
// Connect to a new peer at the given address, ping and add to the routing table
100
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) error {
101
	err := dht.dialer.DialPeer(ctx, npeer)
102
	if err != nil {
103
		return err
104 105
	}

Jeromy's avatar
Jeromy committed
106 107
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
108
	err = dht.Ping(ctx, npeer)
Jeromy's avatar
Jeromy committed
109
	if err != nil {
110
		return fmt.Errorf("failed to ping newly connected peer: %s\n", err)
Jeromy's avatar
Jeromy committed
111
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
112
	log.Event(ctx, "connect", dht.self, npeer)
Jeromy's avatar
Jeromy committed
113

114
	dht.Update(ctx, npeer)
115

116
	return nil
Jeromy's avatar
Jeromy committed
117 118
}

119
// HandleMessage implements the inet.Handler interface.
120
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.NetMessage {
121 122 123

	mData := mes.Data()
	if mData == nil {
124
		log.Error("Message contained nil data.")
125
		return nil
126 127 128 129
	}

	mPeer := mes.Peer()
	if mPeer == nil {
130
		log.Error("Message contained nil peer.")
131
		return nil
132 133 134
	}

	// deserialize msg
135
	pmes := new(pb.Message)
136 137
	err := proto.Unmarshal(mData, pmes)
	if err != nil {
138
		log.Error("Error unmarshaling data")
139
		return nil
140 141 142
	}

	// update the peer (on valid msgs only)
143
	dht.Update(ctx, mPeer)
144

145
	log.Event(ctx, "foo", dht.self, mPeer, pmes)
146 147 148 149

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
150
		log.Error("got back nil handler from handlerForMsgType")
151
		return nil
152 153 154
	}

	// dispatch handler.
155
	rpmes, err := handler(ctx, mPeer, pmes)
156
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
		log.Errorf("handle message error: %s", err)
158
		return nil
159 160
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161 162
	// if nil response, return it before serializing
	if rpmes == nil {
163
		log.Warning("Got back nil response from request.")
164
		return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165 166
	}

167 168 169
	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170
		log.Errorf("serialze response error: %s", err)
171
		return nil
172 173
	}

174
	return rmes
175 176
}

177 178
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
179
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
180 181 182 183 184 185 186 187

	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return nil, err
	}

	start := time.Now()

188
	rmes, err := dht.sender.SendRequest(ctx, mes) // respect?
189 190 191
	if err != nil {
		return nil, err
	}
192 193 194
	if rmes == nil {
		return nil, errors.New("no response to request")
	}
195
	log.Event(ctx, "sentMessage", dht.self, p, pmes)
196

197
	rmes.Peer().SetLatency(time.Since(start))
198

199
	rpmes := new(pb.Message)
200 201 202 203 204 205
	if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
		return nil, err
	}
	return rpmes, nil
}

206
// putValueToNetwork stores the given key/value pair at the peer 'p'
207
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
208
	key string, rec *pb.Record) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209

210
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
211
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213 214 215
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
216

217
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
218 219 220
		return errors.New("value not put correctly")
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221 222
}

Jeromy's avatar
Jeromy committed
223 224
// putProvider sends a message to peer 'p' saying that the local node
// can provide the value of 'key'
225
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226

227
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
228 229

	// add self as the provider
230
	pmes.ProviderPeers = pb.PeersToPBPeers(dht.dialer, []peer.Peer{dht.self})
231

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
232
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
233 234 235
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
236

237
	log.Debugf("%s putProvider: %s for %s", dht.self, p, u.Key(key))
238
	if rpmes.GetKey() != pmes.GetKey() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
239 240 241 242
		return errors.New("provider not added correctly")
	}

	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
243 244
}

245 246
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
	key u.Key, level int) ([]byte, []peer.Peer, error) {
247 248

	pmes, err := dht.getValueSingle(ctx, p, key, level)
249
	if err != nil {
250
		return nil, nil, err
251 252
	}

253
	if record := pmes.GetRecord(); record != nil {
254
		// Success! We were given the value
Jeromy's avatar
Jeromy committed
255
		log.Debug("getValueOrPeers: got value")
256 257 258 259

		// make sure record is still valid
		err = dht.verifyRecord(record)
		if err != nil {
Jeromy's avatar
Jeromy committed
260
			log.Error("Received invalid record!")
261 262 263
			return nil, nil, err
		}
		return record.GetValue(), nil, nil
264
	}
265

266
	// TODO decide on providers. This probably shouldn't be happening.
267 268 269 270 271
	if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
		val, err := dht.getFromPeerList(ctx, key, prv, level)
		if err != nil {
			return nil, nil, err
		}
Jeromy's avatar
Jeromy committed
272
		log.Debug("getValueOrPeers: get from providers")
273 274
		return val, nil, nil
	}
275 276

	// Perhaps we were given closer peers
277 278
	peers, errs := pb.PBPeersToPeers(dht.peerstore, pmes.GetCloserPeers())
	for _, err := range errs {
279
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
280
			log.Error(err)
281
		}
282
	}
283 284

	if len(peers) > 0 {
285
		log.Debug("getValueOrPeers: peers")
286 287 288
		return nil, peers, nil
	}

289 290
	log.Warning("getValueOrPeers: routing.ErrNotFound")
	return nil, nil, routing.ErrNotFound
291 292
}

293
// getValueSingle simply performs the get value RPC with the given parameters
294
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
295
	key u.Key, level int) (*pb.Message, error) {
296

297
	pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), level)
298
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
299 300
}

301 302 303 304
// TODO: Im not certain on this implementation, we get a list of peers/providers
// from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
305
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
306
	peerlist []*pb.Message_Peer, level int) ([]byte, error) {
307

308
	for _, pinfo := range peerlist {
309
		p, err := dht.ensureConnectedToPeer(ctx, pinfo)
310
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
311
			log.Errorf("getFromPeers error: %s", err)
312
			continue
Jeromy's avatar
Jeromy committed
313
		}
314 315

		pmes, err := dht.getValueSingle(ctx, p, key, level)
Jeromy's avatar
Jeromy committed
316
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
317
			log.Errorf("getFromPeers error: %s\n", err)
Jeromy's avatar
Jeromy committed
318 319 320
			continue
		}

321
		if record := pmes.GetRecord(); record != nil {
322
			// Success! We were given the value
323 324 325 326 327

			err := dht.verifyRecord(record)
			if err != nil {
				return nil, err
			}
328
			dht.providers.AddProvider(key, p)
329
			return record.GetValue(), nil
330
		}
Jeromy's avatar
Jeromy committed
331
	}
332
	return nil, routing.ErrNotFound
Jeromy's avatar
Jeromy committed
333 334
}

335
// getLocal attempts to retrieve the value from the datastore
336
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
337 338
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
Jeromy's avatar
Jeromy committed
339
	log.Debug("getLocal %s", key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
340
	v, err := dht.datastore.Get(key.DsKey())
341 342 343
	if err != nil {
		return nil, err
	}
Jeromy's avatar
Jeromy committed
344
	log.Debug("found in db")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
345 346 347

	byt, ok := v.([]byte)
	if !ok {
348
		return nil, errors.New("value stored in datastore not []byte")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
349
	}
350 351 352 353 354 355 356 357 358 359
	rec := new(pb.Record)
	err = proto.Unmarshal(byt, rec)
	if err != nil {
		return nil, err
	}

	// TODO: 'if paranoid'
	if u.Debug {
		err = dht.verifyRecord(rec)
		if err != nil {
Jeromy's avatar
Jeromy committed
360
			log.Errorf("local record verify failed: %s", err)
361 362 363 364 365
			return nil, err
		}
	}

	return rec.GetValue(), nil
366 367
}

368
// putLocal stores the key value pair in the datastore
369
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
370 371 372 373 374 375 376 377 378 379
	rec, err := dht.makePutRecord(key, value)
	if err != nil {
		return err
	}
	data, err := proto.Marshal(rec)
	if err != nil {
		return err
	}

	return dht.datastore.Put(key.DsKey(), data)
380
}
381

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
382 383
// Update signals to all routingTables to Update their last-seen status
// on the given peer.
384 385
func (dht *IpfsDHT) Update(ctx context.Context, p peer.Peer) {
	log.Event(ctx, "updatePeer", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
386
	removedCount := 0
387
	for _, route := range dht.routingTables {
388
		removed := route.Update(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
389
		// Only close the connection if no tables refer to this peer
390
		if removed != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
391
			removedCount++
392
		}
393
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
394 395 396 397 398 399 400 401

	// Only close the connection if no tables refer to this peer
	// if removedCount == len(dht.routingTables) {
	// 	dht.network.ClosePeer(p)
	// }
	// ACTUALLY, no, let's not just close the connection. it may be connected
	// due to other things. it seems that we just need connection timeouts
	// after some deadline of inactivity.
402
}
Jeromy's avatar
Jeromy committed
403

Jeromy's avatar
Jeromy committed
404
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
405
func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
406
	for _, table := range dht.routingTables {
Jeromy's avatar
Jeromy committed
407 408 409 410 411 412 413
		p := table.Find(id)
		if p != nil {
			return p, table
		}
	}
	return nil, nil
}
414

Jeromy's avatar
Jeromy committed
415
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
416 417
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
418
	return dht.sendRequest(ctx, p, pmes)
419
}
420

421 422
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), level)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
423
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
424 425
}

426 427 428 429 430
func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.Peer {
	peers, errs := pb.PBPeersToPeers(dht.peerstore, pbps)
	for _, err := range errs {
		log.Errorf("error converting peer: %v", err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
431

432 433
	var provArr []peer.Peer
	for _, p := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
434
		// Dont add outselves to the list
435
		if p.ID().Equal(dht.self.ID()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
436
			continue
Jeromy's avatar
Jeromy committed
437
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
438

439
		log.Debugf("%s adding provider: %s for %s", dht.self, p, key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
440
		// TODO(jbenet) ensure providers is idempotent
441
		dht.providers.AddProvider(key, p)
442
		provArr = append(provArr, p)
Jeromy's avatar
Jeromy committed
443
	}
444
	return provArr
Jeromy's avatar
Jeromy committed
445
}
Jeromy's avatar
Jeromy committed
446

447
// nearestPeersToQuery returns the routing tables closest peers.
448
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
449 450 451 452
	level := pmes.GetClusterLevel()
	cluster := dht.routingTables[level]

	key := u.Key(pmes.GetKey())
453
	closer := cluster.NearestPeers(kb.ConvertKey(key), count)
454 455 456
	return closer
}

457
// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
458
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
459
	closer := dht.nearestPeersToQuery(pmes, count)
460 461 462 463 464 465

	// no node? nil
	if closer == nil {
		return nil
	}

466 467
	// == to self? thats bad
	for _, p := range closer {
468
		if p.ID().Equal(dht.self.ID()) {
469 470 471
			log.Error("Attempted to return self! this shouldnt happen...")
			return nil
		}
472 473
	}

474
	var filtered []peer.Peer
475 476 477
	for _, p := range closer {
		// must all be closer than self
		key := u.Key(pmes.GetKey())
478
		if !kb.Closer(dht.self.ID(), p.ID(), key) {
479 480
			filtered = append(filtered, p)
		}
481 482
	}

483 484
	// ok seems like closer nodes
	return filtered
485 486
}

Jeromy's avatar
Jeromy committed
487
// getPeer searches the peerstore for a peer with the given peer ID
488
func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
489
	p, err := dht.peerstore.FindOrCreate(id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
490 491
	if err != nil {
		err = fmt.Errorf("Failed to get peer from peerstore: %s", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
492
		log.Error(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
493 494 495 496 497
		return nil, err
	}
	return p, nil
}

498 499
func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_Peer) (peer.Peer, error) {
	p, err := pb.PBPeerToPeer(dht.peerstore, pbp)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
500 501
	if err != nil {
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
502 503
	}

504 505
	if dht.dialer.LocalPeer().ID().Equal(p.ID()) {
		return nil, errors.New("attempting to ensure connection to self")
Jeromy's avatar
Jeromy committed
506 507
	}

508
	// dial connection
509
	err = dht.dialer.DialPeer(ctx, p)
510
	return p, err
Jeromy's avatar
Jeromy committed
511
}
512

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
513
//TODO: this should be smarter about which keys it selects.
514
func (dht *IpfsDHT) loadProvidableKeys() error {
515 516 517 518
	kl, err := dht.datastore.KeyList()
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
519 520 521
	for _, dsk := range kl {
		k := u.KeyFromDsKey(dsk)
		if len(k) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
522
			log.Errorf("loadProvidableKeys error: %v", dsk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
523 524 525
		}

		dht.providers.AddProvider(k, dht.self)
526 527 528 529
	}
	return nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
530
// PingRoutine periodically pings nearest neighbors.
531
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
532 533
	defer dht.Children().Done()

534 535 536 537 538 539 540 541
	tick := time.Tick(t)
	for {
		select {
		case <-tick:
			id := make([]byte, 16)
			rand.Read(id)
			peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5)
			for _, p := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
542
				ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
543 544
				err := dht.Ping(ctx, p)
				if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
545
					log.Errorf("Ping error: %s", err)
546 547
				}
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
548
		case <-dht.Closing():
549 550 551 552 553
			return
		}
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
554
// Bootstrap builds up list of peers by requesting random peer IDs
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
555
func (dht *IpfsDHT) Bootstrap(ctx context.Context) {
556 557
	id := make([]byte, 16)
	rand.Read(id)
Jeromy's avatar
Jeromy committed
558 559
	p, err := dht.FindPeer(ctx, peer.ID(id))
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
560
		log.Errorf("Bootstrap peer error: %s", err)
Jeromy's avatar
Jeromy committed
561
	}
562
	err = dht.dialer.DialPeer(ctx, p)
563
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
564
		log.Errorf("Bootstrap peer error: %s", err)
565
	}
566
}