dht.go

package dht

import (
	"bytes"
	"crypto/rand"
	"errors"
	"fmt"
	"sync"
	"time"

	inet "github.com/jbenet/go-ipfs/net"
	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
	u "github.com/jbenet/go-ipfs/util"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"

	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
)

// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
	routingTables []*kb.RoutingTable

	// The network interface and the sender service used to reach other peers
	network inet.Network
	sender  inet.Sender

	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore
	dslock    sync.Mutex

	providers *ProviderManager

	// Signal to shut down the dht
	shutdown chan struct{}

	// When this peer started up
	birth time.Time

	// Lock to make diagnostics work better
	diaglock sync.Mutex
}

// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
	dht := new(IpfsDHT)
	dht.network = net
	dht.sender = sender
	dht.datastore = dstore
	dht.self = p

	dht.providers = NewProviderManager(p.ID)
	dht.shutdown = make(chan struct{})

	dht.routingTables = make([]*kb.RoutingTable, 3)
	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
	dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
	dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
	dht.birth = time.Now()
	return dht
}
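
// Usage sketch (illustrative only, not part of the original file): assuming a
// local peer, an inet.Network, an inet.Sender and a datastore have already
// been constructed elsewhere, wiring up a DHT looks roughly like:
//
//	dht := NewDHT(self, network, sender, dstore)
//	defer dht.Halt()
//
// The three routing tables above correspond to Coral-style clusters bucketed
// by expected latency (~30ms, ~100ms, and everything else).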

// Start up background goroutines needed by the DHT
func (dht *IpfsDHT) Start() {
	panic("the service is already started. rmv this method")
}

// Connect to a new peer at the given address, ping and add to the routing table
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
	maddrstr, _ := addr.String()
	u.DOut("Connect to new peer: %s\n", maddrstr)

	// TODO(jbenet,whyrusleeping)
	//
	// Connect should take in a Peer (with ID). In a sense, we shouldn't be
	// allowing connections to random multiaddrs without knowing who we're
	// speaking to (i.e. peer.ID). In terms of moving around simple addresses
	// -- instead of an (ID, Addr) pair -- we can use:
	//
	//   /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
	//
	npeer := &peer.Peer{}
	npeer.AddAddress(addr)
	err := dht.network.DialPeer(npeer)
	if err != nil {
		return nil, err
	}

	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
	err = dht.Ping(npeer, time.Second*2)
	if err != nil {
		return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
	}

	dht.Update(npeer)

	return npeer, nil
}
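
// Usage sketch (illustrative only; the address below is assumed, not taken
// from the original file): a caller dials a bootstrap node from a raw multiaddr:
//
//	addr, err := ma.NewMultiaddr("/ip4/10.20.30.40/tcp/4001")
//	if err != nil {
//		// handle malformed address
//	}
//	npeer, err := dht.Connect(addr)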

// HandleMessage implements the inet.Handler interface.
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.NetMessage, error) {

	mData := mes.Data()
	if mData == nil {
		return nil, errors.New("message did not include Data")
	}

	mPeer := mes.Peer()
	if mPeer == nil {
		return nil, errors.New("message did not include a Peer")
	}

	// deserialize msg
	pmes := new(Message)
	err := proto.Unmarshal(mData, pmes)
	if err != nil {
		return nil, fmt.Errorf("Failed to decode protobuf message: %v\n", err)
	}

	// update the peer (on valid msgs only)
	dht.Update(mPeer)

	// Print out diagnostic
	u.DOut("[peer: %s]\nGot message type: '%s' [from = %s]\n",
		dht.self.ID.Pretty(),
		Message_MessageType_name[int32(pmes.GetType())], mPeer.ID.Pretty())

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
		return nil, errors.New("Received invalid message type")
	}

	// dispatch handler.
	rpmes, err := handler(mPeer, pmes)
	if err != nil {
		return nil, err
	}

	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
		return nil, fmt.Errorf("Failed to encode protobuf message: %v\n", err)
	}

	return rmes, nil
}

// dhtHandler specifies the signature of functions that handle DHT messages.
type dhtHandler func(*peer.Peer, *Message) (*Message, error)

func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
	switch t {
	case Message_GET_VALUE:
		return dht.handleGetValue
	// case Message_PUT_VALUE:
	// 	return dht.handlePutValue
	// case Message_FIND_NODE:
	// 	return dht.handleFindPeer
	// case Message_ADD_PROVIDER:
	// 	return dht.handleAddProvider
	// case Message_GET_PROVIDERS:
	// 	return dht.handleGetProviders
	// case Message_PING:
	// 	return dht.handlePing
	// case Message_DIAGNOSTIC:
	// 	return dht.handleDiagnostic
	default:
		return nil
	}
}
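
// Usage sketch (illustrative only, not part of the original file): the network
// layer feeds each incoming NetMessage through HandleMessage and forwards any
// non-nil reply back to the sender:
//
//	reply, err := dht.HandleMessage(context.Background(), incoming)
//	if err != nil {
//		u.PErr("dht message handling failed: %s\n", err)
//	} else if reply != nil {
//		// hand reply back to the network layer for delivery
//	}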

// putValueToNetwork asks a single peer to store the given key/value pair
func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error {
	pmes := Message{
		Type:  PBDHTMessage_PUT_VALUE,
		Key:   key,
		Value: value,
		ID:    swarm.GenerateMessageID(),
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())
	dht.netChan.Outgoing <- mes
	return nil
}

// handleGetValue returns the value for the requested key if we hold it locally,
// otherwise it replies with known providers or a closer peer to ask
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
	u.DOut("handleGetValue for key: %s\n", pmes.GetKey())
	dskey := ds.NewKey(pmes.GetKey())
	resp := &Message{
		Response: true,
		ID:       pmes.GetId(),
		Key:      pmes.GetKey(),
	}
	iVal, err := dht.datastore.Get(dskey)
	if err == nil {
		u.DOut("handleGetValue success!\n")
		resp.Success = true
		resp.Value = iVal.([]byte)
	} else if err == ds.ErrNotFound {
		// Check if we know any providers for the requested value
		provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
		if len(provs) > 0 {
			u.DOut("handleGetValue returning %d provider[s]\n", len(provs))
			resp.Peers = provs
			resp.Success = true
		} else {
			// No providers?
			// Find closest peer on given cluster to desired key and reply with that info

			level := 0
			if len(pmes.GetValue()) < 1 {
				// TODO: maybe return an error? Defaulting isn't a good idea IMO
				u.PErr("handleGetValue: no routing level specified, assuming 0\n")
			} else {
				level = int(pmes.GetValue()[0]) // Using value field to specify cluster level
			}
			u.DOut("handleGetValue searching level %d clusters\n", level)

			closer := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))

			if closer.ID.Equal(dht.self.ID) {
				u.DOut("Attempted to return self! this shouldn't happen...\n")
				resp.Peers = nil
				goto out
			}
			// If this peer is closer than the one from the table, return nil
			if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) {
				resp.Peers = nil
				u.DOut("handleGetValue could not find a closer node than myself.\n")
			} else {
				u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty())
				resp.Peers = []*peer.Peer{closer}
			}
		}
	} else {
		// temp: what other errors can a datastore return?
		panic(err)
	}

out:
	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.netChan.Outgoing <- mes
}

// handlePutValue stores the given value in this peer's local storage
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *PBDHTMessage) {
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
	if err != nil {
		// For now, just panic, handle this better later maybe
		panic(err)
	}
}

// handlePing replies to a ping request with an empty response message
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) {
	u.DOut("[%s] Responding to ping from [%s]!\n", dht.self.ID.Pretty(), p.ID.Pretty())
	resp := Message{
		Type:     pmes.GetType(),
		Response: true,
		ID:       pmes.GetId(),
	}

	dht.netChan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf())
}

// handleFindPeer replies with the closest known peer to the requested key,
// unless we are closer to it ourselves
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
	resp := Message{
		Type:     pmes.GetType(),
		ID:       pmes.GetId(),
		Response: true,
	}
	defer func() {
		mes := swarm.NewMessage(p, resp.ToProtobuf())
		dht.netChan.Outgoing <- mes
	}()
	level := pmes.GetValue()[0]
	u.DOut("handleFindPeer: searching for '%s'\n", peer.ID(pmes.GetKey()).Pretty())
	closest := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
	if closest == nil {
		u.PErr("handleFindPeer: could not find anything.\n")
		return
	}

	if len(closest.Addresses) == 0 {
		u.PErr("handleFindPeer: no addresses for connected peer...\n")
		return
	}

	// If the found peer is further away than this peer...
	if kb.Closer(dht.self.ID, closest.ID, u.Key(pmes.GetKey())) {
		return
	}

	u.DOut("handleFindPeer: sending back '%s'\n", closest.ID.Pretty())
	resp.Peers = []*peer.Peer{closest}
	resp.Success = true
}

// handleGetProviders replies with the known providers for the requested key
// (including ourselves if we hold the value), or with a closer peer to ask
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
	resp := Message{
		Type:     PBDHTMessage_GET_PROVIDERS,
		Key:      pmes.GetKey(),
		ID:       pmes.GetId(),
		Response: true,
	}

	has, err := dht.datastore.Has(ds.NewKey(pmes.GetKey()))
	if err != nil {
		dht.netChan.Errors <- err
	}

	providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
	if has {
		providers = append(providers, dht.self)
	}
	if len(providers) == 0 {
		level := 0
		if len(pmes.GetValue()) > 0 {
			level = int(pmes.GetValue()[0])
		}

		closer := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
		if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) {
			resp.Peers = nil
		} else {
			resp.Peers = []*peer.Peer{closer}
		}
	} else {
		resp.Peers = providers
		resp.Success = true
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.netChan.Outgoing <- mes
}

// providerInfo records a provider peer and when the record was created
type providerInfo struct {
	Creation time.Time
	Value    *peer.Peer
}

// handleAddProvider records the sending peer as a provider for the given key
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) {
	key := u.Key(pmes.GetKey())
	u.DOut("[%s] Adding [%s] as a provider for '%s'\n", dht.self.ID.Pretty(), p.ID.Pretty(), peer.ID(key).Pretty())
	dht.providers.AddProvider(key, p)
}

// Halt stops all communications from this peer and shuts it down
func (dht *IpfsDHT) Halt() {
	dht.shutdown <- struct{}{}
	dht.network.Close()
	dht.providers.Halt()
	dht.listener.Halt()
}

// handleDiagnostic collects diagnostic info from our closest peers and replies
// with the aggregated result
// NOTE: not yet finished, low priority
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
	seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
	listenChan := dht.listener.Listen(pmes.GetId(), len(seq), time.Second*30)

	for _, ps := range seq {
		mes := swarm.NewMessage(ps, pmes)
		dht.netChan.Outgoing <- mes
	}

	buf := new(bytes.Buffer)
	di := dht.getDiagInfo()
	buf.Write(di.Marshal())

	// NOTE: this shouldn't be a hardcoded value
	after := time.After(time.Second * 20)
	count := len(seq)
	for count > 0 {
		select {
		case <-after:
			// Timeout, return what we have
			goto out
		case reqResp := <-listenChan:
			pmesOut := new(PBDHTMessage)
			err := proto.Unmarshal(reqResp.Data, pmesOut)
			if err != nil {
				// It broke? eh, whatever, keep going
				continue
			}
			buf.Write(reqResp.Data)
			count--
		}
	}

out:
	resp := Message{
		Type:     PBDHTMessage_DIAGNOSTIC,
		ID:       pmes.GetId(),
		Value:    buf.Bytes(),
		Response: true,
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.netChan.Outgoing <- mes
}

// getValueOrPeers asks a single peer for a value; if the peer does not have it,
// it returns the closer peers the remote side suggested instead
func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) {
	pmes, err := dht.getValueSingle(p, key, timeout, level)
	if err != nil {
		return nil, nil, err
	}

	if pmes.GetSuccess() {
		if pmes.Value == nil { // We were given provider[s]
			val, err := dht.getFromPeerList(key, timeout, pmes.GetPeers(), level)
			if err != nil {
				return nil, nil, err
			}
			return val, nil, nil
		}

		// Success! We were given the value
		return pmes.GetValue(), nil, nil
	}

	// We were given a closer node
	var peers []*peer.Peer
	for _, pb := range pmes.GetPeers() {
		if peer.ID(pb.GetId()).Equal(dht.self.ID) {
			continue
		}
		addr, err := ma.NewMultiaddr(pb.GetAddr())
		if err != nil {
			u.PErr("%v\n", err.Error())
			continue
		}

		np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr)
		if err != nil {
			u.PErr("%v\n", err.Error())
			continue
		}

		peers = append(peers, np)
	}
	return nil, peers, nil
}
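
// Usage sketch (illustrative only, not part of the original file): a lookup
// loop drives getValueOrPeers, following the returned peers until a value or
// an error surfaces:
//
//	val, closer, err := dht.getValueOrPeers(p, key, timeout, level)
//	switch {
//	case err != nil:
//		// give up on this peer
//	case val != nil:
//		// done, we found the value
//	default:
//		// enqueue `closer` and query those peers next
//	}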

// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration, level int) (*PBDHTMessage, error) {
	pmes := Message{
		Type:  PBDHTMessage_GET_VALUE,
		Key:   string(key),
		Value: []byte{byte(level)},
		ID:    swarm.GenerateMessageID(),
	}
	responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)

	mes := swarm.NewMessage(p, pmes.ToProtobuf())
	t := time.Now()
	dht.netChan.Outgoing <- mes

	// Wait for either the response or a timeout
	timeup := time.After(timeout)
	select {
	case <-timeup:
		dht.listener.Unlisten(pmes.ID)
		return nil, u.ErrTimeout
	case resp, ok := <-responseChan:
		if !ok {
			u.PErr("response channel closed before timeout, please investigate.\n")
			return nil, u.ErrTimeout
		}
		roundtrip := time.Since(t)
		resp.Peer.SetLatency(roundtrip)
		pmesOut := new(PBDHTMessage)
		err := proto.Unmarshal(resp.Data, pmesOut)
		if err != nil {
			return nil, err
		}
		return pmesOut, nil
	}
}

// TODO: I'm not certain on this implementation: we get a list of peers/providers
// from someone; what do we do with it? Connect to each of them? Randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
	peerlist []*PBDHTMessage_PBPeer, level int) ([]byte, error) {
	for _, pinfo := range peerlist {
		p, _ := dht.Find(peer.ID(pinfo.GetId()))
		if p == nil {
			maddr, err := ma.NewMultiaddr(pinfo.GetAddr())
			if err != nil {
				u.PErr("getValue error: %s\n", err)
				continue
			}

			p, err = dht.network.GetConnection(peer.ID(pinfo.GetId()), maddr)
			if err != nil {
				u.PErr("getValue error: %s\n", err)
				continue
			}
		}
		pmes, err := dht.getValueSingle(p, key, timeout, level)
		if err != nil {
			u.DErr("getFromPeers error: %s\n", err)
			continue
		}
		dht.providers.AddProvider(key, p)

		// Make sure it was a successful get
		if pmes.GetSuccess() && pmes.Value != nil {
			return pmes.GetValue(), nil
		}
	}
	return nil, u.ErrNotFound
}

// getLocal attempts to retrieve the value from the local datastore
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
	v, err := dht.datastore.Get(ds.NewKey(string(key)))
	if err != nil {
		return nil, err
	}
	return v.([]byte), nil
}

// putLocal stores the key/value pair in the local datastore
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
	return dht.datastore.Put(ds.NewKey(string(key)), value)
}

// Update adds the given peer to every routing table. If inserting it evicts
// another peer that no table still references, that peer's connection is closed.
func (dht *IpfsDHT) Update(p *peer.Peer) {
	for _, route := range dht.routingTables {
		removed := route.Update(p)
		// Only close the connection if no tables refer to this peer
		if removed != nil {
			found := false
			for _, r := range dht.routingTables {
				if r.Find(removed.ID) != nil {
					found = true
					break
				}
			}
			if !found {
				dht.network.CloseConnection(removed)
			}
		}
	}
}

// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
	for _, table := range dht.routingTables {
		p := table.Find(id)
		if p != nil {
			return p, table
		}
	}
	return nil, nil
}

// findPeerSingle asks a single peer for the closest peers it knows to the given ID
func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Duration, level int) (*PBDHTMessage, error) {
	pmes := Message{
		Type:  PBDHTMessage_FIND_NODE,
		Key:   string(id),
		ID:    swarm.GenerateMessageID(),
		Value: []byte{byte(level)},
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())
	listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
	t := time.Now()
	dht.netChan.Outgoing <- mes
	after := time.After(timeout)
	select {
	case <-after:
		dht.listener.Unlisten(pmes.ID)
		return nil, u.ErrTimeout
	case resp := <-listenChan:
		roundtrip := time.Since(t)
		resp.Peer.SetLatency(roundtrip)
		pmesOut := new(PBDHTMessage)
		err := proto.Unmarshal(resp.Data, pmesOut)
		if err != nil {
			return nil, err
		}

		return pmesOut, nil
	}
}

// printTables prints all routing tables, for debugging
func (dht *IpfsDHT) printTables() {
	for _, route := range dht.routingTables {
		route.Print()
	}
}

// findProvidersSingle asks a single peer for providers of the given key
func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*PBDHTMessage, error) {
	pmes := Message{
		Type:  PBDHTMessage_GET_PROVIDERS,
		Key:   string(key),
		ID:    swarm.GenerateMessageID(),
		Value: []byte{byte(level)},
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
	dht.netChan.Outgoing <- mes
	after := time.After(timeout)
	select {
	case <-after:
		dht.listener.Unlisten(pmes.ID)
		return nil, u.ErrTimeout
	case resp := <-listenChan:
		u.DOut("FindProviders: got response.\n")
		pmesOut := new(PBDHTMessage)
		err := proto.Unmarshal(resp.Data, pmesOut)
		if err != nil {
			return nil, err
		}

		return pmesOut, nil
	}
}

// addPeerList registers the given peers as providers for key, connecting to
// any we are not already connected to
// TODO: Could be done async
func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer.Peer {
	var provArr []*peer.Peer
	for _, prov := range peers {
		// Don't add ourselves to the list
		if peer.ID(prov.GetId()).Equal(dht.self.ID) {
			continue
		}
		// Don't add someone who is already on the list
		p := dht.network.GetPeer(u.Key(prov.GetId()))
		if p == nil {
			u.DOut("given provider %s was not in our network already.\n", peer.ID(prov.GetId()).Pretty())
			var err error
			p, err = dht.peerFromInfo(prov)
			if err != nil {
				u.PErr("error connecting to new peer: %s\n", err)
				continue
			}
		}
		dht.providers.AddProvider(key, p)
		provArr = append(provArr, p)
	}
	return provArr
}

// peerFromInfo returns a network connection to the peer described by the given
// protobuf peer info
func (dht *IpfsDHT) peerFromInfo(pbp *PBDHTMessage_PBPeer) (*peer.Peer, error) {
	maddr, err := ma.NewMultiaddr(pbp.GetAddr())
	if err != nil {
		return nil, err
	}

	return dht.network.GetConnection(peer.ID(pbp.GetId()), maddr)
}

// loadProvidableKeys registers this node as a provider for every key already
// present in the local datastore
func (dht *IpfsDHT) loadProvidableKeys() error {
	kl, err := dht.datastore.KeyList()
	if err != nil {
		return err
	}
	for _, k := range kl {
		dht.providers.AddProvider(u.Key(k.Bytes()), dht.self)
	}
	return nil
}

// Bootstrap builds up a list of peers by requesting random peer IDs
func (dht *IpfsDHT) Bootstrap() {
	id := make([]byte, 16)
	rand.Read(id)
	dht.FindPeer(peer.ID(id), time.Second*10)
}
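
// Usage sketch (illustrative only; bootstrapAddr is assumed, not part of the
// original file): a freshly started node connects to a known peer and then
// fills its routing tables by looking up a random ID:
//
//	if _, err := dht.Connect(bootstrapAddr); err == nil {
//		dht.Bootstrap()
//	}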