dht.go 15.2 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
// Package dht implements a distributed hash table that satisfies the ipfs routing
2
// interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4
package dht

5
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"bytes"
7
	"crypto/rand"
8
	"errors"
9
	"fmt"
10 11
	"sync"
	"time"
12

13
	inet "github.com/jbenet/go-ipfs/net"
14
	msg "github.com/jbenet/go-ipfs/net/message"
15
	peer "github.com/jbenet/go-ipfs/peer"
16
	routing "github.com/jbenet/go-ipfs/routing"
17
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
Jeromy's avatar
Jeromy committed
18
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
19
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
21
	"github.com/jbenet/go-ipfs/util/eventlog"
22

23
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
24
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
Jeromy's avatar
Jeromy committed
25

26
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
27 28
)

29
var log = eventlog.Logger("dht")
30

31
const doPinging = false
32

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33 34 35 36 37
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
38 39
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
40
	routingTables []*kb.RoutingTable
41

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42 43 44
	// the network services we need
	dialer inet.Dialer
	sender inet.Sender
45

Jeromy's avatar
Jeromy committed
46
	// Local peer (yourself)
47
	self peer.Peer
Jeromy's avatar
Jeromy committed
48

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49 50 51
	// Other peers
	peerstore peer.Peerstore

Jeromy's avatar
Jeromy committed
52 53
	// Local data
	datastore ds.Datastore
54
	dslock    sync.Mutex
55

56
	providers *ProviderManager
57

58 59
	// When this peer started up
	birth time.Time
60 61 62

	//lock to make diagnostics work better
	diaglock sync.Mutex
63

64 65 66
	// record validator funcs
	Validators map[string]ValidatorFunc

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67
	ctxc.ContextCloser
68 69
}

Jeromy's avatar
Jeromy committed
70
// NewDHT creates a new DHT object with the given peer as the 'local' host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dialer, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
72
	dht := new(IpfsDHT)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
	dht.dialer = dialer
74
	dht.sender = sender
Jeromy's avatar
Jeromy committed
75
	dht.datastore = dstore
76
	dht.self = p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
	dht.peerstore = ps
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78
	dht.ContextCloser = ctxc.NewContextCloser(ctx, nil)
79

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80 81
	dht.providers = NewProviderManager(dht.Context(), p.ID())
	dht.AddCloserChild(dht.providers)
82

83
	dht.routingTables = make([]*kb.RoutingTable, 3)
84 85 86
	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
	dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
	dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour)
87
	dht.birth = time.Now()
88

89
	dht.Validators = make(map[string]ValidatorFunc)
90
	dht.Validators["pk"] = ValidatePublicKeyRecord
91

92
	if doPinging {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93
		dht.Children().Add(1)
94 95
		go dht.PingRoutine(time.Second * 10)
	}
Jeromy's avatar
Jeromy committed
96
	return dht
97 98
}

99
// Connect to a new peer at the given address, ping and add to the routing table
100
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) error {
101
	err := dht.dialer.DialPeer(ctx, npeer)
102
	if err != nil {
103
		return err
104 105
	}

Jeromy's avatar
Jeromy committed
106 107
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
108
	err = dht.Ping(ctx, npeer)
Jeromy's avatar
Jeromy committed
109
	if err != nil {
110
		return fmt.Errorf("failed to ping newly connected peer: %s\n", err)
Jeromy's avatar
Jeromy committed
111
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
112
	log.Event(ctx, "connect", dht.self, npeer)
Jeromy's avatar
Jeromy committed
113

114
	dht.Update(ctx, npeer)
115

116
	return nil
Jeromy's avatar
Jeromy committed
117 118
}

119
// HandleMessage implements the inet.Handler interface.
120
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.NetMessage {
121 122 123

	mData := mes.Data()
	if mData == nil {
124
		log.Error("Message contained nil data.")
125
		return nil
126 127 128 129
	}

	mPeer := mes.Peer()
	if mPeer == nil {
130
		log.Error("Message contained nil peer.")
131
		return nil
132 133 134
	}

	// deserialize msg
135
	pmes := new(pb.Message)
136 137
	err := proto.Unmarshal(mData, pmes)
	if err != nil {
138
		log.Error("Error unmarshaling data")
139
		return nil
140 141 142
	}

	// update the peer (on valid msgs only)
143
	dht.Update(ctx, mPeer)
144

145
	log.Event(ctx, "foo", dht.self, mPeer, pmes)
146 147 148 149

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
150
		log.Error("got back nil handler from handlerForMsgType")
151
		return nil
152 153 154
	}

	// dispatch handler.
155
	rpmes, err := handler(ctx, mPeer, pmes)
156
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
		log.Errorf("handle message error: %s", err)
158
		return nil
159 160
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161 162
	// if nil response, return it before serializing
	if rpmes == nil {
163
		log.Warning("Got back nil response from request.")
164
		return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165 166
	}

167 168 169
	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170
		log.Errorf("serialze response error: %s", err)
171
		return nil
172 173
	}

174
	return rmes
175 176
}

177 178
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
179
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
180 181 182 183 184 185 186 187

	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return nil, err
	}

	start := time.Now()

188
	rmes, err := dht.sender.SendRequest(ctx, mes) // respect?
189 190 191
	if err != nil {
		return nil, err
	}
192 193 194
	if rmes == nil {
		return nil, errors.New("no response to request")
	}
195
	log.Event(ctx, "sentMessage", dht.self, p, pmes)
196

197
	rmes.Peer().SetLatency(time.Since(start))
198

199
	rpmes := new(pb.Message)
200 201 202 203 204 205
	if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
		return nil, err
	}
	return rpmes, nil
}

206
// putValueToNetwork stores the given key/value pair at the peer 'p'
207
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
208
	key string, rec *pb.Record) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209

210
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
211
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213 214 215
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
216

217
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
218 219 220
		return errors.New("value not put correctly")
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221 222
}

Jeromy's avatar
Jeromy committed
223 224
// putProvider sends a message to peer 'p' saying that the local node
// can provide the value of 'key'
225
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226

227
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
228 229

	// add self as the provider
230
	pmes.ProviderPeers = pb.PeersToPBPeers(dht.dialer, []peer.Peer{dht.self})
231

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
232
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
233 234 235
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
236

237
	log.Debugf("%s putProvider: %s for %s", dht.self, p, u.Key(key))
238
	if rpmes.GetKey() != pmes.GetKey() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
239 240 241 242
		return errors.New("provider not added correctly")
	}

	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
243 244
}

245 246
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
	key u.Key, level int) ([]byte, []peer.Peer, error) {
247 248

	pmes, err := dht.getValueSingle(ctx, p, key, level)
249
	if err != nil {
250
		return nil, nil, err
251 252
	}

253
	if record := pmes.GetRecord(); record != nil {
254
		// Success! We were given the value
Jeromy's avatar
Jeromy committed
255
		log.Debug("getValueOrPeers: got value")
256 257 258 259

		// make sure record is still valid
		err = dht.verifyRecord(record)
		if err != nil {
Jeromy's avatar
Jeromy committed
260
			log.Error("Received invalid record!")
261 262 263
			return nil, nil, err
		}
		return record.GetValue(), nil, nil
264
	}
265

266
	// TODO decide on providers. This probably shouldn't be happening.
267 268 269 270 271
	if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
		val, err := dht.getFromPeerList(ctx, key, prv, level)
		if err != nil {
			return nil, nil, err
		}
Jeromy's avatar
Jeromy committed
272
		log.Debug("getValueOrPeers: get from providers")
273 274
		return val, nil, nil
	}
275 276

	// Perhaps we were given closer peers
277
	var peers []peer.Peer
278
	for _, pb := range pmes.GetCloserPeers() {
279
		pr, err := dht.peerFromInfo(pb)
280
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
281
			log.Error(err)
282 283
			continue
		}
284
		peers = append(peers, pr)
285
	}
286 287

	if len(peers) > 0 {
288
		log.Debug("getValueOrPeers: peers")
289 290 291
		return nil, peers, nil
	}

292 293
	log.Warning("getValueOrPeers: routing.ErrNotFound")
	return nil, nil, routing.ErrNotFound
294 295
}

296
// getValueSingle simply performs the get value RPC with the given parameters
297
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
298
	key u.Key, level int) (*pb.Message, error) {
299

300
	pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), level)
301
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
302 303
}

304 305 306 307
// TODO: Im not certain on this implementation, we get a list of peers/providers
// from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
308
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
309
	peerlist []*pb.Message_Peer, level int) ([]byte, error) {
310

311
	for _, pinfo := range peerlist {
312
		p, err := dht.ensureConnectedToPeer(ctx, pinfo)
313
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
314
			log.Errorf("getFromPeers error: %s", err)
315
			continue
Jeromy's avatar
Jeromy committed
316
		}
317 318

		pmes, err := dht.getValueSingle(ctx, p, key, level)
Jeromy's avatar
Jeromy committed
319
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
320
			log.Errorf("getFromPeers error: %s\n", err)
Jeromy's avatar
Jeromy committed
321 322 323
			continue
		}

324
		if record := pmes.GetRecord(); record != nil {
325
			// Success! We were given the value
326 327 328 329 330

			err := dht.verifyRecord(record)
			if err != nil {
				return nil, err
			}
331
			dht.providers.AddProvider(key, p)
332
			return record.GetValue(), nil
333
		}
Jeromy's avatar
Jeromy committed
334
	}
335
	return nil, routing.ErrNotFound
Jeromy's avatar
Jeromy committed
336 337
}

338
// getLocal attempts to retrieve the value from the datastore
339
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
340 341
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
Jeromy's avatar
Jeromy committed
342
	log.Debug("getLocal %s", key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
343
	v, err := dht.datastore.Get(key.DsKey())
344 345 346
	if err != nil {
		return nil, err
	}
Jeromy's avatar
Jeromy committed
347
	log.Debug("found in db")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
348 349 350

	byt, ok := v.([]byte)
	if !ok {
351
		return nil, errors.New("value stored in datastore not []byte")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
352
	}
353 354 355 356 357 358 359 360 361 362
	rec := new(pb.Record)
	err = proto.Unmarshal(byt, rec)
	if err != nil {
		return nil, err
	}

	// TODO: 'if paranoid'
	if u.Debug {
		err = dht.verifyRecord(rec)
		if err != nil {
Jeromy's avatar
Jeromy committed
363
			log.Errorf("local record verify failed: %s", err)
364 365 366 367 368
			return nil, err
		}
	}

	return rec.GetValue(), nil
369 370
}

371
// putLocal stores the key value pair in the datastore
372
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
373 374 375 376 377 378 379 380 381 382
	rec, err := dht.makePutRecord(key, value)
	if err != nil {
		return err
	}
	data, err := proto.Marshal(rec)
	if err != nil {
		return err
	}

	return dht.datastore.Put(key.DsKey(), data)
383
}
384

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
385 386
// Update signals to all routingTables to Update their last-seen status
// on the given peer.
387 388
func (dht *IpfsDHT) Update(ctx context.Context, p peer.Peer) {
	log.Event(ctx, "updatePeer", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
389
	removedCount := 0
390
	for _, route := range dht.routingTables {
391
		removed := route.Update(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
392
		// Only close the connection if no tables refer to this peer
393
		if removed != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
394
			removedCount++
395
		}
396
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
397 398 399 400 401 402 403 404

	// Only close the connection if no tables refer to this peer
	// if removedCount == len(dht.routingTables) {
	// 	dht.network.ClosePeer(p)
	// }
	// ACTUALLY, no, let's not just close the connection. it may be connected
	// due to other things. it seems that we just need connection timeouts
	// after some deadline of inactivity.
405
}
Jeromy's avatar
Jeromy committed
406

Jeromy's avatar
Jeromy committed
407
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
408
func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
409
	for _, table := range dht.routingTables {
Jeromy's avatar
Jeromy committed
410 411 412 413 414 415 416
		p := table.Find(id)
		if p != nil {
			return p, table
		}
	}
	return nil, nil
}
417

Jeromy's avatar
Jeromy committed
418
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
419 420
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
421
	return dht.sendRequest(ctx, p, pmes)
422
}
423

424 425
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), level)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
426
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
427 428
}

429
func (dht *IpfsDHT) addProviders(key u.Key, peers []*pb.Message_Peer) []peer.Peer {
430
	var provArr []peer.Peer
Jeromy's avatar
Jeromy committed
431
	for _, prov := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
432 433
		p, err := dht.peerFromInfo(prov)
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
434
			log.Errorf("error getting peer from info: %v", err)
Jeromy's avatar
Jeromy committed
435 436
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
437

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
438
		log.Debugf("%s adding provider: %s for %s", dht.self, p, key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
439

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
440
		// Dont add outselves to the list
441
		if p.ID().Equal(dht.self.ID()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
442
			continue
Jeromy's avatar
Jeromy committed
443
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
444 445

		// TODO(jbenet) ensure providers is idempotent
446
		dht.providers.AddProvider(key, p)
447
		provArr = append(provArr, p)
Jeromy's avatar
Jeromy committed
448
	}
449
	return provArr
Jeromy's avatar
Jeromy committed
450
}
Jeromy's avatar
Jeromy committed
451

452
// nearestPeersToQuery returns the routing tables closest peers.
453
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
454 455 456 457
	level := pmes.GetClusterLevel()
	cluster := dht.routingTables[level]

	key := u.Key(pmes.GetKey())
458
	closer := cluster.NearestPeers(kb.ConvertKey(key), count)
459 460 461
	return closer
}

462
// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
463
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
464
	closer := dht.nearestPeersToQuery(pmes, count)
465 466 467 468 469 470

	// no node? nil
	if closer == nil {
		return nil
	}

471 472
	// == to self? thats bad
	for _, p := range closer {
473
		if p.ID().Equal(dht.self.ID()) {
474 475 476
			log.Error("Attempted to return self! this shouldnt happen...")
			return nil
		}
477 478
	}

479
	var filtered []peer.Peer
480 481 482
	for _, p := range closer {
		// must all be closer than self
		key := u.Key(pmes.GetKey())
483
		if !kb.Closer(dht.self.ID(), p.ID(), key) {
484 485
			filtered = append(filtered, p)
		}
486 487
	}

488 489
	// ok seems like closer nodes
	return filtered
490 491
}

Jeromy's avatar
Jeromy committed
492
// getPeer searches the peerstore for a peer with the given peer ID
493
func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
494
	p, err := dht.peerstore.FindOrCreate(id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
495 496
	if err != nil {
		err = fmt.Errorf("Failed to get peer from peerstore: %s", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
497
		log.Error(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
498 499 500 501 502
		return nil, err
	}
	return p, nil
}

Jeromy's avatar
Jeromy committed
503 504
// peerFromInfo returns a peer using info in the protobuf peer struct
// to lookup or create a peer
505
func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {
506 507

	id := peer.ID(pbp.GetId())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
508

509 510
	// bail out if it's ourselves
	//TODO(jbenet) not sure this should be an error _here_
511
	if id.Equal(dht.self.ID()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
512 513 514
		return nil, errors.New("found self")
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
515 516 517
	p, err := dht.getPeer(id)
	if err != nil {
		return nil, err
518 519
	}

520 521
	// add addresses we've just discovered
	maddrs, err := pbp.Addresses()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
522 523
	if err != nil {
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
524
	}
525 526 527
	for _, maddr := range maddrs {
		p.AddAddress(maddr)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
528 529 530
	return p, nil
}

531
func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_Peer) (peer.Peer, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
532 533 534
	p, err := dht.peerFromInfo(pbp)
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
535 536
	}

537
	// dial connection
538
	err = dht.dialer.DialPeer(ctx, p)
539
	return p, err
Jeromy's avatar
Jeromy committed
540
}
541

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
542
//TODO: this should be smarter about which keys it selects.
543
func (dht *IpfsDHT) loadProvidableKeys() error {
544 545 546 547
	kl, err := dht.datastore.KeyList()
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
548 549 550
	for _, dsk := range kl {
		k := u.KeyFromDsKey(dsk)
		if len(k) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
551
			log.Errorf("loadProvidableKeys error: %v", dsk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
552 553 554
		}

		dht.providers.AddProvider(k, dht.self)
555 556 557 558
	}
	return nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
559
// PingRoutine periodically pings nearest neighbors.
560
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
561 562
	defer dht.Children().Done()

563 564 565 566 567 568 569 570
	tick := time.Tick(t)
	for {
		select {
		case <-tick:
			id := make([]byte, 16)
			rand.Read(id)
			peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5)
			for _, p := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
571
				ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
572 573
				err := dht.Ping(ctx, p)
				if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
574
					log.Errorf("Ping error: %s", err)
575 576
				}
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
577
		case <-dht.Closing():
578 579 580 581 582
			return
		}
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
583
// Bootstrap builds up list of peers by requesting random peer IDs
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
584
func (dht *IpfsDHT) Bootstrap(ctx context.Context) {
585 586
	id := make([]byte, 16)
	rand.Read(id)
Jeromy's avatar
Jeromy committed
587 588
	p, err := dht.FindPeer(ctx, peer.ID(id))
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
589
		log.Errorf("Bootstrap peer error: %s", err)
Jeromy's avatar
Jeromy committed
590
	}
591
	err = dht.dialer.DialPeer(ctx, p)
592
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
593
		log.Errorf("Bootstrap peer error: %s", err)
594
	}
595
}