dht.go 15.5 KB
Newer Older
1 2
// package dht implements a distributed hash table that satisfies the ipfs routing
// interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4
package dht

5
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"bytes"
7
	"crypto/rand"
8
	"errors"
9
	"fmt"
10 11
	"sync"
	"time"
12

13
	inet "github.com/jbenet/go-ipfs/net"
14
	msg "github.com/jbenet/go-ipfs/net/message"
15
	peer "github.com/jbenet/go-ipfs/peer"
16
	routing "github.com/jbenet/go-ipfs/routing"
17
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
Jeromy's avatar
Jeromy committed
18
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
19
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
21
	"github.com/jbenet/go-ipfs/util/eventlog"
22

23
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
24
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
Jeromy's avatar
Jeromy committed
25

26
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
27 28
)

29
var log = eventlog.Logger("dht")
30

31
const doPinging = false
32

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33 34 35 36 37
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
38 39
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
40
	routingTables []*kb.RoutingTable
41

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42 43 44
	// the network services we need
	dialer inet.Dialer
	sender inet.Sender
45

Jeromy's avatar
Jeromy committed
46
	// Local peer (yourself)
47
	self peer.Peer
Jeromy's avatar
Jeromy committed
48

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49 50 51
	// Other peers
	peerstore peer.Peerstore

Jeromy's avatar
Jeromy committed
52 53
	// Local data
	datastore ds.Datastore
54
	dslock    sync.Mutex
55

56
	providers *ProviderManager
57

58 59
	// When this peer started up
	birth time.Time
60 61 62

	//lock to make diagnostics work better
	diaglock sync.Mutex
63

64 65 66
	// record validator funcs
	Validators map[string]ValidatorFunc

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67
	ctxc.ContextCloser
68 69
}

Jeromy's avatar
Jeromy committed
70
// NewDHT creates a new DHT object with the given peer as the 'local' host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dialer, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
72
	dht := new(IpfsDHT)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
	dht.dialer = dialer
74
	dht.sender = sender
Jeromy's avatar
Jeromy committed
75
	dht.datastore = dstore
76
	dht.self = p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
	dht.peerstore = ps
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78
	dht.ContextCloser = ctxc.NewContextCloser(ctx, nil)
79

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80 81
	dht.providers = NewProviderManager(dht.Context(), p.ID())
	dht.AddCloserChild(dht.providers)
82

83
	dht.routingTables = make([]*kb.RoutingTable, 3)
84 85 86
	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
	dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
	dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour)
87
	dht.birth = time.Now()
88

89
	dht.Validators = make(map[string]ValidatorFunc)
90
	dht.Validators["pk"] = ValidatePublicKeyRecord
91

92
	if doPinging {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93
		dht.Children().Add(1)
94 95
		go dht.PingRoutine(time.Second * 10)
	}
Jeromy's avatar
Jeromy committed
96
	return dht
97 98
}

99
// Connect to a new peer at the given address, ping and add to the routing table
100
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, error) {
101 102 103 104 105 106 107 108 109
	// TODO(jbenet,whyrusleeping)
	//
	// Connect should take in a Peer (with ID). In a sense, we shouldn't be
	// allowing connections to random multiaddrs without knowing who we're
	// speaking to (i.e. peer.ID). In terms of moving around simple addresses
	// -- instead of an (ID, Addr) pair -- we can use:
	//
	//   /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
	//
110
	err := dht.dialer.DialPeer(ctx, npeer)
111
	if err != nil {
112
		return nil, err
113 114
	}

Jeromy's avatar
Jeromy committed
115 116
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117
	err = dht.Ping(ctx, npeer)
Jeromy's avatar
Jeromy committed
118
	if err != nil {
119
		return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
Jeromy's avatar
Jeromy committed
120
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
121
	log.Event(ctx, "connect", dht.self, npeer)
Jeromy's avatar
Jeromy committed
122

123
	dht.Update(ctx, npeer)
124

125
	return npeer, nil
Jeromy's avatar
Jeromy committed
126 127
}

128
// HandleMessage implements the inet.Handler interface.
129
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.NetMessage {
130 131 132

	mData := mes.Data()
	if mData == nil {
133
		log.Error("Message contained nil data.")
134
		return nil
135 136 137 138
	}

	mPeer := mes.Peer()
	if mPeer == nil {
139
		log.Error("Message contained nil peer.")
140
		return nil
141 142 143
	}

	// deserialize msg
144
	pmes := new(pb.Message)
145 146
	err := proto.Unmarshal(mData, pmes)
	if err != nil {
147
		log.Error("Error unmarshaling data")
148
		return nil
149 150 151
	}

	// update the peer (on valid msgs only)
152
	dht.Update(ctx, mPeer)
153

154
	log.Event(ctx, "foo", dht.self, mPeer, pmes)
155 156 157 158

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
159
		log.Error("got back nil handler from handlerForMsgType")
160
		return nil
161 162 163
	}

	// dispatch handler.
164
	rpmes, err := handler(ctx, mPeer, pmes)
165
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166
		log.Errorf("handle message error: %s", err)
167
		return nil
168 169
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170 171
	// if nil response, return it before serializing
	if rpmes == nil {
172
		log.Warning("Got back nil response from request.")
173
		return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174 175
	}

176 177 178
	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
179
		log.Errorf("serialze response error: %s", err)
180
		return nil
181 182
	}

183
	return rmes
184 185
}

186 187
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
188
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
189 190 191 192 193 194 195 196

	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return nil, err
	}

	start := time.Now()

197
	rmes, err := dht.sender.SendRequest(ctx, mes) // respect?
198 199 200
	if err != nil {
		return nil, err
	}
201 202 203
	if rmes == nil {
		return nil, errors.New("no response to request")
	}
204
	log.Event(ctx, "sentMessage", dht.self, p, pmes)
205

206
	rmes.Peer().SetLatency(time.Since(start))
207

208
	rpmes := new(pb.Message)
209 210 211 212 213 214
	if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
		return nil, err
	}
	return rpmes, nil
}

215
// putValueToNetwork stores the given key/value pair at the peer 'p'
216
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
217
	key string, rec *pb.Record) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
218

219
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
220
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
222 223 224
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225

226
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227 228 229
		return errors.New("value not put correctly")
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
230 231
}

Jeromy's avatar
Jeromy committed
232 233
// putProvider sends a message to peer 'p' saying that the local node
// can provide the value of 'key'
234
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
235

236
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
237 238

	// add self as the provider
239
	pmes.ProviderPeers = pb.PeersToPBPeers([]peer.Peer{dht.self})
240

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
241
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
242 243 244
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
245

246
	log.Debugf("%s putProvider: %s for %s", dht.self, p, u.Key(key))
247
	if rpmes.GetKey() != pmes.GetKey() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
248 249 250 251
		return errors.New("provider not added correctly")
	}

	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252 253
}

254 255
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
	key u.Key, level int) ([]byte, []peer.Peer, error) {
256 257

	pmes, err := dht.getValueSingle(ctx, p, key, level)
258
	if err != nil {
259
		return nil, nil, err
260 261
	}

262
	if record := pmes.GetRecord(); record != nil {
263
		// Success! We were given the value
Jeromy's avatar
Jeromy committed
264
		log.Debug("getValueOrPeers: got value")
265 266 267 268

		// make sure record is still valid
		err = dht.verifyRecord(record)
		if err != nil {
Jeromy's avatar
Jeromy committed
269
			log.Error("Received invalid record!")
270 271 272
			return nil, nil, err
		}
		return record.GetValue(), nil, nil
273
	}
274

275
	// TODO decide on providers. This probably shouldn't be happening.
276 277 278 279 280
	if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
		val, err := dht.getFromPeerList(ctx, key, prv, level)
		if err != nil {
			return nil, nil, err
		}
Jeromy's avatar
Jeromy committed
281
		log.Debug("getValueOrPeers: get from providers")
282 283
		return val, nil, nil
	}
284 285

	// Perhaps we were given closer peers
286
	var peers []peer.Peer
287
	for _, pb := range pmes.GetCloserPeers() {
288
		pr, err := dht.peerFromInfo(pb)
289
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
290
			log.Error(err)
291 292
			continue
		}
293
		peers = append(peers, pr)
294
	}
295 296

	if len(peers) > 0 {
297
		log.Debug("getValueOrPeers: peers")
298 299 300
		return nil, peers, nil
	}

301 302
	log.Warning("getValueOrPeers: routing.ErrNotFound")
	return nil, nil, routing.ErrNotFound
303 304
}

305
// getValueSingle simply performs the get value RPC with the given parameters
306
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
307
	key u.Key, level int) (*pb.Message, error) {
308

309
	pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), level)
310
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
311 312
}

313 314 315 316
// TODO: Im not certain on this implementation, we get a list of peers/providers
// from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
317
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
318
	peerlist []*pb.Message_Peer, level int) ([]byte, error) {
319

320
	for _, pinfo := range peerlist {
321
		p, err := dht.ensureConnectedToPeer(ctx, pinfo)
322
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
323
			log.Errorf("getFromPeers error: %s", err)
324
			continue
Jeromy's avatar
Jeromy committed
325
		}
326 327

		pmes, err := dht.getValueSingle(ctx, p, key, level)
Jeromy's avatar
Jeromy committed
328
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
329
			log.Errorf("getFromPeers error: %s\n", err)
Jeromy's avatar
Jeromy committed
330 331 332
			continue
		}

333
		if record := pmes.GetRecord(); record != nil {
334
			// Success! We were given the value
335 336 337 338 339

			err := dht.verifyRecord(record)
			if err != nil {
				return nil, err
			}
340
			dht.providers.AddProvider(key, p)
341
			return record.GetValue(), nil
342
		}
Jeromy's avatar
Jeromy committed
343
	}
344
	return nil, routing.ErrNotFound
Jeromy's avatar
Jeromy committed
345 346
}

347
// getLocal attempts to retrieve the value from the datastore
348
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
349 350
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
Jeromy's avatar
Jeromy committed
351
	log.Debug("getLocal %s", key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
352
	v, err := dht.datastore.Get(key.DsKey())
353 354 355
	if err != nil {
		return nil, err
	}
Jeromy's avatar
Jeromy committed
356
	log.Debug("found in db")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
357 358 359

	byt, ok := v.([]byte)
	if !ok {
360
		return nil, errors.New("value stored in datastore not []byte")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
361
	}
362 363 364 365 366 367 368 369 370 371
	rec := new(pb.Record)
	err = proto.Unmarshal(byt, rec)
	if err != nil {
		return nil, err
	}

	// TODO: 'if paranoid'
	if u.Debug {
		err = dht.verifyRecord(rec)
		if err != nil {
Jeromy's avatar
Jeromy committed
372
			log.Errorf("local record verify failed: %s", err)
373 374 375 376 377
			return nil, err
		}
	}

	return rec.GetValue(), nil
378 379
}

380
// putLocal stores the key value pair in the datastore
381
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
382 383 384 385 386 387 388 389 390 391
	rec, err := dht.makePutRecord(key, value)
	if err != nil {
		return err
	}
	data, err := proto.Marshal(rec)
	if err != nil {
		return err
	}

	return dht.datastore.Put(key.DsKey(), data)
392
}
393

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
394 395
// Update signals to all routingTables to Update their last-seen status
// on the given peer.
396 397
func (dht *IpfsDHT) Update(ctx context.Context, p peer.Peer) {
	log.Event(ctx, "updatePeer", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
398
	removedCount := 0
399
	for _, route := range dht.routingTables {
400
		removed := route.Update(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
401
		// Only close the connection if no tables refer to this peer
402
		if removed != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
403
			removedCount++
404
		}
405
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
406 407 408 409 410 411 412 413

	// Only close the connection if no tables refer to this peer
	// if removedCount == len(dht.routingTables) {
	// 	dht.network.ClosePeer(p)
	// }
	// ACTUALLY, no, let's not just close the connection. it may be connected
	// due to other things. it seems that we just need connection timeouts
	// after some deadline of inactivity.
414
}
Jeromy's avatar
Jeromy committed
415

Jeromy's avatar
Jeromy committed
416
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
417
func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
418
	for _, table := range dht.routingTables {
Jeromy's avatar
Jeromy committed
419 420 421 422 423 424 425
		p := table.Find(id)
		if p != nil {
			return p, table
		}
	}
	return nil, nil
}
426

Jeromy's avatar
Jeromy committed
427
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
428 429
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
430
	return dht.sendRequest(ctx, p, pmes)
431
}
432

433 434
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*pb.Message, error) {
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), level)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
435
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
436 437
}

438
func (dht *IpfsDHT) addProviders(key u.Key, peers []*pb.Message_Peer) []peer.Peer {
439
	var provArr []peer.Peer
Jeromy's avatar
Jeromy committed
440
	for _, prov := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
441 442
		p, err := dht.peerFromInfo(prov)
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
443
			log.Errorf("error getting peer from info: %v", err)
Jeromy's avatar
Jeromy committed
444 445
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
446

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
447
		log.Debugf("%s adding provider: %s for %s", dht.self, p, key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
448

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
449
		// Dont add outselves to the list
450
		if p.ID().Equal(dht.self.ID()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
451
			continue
Jeromy's avatar
Jeromy committed
452
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
453 454

		// TODO(jbenet) ensure providers is idempotent
455
		dht.providers.AddProvider(key, p)
456
		provArr = append(provArr, p)
Jeromy's avatar
Jeromy committed
457
	}
458
	return provArr
Jeromy's avatar
Jeromy committed
459
}
Jeromy's avatar
Jeromy committed
460

461
// nearestPeersToQuery returns the routing tables closest peers.
462
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
463 464 465 466
	level := pmes.GetClusterLevel()
	cluster := dht.routingTables[level]

	key := u.Key(pmes.GetKey())
467
	closer := cluster.NearestPeers(kb.ConvertKey(key), count)
468 469 470
	return closer
}

471
// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
472
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
473
	closer := dht.nearestPeersToQuery(pmes, count)
474 475 476 477 478 479

	// no node? nil
	if closer == nil {
		return nil
	}

480 481
	// == to self? thats bad
	for _, p := range closer {
482
		if p.ID().Equal(dht.self.ID()) {
483 484 485
			log.Error("Attempted to return self! this shouldnt happen...")
			return nil
		}
486 487
	}

488
	var filtered []peer.Peer
489 490 491
	for _, p := range closer {
		// must all be closer than self
		key := u.Key(pmes.GetKey())
492
		if !kb.Closer(dht.self.ID(), p.ID(), key) {
493 494
			filtered = append(filtered, p)
		}
495 496
	}

497 498
	// ok seems like closer nodes
	return filtered
499 500
}

Jeromy's avatar
Jeromy committed
501
// getPeer searches the peerstore for a peer with the given peer ID
502
func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
503
	p, err := dht.peerstore.FindOrCreate(id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
504 505
	if err != nil {
		err = fmt.Errorf("Failed to get peer from peerstore: %s", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
506
		log.Error(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
507 508 509 510 511
		return nil, err
	}
	return p, nil
}

Jeromy's avatar
Jeromy committed
512 513
// peerFromInfo returns a peer using info in the protobuf peer struct
// to lookup or create a peer
514
func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {
515 516

	id := peer.ID(pbp.GetId())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
517

518 519
	// bail out if it's ourselves
	//TODO(jbenet) not sure this should be an error _here_
520
	if id.Equal(dht.self.ID()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
521 522 523
		return nil, errors.New("found self")
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
524 525 526
	p, err := dht.getPeer(id)
	if err != nil {
		return nil, err
527 528
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
529 530 531
	maddr, err := pbp.Address()
	if err != nil {
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
532
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
533
	p.AddAddress(maddr)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
534 535 536
	return p, nil
}

537
func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_Peer) (peer.Peer, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
538 539 540
	p, err := dht.peerFromInfo(pbp)
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
541 542
	}

543
	// dial connection
544
	err = dht.dialer.DialPeer(ctx, p)
545
	return p, err
Jeromy's avatar
Jeromy committed
546
}
547

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
548
//TODO: this should be smarter about which keys it selects.
549
func (dht *IpfsDHT) loadProvidableKeys() error {
550 551 552 553
	kl, err := dht.datastore.KeyList()
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
554 555 556
	for _, dsk := range kl {
		k := u.KeyFromDsKey(dsk)
		if len(k) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
557
			log.Errorf("loadProvidableKeys error: %v", dsk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
558 559 560
		}

		dht.providers.AddProvider(k, dht.self)
561 562 563 564
	}
	return nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
565
// PingRoutine periodically pings nearest neighbors.
566
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
567 568
	defer dht.Children().Done()

569 570 571 572 573 574 575 576
	tick := time.Tick(t)
	for {
		select {
		case <-tick:
			id := make([]byte, 16)
			rand.Read(id)
			peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5)
			for _, p := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
577
				ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
578 579
				err := dht.Ping(ctx, p)
				if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
580
					log.Errorf("Ping error: %s", err)
581 582
				}
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
583
		case <-dht.Closing():
584 585 586 587 588
			return
		}
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
589
// Bootstrap builds up list of peers by requesting random peer IDs
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
590
func (dht *IpfsDHT) Bootstrap(ctx context.Context) {
591 592
	id := make([]byte, 16)
	rand.Read(id)
Jeromy's avatar
Jeromy committed
593 594 595 596
	p, err := dht.FindPeer(ctx, peer.ID(id))
	if err != nil {
		log.Error("Bootstrap peer error: %s", err)
	}
597
	err = dht.dialer.DialPeer(ctx, p)
598
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
599
		log.Errorf("Bootstrap peer error: %s", err)
600
	}
601
}