dht.go 18.5 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"bytes"
5
	"crypto/rand"
6
	"errors"
7
	"fmt"
8 9
	"sync"
	"time"
10

11
	inet "github.com/jbenet/go-ipfs/net"
12
	msg "github.com/jbenet/go-ipfs/net/message"
13
	peer "github.com/jbenet/go-ipfs/peer"
Jeromy's avatar
Jeromy committed
14
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
15
	u "github.com/jbenet/go-ipfs/util"
16

17
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
18
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
19
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
20

21
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
22 23
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25 26 27 28
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
29 30
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
31
	routingTables []*kb.RoutingTable
32

33
	// the network interface. service
34
	network inet.Network
35
	sender  inet.Sender
36

Jeromy's avatar
Jeromy committed
37 38 39 40 41
	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore
42
	dslock    sync.Mutex
43

44
	providers *ProviderManager
45

46 47
	// Signal to shutdown dht
	shutdown chan struct{}
48 49 50

	// When this peer started up
	birth time.Time
51 52 53

	//lock to make diagnostics work better
	diaglock sync.Mutex
54 55
}

Jeromy's avatar
Jeromy committed
56
// NewDHT creates a new DHT object with the given peer as the 'local' host
57
func NewDHT(p *peer.Peer, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
58
	dht := new(IpfsDHT)
Jeromy's avatar
Jeromy committed
59
	dht.network = net
60
	dht.sender = sender
Jeromy's avatar
Jeromy committed
61
	dht.datastore = dstore
62
	dht.self = p
63

64
	dht.providers = NewProviderManager(p.ID)
Jeromy's avatar
Jeromy committed
65
	dht.shutdown = make(chan struct{})
66

67 68 69 70
	dht.routingTables = make([]*kb.RoutingTable, 3)
	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
	dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
	dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
71
	dht.birth = time.Now()
Jeromy's avatar
Jeromy committed
72
	return dht
73 74
}

75
// Start up background goroutines needed by the DHT
76
func (dht *IpfsDHT) Start() {
77
	panic("the service is already started. rmv this method")
78 79
}

80
// Connect to a new peer at the given address, ping and add to the routing table
81
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
82
	maddrstr, _ := addr.String()
83
	u.DOut("Connect to new peer: %s\n", maddrstr)
84 85 86 87 88 89 90 91 92 93 94 95 96

	// TODO(jbenet,whyrusleeping)
	//
	// Connect should take in a Peer (with ID). In a sense, we shouldn't be
	// allowing connections to random multiaddrs without knowing who we're
	// speaking to (i.e. peer.ID). In terms of moving around simple addresses
	// -- instead of an (ID, Addr) pair -- we can use:
	//
	//   /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
	//
	npeer := &peer.Peer{}
	npeer.AddAddress(addr)
	err := dht.network.DialPeer(npeer)
97
	if err != nil {
98
		return nil, err
99 100
	}

Jeromy's avatar
Jeromy committed
101 102
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
103
	err = dht.Ping(npeer, time.Second*2)
Jeromy's avatar
Jeromy committed
104
	if err != nil {
105
		return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
Jeromy's avatar
Jeromy committed
106 107
	}

108 109
	dht.Update(npeer)

110
	return npeer, nil
Jeromy's avatar
Jeromy committed
111 112
}

113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
// HandleMessage implements the inet.Handler interface.
//
// It decodes the incoming network message into a DHT Message, refreshes the
// sending peer in the routing tables, dispatches to the handler registered
// for the message type, and wraps the handler's response in a NetMessage
// addressed to the sender.
//
// An error is returned when the message carries no data or no peer, when it
// cannot be decoded, when no handler exists for its type, when the handler
// itself fails, or when the response cannot be encoded.
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.NetMessage, error) {

	mData := mes.Data()
	if mData == nil {
		return nil, errors.New("message did not include Data")
	}

	mPeer := mes.Peer()
	if mPeer == nil {
		return nil, errors.New("message did not include a Peer")
	}

	// deserialize msg
	pmes := new(Message)
	err := proto.Unmarshal(mData, pmes)
	if err != nil {
		return nil, fmt.Errorf("Failed to decode protobuf message: %v\n", err)
	}

	// update the peer (on valid msgs only)
	dht.Update(mPeer)

	// Print out diagnostic
	u.DOut("[peer: %s]\nGot message type: '%s' [from = %s]\n",
		dht.self.ID.Pretty(),
		Message_MessageType_name[int32(pmes.GetType())], mPeer.ID.Pretty())

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
		return nil, errors.New("Recieved invalid message type")
	}

	// dispatch handler.
	rpmes, err := handler(mPeer, pmes)
	if err != nil {
		return nil, err
	}

	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
		return nil, fmt.Errorf("Failed to encode protobuf message: %v\n", err)
	}

	return rmes, nil
}

// dhtHandler specifies the signature of functions that handle DHT messages:
// given the sending peer and the decoded request, a handler returns the
// response message to send back, or an error.
type dhtHandler func(*peer.Peer, *Message) (*Message, error)

func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
	switch t {
	case Message_GET_VALUE:
		return dht.handleGetValue
	// case Message_PUT_VALUE:
	// 	return dht.handlePutValue
	// case Message_FIND_NODE:
	// 	return dht.handleFindPeer
	// case Message_ADD_PROVIDER:
	// 	return dht.handleAddProvider
	// case Message_GET_PROVIDERS:
	// 	return dht.handleGetProviders
	// case Message_PING:
	// 	return dht.handlePing
	// case Message_DIAGNOSTIC:
	// 	return dht.handleDiagnostic
	default:
		return nil
184 185 186
	}
}

Jeromy's avatar
Jeromy committed
187
func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
188 189 190 191
	typ := Message_PUT_VALUE
	pmes := &Message{
		Type:  &typ,
		Key:   &key,
192 193 194
		Value: value,
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
195 196 197 198 199
	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return err
	}
	return dht.sender.SendMessage(context.TODO(), mes)
200 201
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error) {
203
	u.DOut("handleGetValue for key: %s\n", pmes.GetKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204 205

	// setup response
206
	resp := &Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207 208 209 210 211 212 213
		Type: pmes.Type,
		Key:  pmes.Key,
	}

	// first, is the key even a key?
	key := pmes.GetKey()
	if key == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214
		return nil, errors.New("handleGetValue but no key was provided")
Jeromy's avatar
Jeromy committed
215
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
216 217 218

	// let's first check if we have the value locally.
	dskey := ds.NewKey(pmes.GetKey())
Jeromy's avatar
Jeromy committed
219
	iVal, err := dht.datastore.Get(dskey)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220 221 222 223 224 225 226

	// if we got an unexpected error, bail.
	if err != ds.ErrNotFound {
		return nil, err
	}

	// if we have the value, respond with it!
Jeromy's avatar
Jeromy committed
227
	if err == nil {
Jeromy's avatar
Jeromy committed
228
		u.DOut("handleGetValue success!\n")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
229 230 231 232

		byts, ok := iVal.([]byte)
		if !ok {
			return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
233
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273

		resp.Value = byts
		return resp, nil
	}

	// if we know any providers for the requested value, return those.
	provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
	if len(provs) > 0 {
		u.DOut("handleGetValue returning %d provider[s]\n", len(provs))
		resp.ProviderPeers = provs
		return resp, nil
	}

	// Find closest peer on given cluster to desired key and reply with that info
	// TODO: this should probably be decomposed.

	// stored levels are > 1, to distinguish missing levels.
	level := pmes.GetClusterLevel()
	if level < 0 {
		// TODO: maybe return an error? Defaulting isnt a good idea IMO
		u.PErr("handleGetValue: no routing level specified, assuming 0\n")
		level = 0
	}
	u.DOut("handleGetValue searching level %d clusters\n", level)

	ck := kb.ConvertKey(u.Key(pmes.GetKey()))
	closer := dht.routingTables[level].NearestPeer(ck)

	// if closer peer is self, return nil
	if closer.ID.Equal(dht.self.ID) {
		u.DOut("Attempted to return self! this shouldnt happen...\n")
		resp.CloserPeers = nil
		return resp, nil
	}

	// if self is closer than the one from the table, return nil
	if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) {
		u.DOut("handleGetValue could not find a closer node than myself.\n")
		resp.CloserPeers = nil
		return resp, nil
274
	}
275

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
276 277 278 279
	// we got a closer peer, it seems. return it.
	u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty())
	resp.CloserPeers = []*peer.Peer{closer}
	return resp, nil
280 281
}

Jeromy's avatar
Jeromy committed
282
// Store a value in this peer local storage
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
283
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) {
284 285
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
Jeromy's avatar
Jeromy committed
286 287 288 289 290 291
	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
	if err != nil {
		// For now, just panic, handle this better later maybe
		panic(err)
	}
292 293
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
294
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) {
295
	u.DOut("[%s] Responding to ping from [%s]!\n", dht.self.ID.Pretty(), p.ID.Pretty())
296
	resp := Message{
297
		Type:     pmes.GetType(),
298
		Response: true,
299
		ID:       pmes.GetId(),
300
	}
301

302
	dht.netChan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf())
303 304
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
305
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) {
306
	resp := Message{
307
		Type:     pmes.GetType(),
308
		ID:       pmes.GetId(),
309 310 311 312
		Response: true,
	}
	defer func() {
		mes := swarm.NewMessage(p, resp.ToProtobuf())
313
		dht.netChan.Outgoing <- mes
314 315
	}()
	level := pmes.GetValue()[0]
316
	u.DOut("handleFindPeer: searching for '%s'\n", peer.ID(pmes.GetKey()).Pretty())
317
	closest := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
Jeromy's avatar
Jeromy committed
318
	if closest == nil {
Jeromy's avatar
Jeromy committed
319
		u.PErr("handleFindPeer: could not find anything.\n")
320
		return
Jeromy's avatar
Jeromy committed
321 322 323
	}

	if len(closest.Addresses) == 0 {
Jeromy's avatar
Jeromy committed
324
		u.PErr("handleFindPeer: no addresses for connected peer...\n")
325
		return
Jeromy's avatar
Jeromy committed
326 327
	}

328 329 330
	// If the found peer further away than this peer...
	if kb.Closer(dht.self.ID, closest.ID, u.Key(pmes.GetKey())) {
		return
Jeromy's avatar
Jeromy committed
331 332
	}

333
	u.DOut("handleFindPeer: sending back '%s'\n", closest.ID.Pretty())
334 335 336 337
	resp.Peers = []*peer.Peer{closest}
	resp.Success = true
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
338
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) {
339
	resp := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
340
		Type:     Message_GET_PROVIDERS,
341
		Key:      pmes.GetKey(),
342
		ID:       pmes.GetId(),
343
		Response: true,
Jeromy's avatar
Jeromy committed
344 345
	}

346 347 348 349 350
	has, err := dht.datastore.Has(ds.NewKey(pmes.GetKey()))
	if err != nil {
		dht.netChan.Errors <- err
	}

351
	providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
352 353 354
	if has {
		providers = append(providers, dht.self)
	}
355
	if providers == nil || len(providers) == 0 {
Jeromy's avatar
Jeromy committed
356 357 358 359 360
		level := 0
		if len(pmes.GetValue()) > 0 {
			level = int(pmes.GetValue()[0])
		}

361
		closer := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
Jeromy's avatar
Jeromy committed
362 363 364 365 366
		if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) {
			resp.Peers = nil
		} else {
			resp.Peers = []*peer.Peer{closer}
		}
367
	} else {
368
		resp.Peers = providers
369
		resp.Success = true
370 371 372
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
373
	dht.netChan.Outgoing <- mes
374 375
}

376 377
type providerInfo struct {
	Creation time.Time
378
	Value    *peer.Peer
379 380
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
381
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) {
382
	key := u.Key(pmes.GetKey())
Jeromy's avatar
Jeromy committed
383
	u.DOut("[%s] Adding [%s] as a provider for '%s'\n", dht.self.ID.Pretty(), p.ID.Pretty(), peer.ID(key).Pretty())
384
	dht.providers.AddProvider(key, p)
385 386
}

387
// Halt stops all communications from this peer and shut down
388 389 390
func (dht *IpfsDHT) Halt() {
	dht.shutdown <- struct{}{}
	dht.network.Close()
391 392
	dht.providers.Halt()
	dht.listener.Halt()
393
}
394

Jeromy's avatar
Jeromy committed
395
// NOTE: not yet finished, low priority
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
396
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *Message) {
397
	seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
398
	listenChan := dht.listener.Listen(pmes.GetId(), len(seq), time.Second*30)
399

400
	for _, ps := range seq {
401
		mes := swarm.NewMessage(ps, pmes)
402
		dht.netChan.Outgoing <- mes
403 404 405
	}

	buf := new(bytes.Buffer)
406 407 408
	di := dht.getDiagInfo()
	buf.Write(di.Marshal())

409 410 411 412 413 414 415 416
	// NOTE: this shouldnt be a hardcoded value
	after := time.After(time.Second * 20)
	count := len(seq)
	for count > 0 {
		select {
		case <-after:
			//Timeout, return what we have
			goto out
417
		case reqResp := <-listenChan:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
418
			pmesOut := new(Message)
419
			err := proto.Unmarshal(reqResp.Data, pmesOut)
420 421 422 423
			if err != nil {
				// It broke? eh, whatever, keep going
				continue
			}
424
			buf.Write(reqResp.Data)
425 426 427 428 429
			count--
		}
	}

out:
430
	resp := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
431
		Type:     Message_DIAGNOSTIC,
432
		ID:       pmes.GetId(),
433
		Value:    buf.Bytes(),
434 435 436 437
		Response: true,
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
438
	dht.netChan.Outgoing <- mes
439
}
440

441 442 443
func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) {
	pmes, err := dht.getValueSingle(p, key, timeout, level)
	if err != nil {
444
		return nil, nil, err
445 446 447 448 449 450 451 452 453 454 455 456 457
	}

	if pmes.GetSuccess() {
		if pmes.Value == nil { // We were given provider[s]
			val, err := dht.getFromPeerList(key, timeout, pmes.GetPeers(), level)
			if err != nil {
				return nil, nil, err
			}
			return val, nil, nil
		}

		// Success! We were given the value
		return pmes.GetValue(), nil, nil
458
	}
459

460 461 462 463 464 465 466 467
	// We were given a closer node
	var peers []*peer.Peer
	for _, pb := range pmes.GetPeers() {
		if peer.ID(pb.GetId()).Equal(dht.self.ID) {
			continue
		}
		addr, err := ma.NewMultiaddr(pb.GetAddr())
		if err != nil {
468
			u.PErr("%v\n", err.Error())
469 470
			continue
		}
471

472 473
		np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr)
		if err != nil {
474
			u.PErr("%v\n", err.Error())
475
			continue
476
		}
477 478

		peers = append(peers, np)
479
	}
480
	return nil, peers, nil
481 482
}

483
// getValueSingle simply performs the get value RPC with the given parameters
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
484
func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration, level int) (*Message, error) {
485
	pmes := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
486
		Type:  Message_GET_VALUE,
487 488
		Key:   string(key),
		Value: []byte{byte(level)},
489
		ID:    swarm.GenerateMessageID(),
Jeromy's avatar
Jeromy committed
490
	}
491
	responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
Jeromy's avatar
Jeromy committed
492 493

	mes := swarm.NewMessage(p, pmes.ToProtobuf())
494
	t := time.Now()
495
	dht.netChan.Outgoing <- mes
Jeromy's avatar
Jeromy committed
496 497 498 499 500

	// Wait for either the response or a timeout
	timeup := time.After(timeout)
	select {
	case <-timeup:
501
		dht.listener.Unlisten(pmes.ID)
Jeromy's avatar
Jeromy committed
502
		return nil, u.ErrTimeout
503
	case resp, ok := <-responseChan:
Jeromy's avatar
Jeromy committed
504
		if !ok {
Jeromy's avatar
Jeromy committed
505
			u.PErr("response channel closed before timeout, please investigate.\n")
Jeromy's avatar
Jeromy committed
506 507
			return nil, u.ErrTimeout
		}
508 509
		roundtrip := time.Since(t)
		resp.Peer.SetLatency(roundtrip)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
510
		pmesOut := new(Message)
511
		err := proto.Unmarshal(resp.Data, pmesOut)
Jeromy's avatar
Jeromy committed
512 513 514
		if err != nil {
			return nil, err
		}
515
		return pmesOut, nil
Jeromy's avatar
Jeromy committed
516 517 518
	}
}

519 520 521 522 523
// TODO: Im not certain on this implementation, we get a list of peers/providers
// from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
524
	peerlist []*Message_PBPeer, level int) ([]byte, error) {
525 526 527 528
	for _, pinfo := range peerlist {
		p, _ := dht.Find(peer.ID(pinfo.GetId()))
		if p == nil {
			maddr, err := ma.NewMultiaddr(pinfo.GetAddr())
Jeromy's avatar
Jeromy committed
529
			if err != nil {
530
				u.PErr("getValue error: %s\n", err)
Jeromy's avatar
Jeromy committed
531 532
				continue
			}
533

534
			p, err = dht.network.GetConnection(peer.ID(pinfo.GetId()), maddr)
Jeromy's avatar
Jeromy committed
535
			if err != nil {
536
				u.PErr("getValue error: %s\n", err)
Jeromy's avatar
Jeromy committed
537 538 539
				continue
			}
		}
540
		pmes, err := dht.getValueSingle(p, key, timeout, level)
Jeromy's avatar
Jeromy committed
541
		if err != nil {
542
			u.DErr("getFromPeers error: %s\n", err)
Jeromy's avatar
Jeromy committed
543 544
			continue
		}
545
		dht.providers.AddProvider(key, p)
Jeromy's avatar
Jeromy committed
546

547 548 549 550
		// Make sure it was a successful get
		if pmes.GetSuccess() && pmes.Value != nil {
			return pmes.GetValue(), nil
		}
Jeromy's avatar
Jeromy committed
551 552 553 554
	}
	return nil, u.ErrNotFound
}

555
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
556 557
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
558
	v, err := dht.datastore.Get(ds.NewKey(string(key)))
559 560 561 562 563 564
	if err != nil {
		return nil, err
	}
	return v.([]byte), nil
}

565
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
566 567
	return dht.datastore.Put(ds.NewKey(string(key)), value)
}
568

569
// Update TODO(chas) Document this function
570
func (dht *IpfsDHT) Update(p *peer.Peer) {
571
	for _, route := range dht.routingTables {
572
		removed := route.Update(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
573
		// Only close the connection if no tables refer to this peer
574 575
		if removed != nil {
			found := false
576
			for _, r := range dht.routingTables {
577 578 579 580 581 582
				if r.Find(removed.ID) != nil {
					found = true
					break
				}
			}
			if !found {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
583
				dht.network.CloseConnection(removed)
584 585
			}
		}
586 587
	}
}
Jeromy's avatar
Jeromy committed
588

589
// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Jeromy's avatar
Jeromy committed
590
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
591
	for _, table := range dht.routingTables {
Jeromy's avatar
Jeromy committed
592 593 594 595 596 597 598
		p := table.Find(id)
		if p != nil {
			return p, table
		}
	}
	return nil, nil
}
599

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
600
func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Duration, level int) (*Message, error) {
601
	pmes := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
602
		Type:  Message_FIND_NODE,
603
		Key:   string(id),
604
		ID:    swarm.GenerateMessageID(),
605 606 607 608
		Value: []byte{byte(level)},
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())
609
	listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
610
	t := time.Now()
611
	dht.netChan.Outgoing <- mes
612 613 614
	after := time.After(timeout)
	select {
	case <-after:
615
		dht.listener.Unlisten(pmes.ID)
616 617
		return nil, u.ErrTimeout
	case resp := <-listenChan:
618 619
		roundtrip := time.Since(t)
		resp.Peer.SetLatency(roundtrip)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
620
		pmesOut := new(Message)
621
		err := proto.Unmarshal(resp.Data, pmesOut)
622 623 624 625
		if err != nil {
			return nil, err
		}

626
		return pmesOut, nil
627 628
	}
}
629

630 631
func (dht *IpfsDHT) printTables() {
	for _, route := range dht.routingTables {
632 633 634
		route.Print()
	}
}
Jeromy's avatar
Jeromy committed
635

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
636
func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*Message, error) {
637
	pmes := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
638
		Type:  Message_GET_PROVIDERS,
Jeromy's avatar
Jeromy committed
639
		Key:   string(key),
640
		ID:    swarm.GenerateMessageID(),
Jeromy's avatar
Jeromy committed
641 642 643 644 645
		Value: []byte{byte(level)},
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

646
	listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
647
	dht.netChan.Outgoing <- mes
Jeromy's avatar
Jeromy committed
648 649 650
	after := time.After(timeout)
	select {
	case <-after:
651
		dht.listener.Unlisten(pmes.ID)
Jeromy's avatar
Jeromy committed
652 653
		return nil, u.ErrTimeout
	case resp := <-listenChan:
Jeromy's avatar
Jeromy committed
654
		u.DOut("FindProviders: got response.\n")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
655
		pmesOut := new(Message)
656
		err := proto.Unmarshal(resp.Data, pmesOut)
Jeromy's avatar
Jeromy committed
657 658 659 660
		if err != nil {
			return nil, err
		}

661
		return pmesOut, nil
Jeromy's avatar
Jeromy committed
662 663 664
	}
}

665
// TODO: Could be done async
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
666
func (dht *IpfsDHT) addPeerList(key u.Key, peers []*Message_PBPeer) []*peer.Peer {
667
	var provArr []*peer.Peer
Jeromy's avatar
Jeromy committed
668 669 670 671 672 673
	for _, prov := range peers {
		// Dont add outselves to the list
		if peer.ID(prov.GetId()).Equal(dht.self.ID) {
			continue
		}
		// Dont add someone who is already on the list
674
		p := dht.network.GetPeer(u.Key(prov.GetId()))
Jeromy's avatar
Jeromy committed
675
		if p == nil {
676
			u.DOut("given provider %s was not in our network already.\n", peer.ID(prov.GetId()).Pretty())
Jeromy's avatar
Jeromy committed
677 678
			var err error
			p, err = dht.peerFromInfo(prov)
Jeromy's avatar
Jeromy committed
679
			if err != nil {
680
				u.PErr("error connecting to new peer: %s\n", err)
Jeromy's avatar
Jeromy committed
681 682 683
				continue
			}
		}
684
		dht.providers.AddProvider(key, p)
685
		provArr = append(provArr, p)
Jeromy's avatar
Jeromy committed
686
	}
687
	return provArr
Jeromy's avatar
Jeromy committed
688
}
Jeromy's avatar
Jeromy committed
689

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
690
func (dht *IpfsDHT) peerFromInfo(pbp *Message_PBPeer) (*peer.Peer, error) {
Jeromy's avatar
Jeromy committed
691 692 693 694 695 696 697
	maddr, err := ma.NewMultiaddr(pbp.GetAddr())
	if err != nil {
		return nil, err
	}

	return dht.network.GetConnection(peer.ID(pbp.GetId()), maddr)
}
698 699

func (dht *IpfsDHT) loadProvidableKeys() error {
700 701 702 703
	kl, err := dht.datastore.KeyList()
	if err != nil {
		return err
	}
704 705 706 707 708 709 710 711 712 713 714 715
	for _, k := range kl {
		dht.providers.AddProvider(u.Key(k.Bytes()), dht.self)
	}
	return nil
}

// Bootstrap builds up the list of known peers by looking up a randomly
// generated 16-byte peer ID, with a 10 second timeout on the lookup.
//
// If random bytes cannot be obtained the failure is logged and no lookup is
// attempted, rather than searching for an all-zero or partially filled ID.
// The result of the lookup itself is discarded.
func (dht *IpfsDHT) Bootstrap() {
	id := make([]byte, 16)
	if _, err := rand.Read(id); err != nil {
		u.PErr("Bootstrap: could not generate random peer ID: %v\n", err)
		return
	}
	dht.FindPeer(peer.ID(id), time.Second*10)
}