dht.go 18.2 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"bytes"
5
	"crypto/rand"
6
	"errors"
7
	"fmt"
8 9
	"sync"
	"time"
10

11
	inet "github.com/jbenet/go-ipfs/net"
12
	msg "github.com/jbenet/go-ipfs/net/message"
13
	peer "github.com/jbenet/go-ipfs/peer"
Jeromy's avatar
Jeromy committed
14
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
15
	u "github.com/jbenet/go-ipfs/util"
16

17
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
18
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
19
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
20

21
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
22 23
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25 26 27 28
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
29 30
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
31
	routingTables []*kb.RoutingTable
32

33
	// the network interface. service
34
	network inet.Network
35
	sender  inet.Sender
36

Jeromy's avatar
Jeromy committed
37 38 39 40 41
	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore
42
	dslock    sync.Mutex
43

44
	providers *ProviderManager
45

46 47
	// Signal to shutdown dht
	shutdown chan struct{}
48 49 50

	// When this peer started up
	birth time.Time
51 52 53

	//lock to make diagnostics work better
	diaglock sync.Mutex
54 55
}

Jeromy's avatar
Jeromy committed
56
// NewDHT creates a new DHT object with the given peer as the 'local' host
57
func NewDHT(p *peer.Peer, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
58
	dht := new(IpfsDHT)
Jeromy's avatar
Jeromy committed
59
	dht.network = net
60
	dht.sender = sender
Jeromy's avatar
Jeromy committed
61
	dht.datastore = dstore
62
	dht.self = p
63

64
	dht.providers = NewProviderManager(p.ID)
Jeromy's avatar
Jeromy committed
65
	dht.shutdown = make(chan struct{})
66

67 68 69 70
	dht.routingTables = make([]*kb.RoutingTable, 3)
	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
	dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
	dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
71
	dht.birth = time.Now()
Jeromy's avatar
Jeromy committed
72
	return dht
73 74
}

75
// Start up background goroutines needed by the DHT
76
func (dht *IpfsDHT) Start() {
77
	panic("the service is already started. rmv this method")
78 79
}

80
// Connect to a new peer at the given address, ping and add to the routing table
81
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
82
	maddrstr, _ := addr.String()
83
	u.DOut("Connect to new peer: %s\n", maddrstr)
84 85 86 87 88 89 90 91 92 93 94 95 96

	// TODO(jbenet,whyrusleeping)
	//
	// Connect should take in a Peer (with ID). In a sense, we shouldn't be
	// allowing connections to random multiaddrs without knowing who we're
	// speaking to (i.e. peer.ID). In terms of moving around simple addresses
	// -- instead of an (ID, Addr) pair -- we can use:
	//
	//   /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
	//
	npeer := &peer.Peer{}
	npeer.AddAddress(addr)
	err := dht.network.DialPeer(npeer)
97
	if err != nil {
98
		return nil, err
99 100
	}

Jeromy's avatar
Jeromy committed
101 102
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
103
	err = dht.Ping(npeer, time.Second*2)
Jeromy's avatar
Jeromy committed
104
	if err != nil {
105
		return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
Jeromy's avatar
Jeromy committed
106 107
	}

108 109
	dht.Update(npeer)

110
	return npeer, nil
Jeromy's avatar
Jeromy committed
111 112
}

113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
// HandleMessage implements the inet.Handler interface.
//
// It decodes the protobuf payload of mes, records the sending peer in the
// routing tables, dispatches to the handler registered for the message type
// and returns the handler's response wrapped as a network message addressed
// to the sender.
//
// An error is returned when the message carries no data or no peer, when
// the payload cannot be decoded, when no handler exists for the message
// type, when the handler itself fails, or when the response cannot be
// encoded.
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.NetMessage, error) {

	mData := mes.Data()
	if mData == nil {
		return nil, errors.New("message did not include Data")
	}

	mPeer := mes.Peer()
	if mPeer == nil {
		return nil, errors.New("message did not include a Peer")
	}

	// deserialize msg
	pmes := new(Message)
	err := proto.Unmarshal(mData, pmes)
	if err != nil {
		return nil, fmt.Errorf("Failed to decode protobuf message: %v\n", err)
	}

	// update the peer (on valid msgs only)
	dht.Update(mPeer)

	// Print out diagnostic
	u.DOut("[peer: %s]\nGot message type: '%s' [from = %s]\n",
		dht.self.ID.Pretty(),
		Message_MessageType_name[int32(pmes.GetType())], mPeer.ID.Pretty())

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
		return nil, errors.New("Received invalid message type")
	}

	// dispatch handler.
	rpmes, err := handler(mPeer, pmes)
	if err != nil {
		return nil, err
	}

	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
		return nil, fmt.Errorf("Failed to encode protobuf message: %v\n", err)
	}

	return rmes, nil
}

// dhtHandler specifies the signature of functions that handle DHT messages:
// they receive the sending peer and the decoded request, and return the
// response message or an error.
type dhtHandler func(*peer.Peer, *Message) (*Message, error)

func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
	switch t {
	case Message_GET_VALUE:
		return dht.handleGetValue
	// case Message_PUT_VALUE:
	// 	return dht.handlePutValue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
172 173
	case Message_FIND_NODE:
		return dht.handleFindPeer
174 175 176 177
	// case Message_ADD_PROVIDER:
	// 	return dht.handleAddProvider
	// case Message_GET_PROVIDERS:
	// 	return dht.handleGetProviders
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178 179
	case Message_PING:
		return dht.handlePing
180 181 182 183
	// case Message_DIAGNOSTIC:
	// 	return dht.handleDiagnostic
	default:
		return nil
184 185 186
	}
}

Jeromy's avatar
Jeromy committed
187
func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
188 189 190 191
	typ := Message_PUT_VALUE
	pmes := &Message{
		Type:  &typ,
		Key:   &key,
192 193 194
		Value: value,
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
195 196 197 198 199
	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return err
	}
	return dht.sender.SendMessage(context.TODO(), mes)
200 201
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error) {
203
	u.DOut("handleGetValue for key: %s\n", pmes.GetKey())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204 205

	// setup response
206
	resp := &Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207 208 209 210 211 212 213
		Type: pmes.Type,
		Key:  pmes.Key,
	}

	// first, is the key even a key?
	key := pmes.GetKey()
	if key == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214
		return nil, errors.New("handleGetValue but no key was provided")
Jeromy's avatar
Jeromy committed
215
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
216 217 218

	// let's first check if we have the value locally.
	dskey := ds.NewKey(pmes.GetKey())
Jeromy's avatar
Jeromy committed
219
	iVal, err := dht.datastore.Get(dskey)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220 221 222 223 224 225 226

	// if we got an unexpected error, bail.
	if err != ds.ErrNotFound {
		return nil, err
	}

	// if we have the value, respond with it!
Jeromy's avatar
Jeromy committed
227
	if err == nil {
Jeromy's avatar
Jeromy committed
228
		u.DOut("handleGetValue success!\n")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
229 230 231 232

		byts, ok := iVal.([]byte)
		if !ok {
			return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
233
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234 235 236 237 238 239 240 241 242

		resp.Value = byts
		return resp, nil
	}

	// if we know any providers for the requested value, return those.
	provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
	if len(provs) > 0 {
		u.DOut("handleGetValue returning %d provider[s]\n", len(provs))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
243
		resp.ProviderPeers = peersToPBPeers(provs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
244 245 246 247
		return resp, nil
	}

	// Find closest peer on given cluster to desired key and reply with that info
248 249
	closer := dht.betterPeerToQuery(pmes)
	if closer == nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
250 251 252
		u.DOut("handleGetValue could not find a closer node than myself.\n")
		resp.CloserPeers = nil
		return resp, nil
253
	}
254

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
255 256
	// we got a closer peer, it seems. return it.
	u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
257
	resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
258
	return resp, nil
259 260
}

Jeromy's avatar
Jeromy committed
261
// Store a value in this peer local storage
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
262
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) {
263 264
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
Jeromy's avatar
Jeromy committed
265 266 267 268 269 270
	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
	if err != nil {
		// For now, just panic, handle this better later maybe
		panic(err)
	}
271 272
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
273
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) {
274
	u.DOut("[%s] Responding to ping from [%s]!\n", dht.self.ID.Pretty(), p.ID.Pretty())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
275
	return &Message{Type: pmes.Type}, nil
276 277
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
278 279
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) {
	resp := &Message{Type: pmes.Type}
280
	var closest *peer.Peer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
281

282 283 284 285 286 287
	// if looking for self... special case where we send it on CloserPeers.
	if peer.ID(pmes.GetKey()).Equal(dht.self.ID) {
		closest = dht.self
	} else {
		closest = dht.betterPeerToQuery(pmes)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
288

Jeromy's avatar
Jeromy committed
289
	if closest == nil {
Jeromy's avatar
Jeromy committed
290
		u.PErr("handleFindPeer: could not find anything.\n")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
291
		return resp, nil
Jeromy's avatar
Jeromy committed
292 293 294
	}

	if len(closest.Addresses) == 0 {
Jeromy's avatar
Jeromy committed
295
		u.PErr("handleFindPeer: no addresses for connected peer...\n")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
296
		return resp, nil
Jeromy's avatar
Jeromy committed
297 298
	}

299
	u.DOut("handleFindPeer: sending back '%s'\n", closest.ID.Pretty())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
300 301
	resp.CloserPeers = peersToPBPeers([]*peer.Peer{closest})
	return resp, nil
302 303
}

304 305 306 307
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, error) {
	resp := &Message{
		Type: pmes.Type,
		Key:  pmes.Key,
Jeromy's avatar
Jeromy committed
308 309
	}

310
	// check if we have this value, to add ourselves as provider.
311
	has, err := dht.datastore.Has(ds.NewKey(pmes.GetKey()))
312 313 314
	if err != nil && err != ds.ErrNotFound {
		u.PErr("unexpected datastore error: %v\n", err)
		has = false
315 316
	}

317
	// setup providers
318
	providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
319 320 321
	if has {
		providers = append(providers, dht.self)
	}
Jeromy's avatar
Jeromy committed
322

323 324 325
	// if we've got providers, send thos those.
	if providers != nil && len(providers) > 0 {
		resp.ProviderPeers = peersToPBPeers(providers)
326 327
	}

328 329 330 331 332 333 334
	// Also send closer peers.
	closer := dht.betterPeerToQuery(pmes)
	if closer != nil {
		resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
	}

	return resp, nil
335 336
}

337 338
type providerInfo struct {
	Creation time.Time
339
	Value    *peer.Peer
340 341
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
342
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) {
343
	key := u.Key(pmes.GetKey())
Jeromy's avatar
Jeromy committed
344
	u.DOut("[%s] Adding [%s] as a provider for '%s'\n", dht.self.ID.Pretty(), p.ID.Pretty(), peer.ID(key).Pretty())
345
	dht.providers.AddProvider(key, p)
346 347
}

348
// Halt stops all communications from this peer and shut down
349 350 351
func (dht *IpfsDHT) Halt() {
	dht.shutdown <- struct{}{}
	dht.network.Close()
352 353
	dht.providers.Halt()
	dht.listener.Halt()
354
}
355

Jeromy's avatar
Jeromy committed
356
// NOTE: not yet finished, low priority
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
357
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *Message) {
358
	seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
359
	listenChan := dht.listener.Listen(pmes.GetId(), len(seq), time.Second*30)
360

361
	for _, ps := range seq {
362
		mes := swarm.NewMessage(ps, pmes)
363
		dht.netChan.Outgoing <- mes
364 365 366
	}

	buf := new(bytes.Buffer)
367 368 369
	di := dht.getDiagInfo()
	buf.Write(di.Marshal())

370 371 372 373 374 375 376 377
	// NOTE: this shouldnt be a hardcoded value
	after := time.After(time.Second * 20)
	count := len(seq)
	for count > 0 {
		select {
		case <-after:
			//Timeout, return what we have
			goto out
378
		case reqResp := <-listenChan:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
379
			pmesOut := new(Message)
380
			err := proto.Unmarshal(reqResp.Data, pmesOut)
381 382 383 384
			if err != nil {
				// It broke? eh, whatever, keep going
				continue
			}
385
			buf.Write(reqResp.Data)
386 387 388 389 390
			count--
		}
	}

out:
391
	resp := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
392
		Type:     Message_DIAGNOSTIC,
393
		ID:       pmes.GetId(),
394
		Value:    buf.Bytes(),
395 396 397 398
		Response: true,
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
399
	dht.netChan.Outgoing <- mes
400
}
401

402 403 404
func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) {
	pmes, err := dht.getValueSingle(p, key, timeout, level)
	if err != nil {
405
		return nil, nil, err
406 407 408 409 410 411 412 413 414 415 416 417 418
	}

	if pmes.GetSuccess() {
		if pmes.Value == nil { // We were given provider[s]
			val, err := dht.getFromPeerList(key, timeout, pmes.GetPeers(), level)
			if err != nil {
				return nil, nil, err
			}
			return val, nil, nil
		}

		// Success! We were given the value
		return pmes.GetValue(), nil, nil
419
	}
420

421 422 423 424 425 426 427 428
	// We were given a closer node
	var peers []*peer.Peer
	for _, pb := range pmes.GetPeers() {
		if peer.ID(pb.GetId()).Equal(dht.self.ID) {
			continue
		}
		addr, err := ma.NewMultiaddr(pb.GetAddr())
		if err != nil {
429
			u.PErr("%v\n", err.Error())
430 431
			continue
		}
432

433 434
		np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr)
		if err != nil {
435
			u.PErr("%v\n", err.Error())
436
			continue
437
		}
438 439

		peers = append(peers, np)
440
	}
441
	return nil, peers, nil
442 443
}

444
// getValueSingle simply performs the get value RPC with the given parameters
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
445
func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration, level int) (*Message, error) {
446
	pmes := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
447
		Type:  Message_GET_VALUE,
448 449
		Key:   string(key),
		Value: []byte{byte(level)},
450
		ID:    swarm.GenerateMessageID(),
Jeromy's avatar
Jeromy committed
451
	}
452
	responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
Jeromy's avatar
Jeromy committed
453 454

	mes := swarm.NewMessage(p, pmes.ToProtobuf())
455
	t := time.Now()
456
	dht.netChan.Outgoing <- mes
Jeromy's avatar
Jeromy committed
457 458 459 460 461

	// Wait for either the response or a timeout
	timeup := time.After(timeout)
	select {
	case <-timeup:
462
		dht.listener.Unlisten(pmes.ID)
Jeromy's avatar
Jeromy committed
463
		return nil, u.ErrTimeout
464
	case resp, ok := <-responseChan:
Jeromy's avatar
Jeromy committed
465
		if !ok {
Jeromy's avatar
Jeromy committed
466
			u.PErr("response channel closed before timeout, please investigate.\n")
Jeromy's avatar
Jeromy committed
467 468
			return nil, u.ErrTimeout
		}
469 470
		roundtrip := time.Since(t)
		resp.Peer.SetLatency(roundtrip)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
471
		pmesOut := new(Message)
472
		err := proto.Unmarshal(resp.Data, pmesOut)
Jeromy's avatar
Jeromy committed
473 474 475
		if err != nil {
			return nil, err
		}
476
		return pmesOut, nil
Jeromy's avatar
Jeromy committed
477 478 479
	}
}

480 481 482 483 484
// TODO: Im not certain on this implementation, we get a list of peers/providers
// from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
485
	peerlist []*Message_PBPeer, level int) ([]byte, error) {
486 487 488 489
	for _, pinfo := range peerlist {
		p, _ := dht.Find(peer.ID(pinfo.GetId()))
		if p == nil {
			maddr, err := ma.NewMultiaddr(pinfo.GetAddr())
Jeromy's avatar
Jeromy committed
490
			if err != nil {
491
				u.PErr("getValue error: %s\n", err)
Jeromy's avatar
Jeromy committed
492 493
				continue
			}
494

495
			p, err = dht.network.GetConnection(peer.ID(pinfo.GetId()), maddr)
Jeromy's avatar
Jeromy committed
496
			if err != nil {
497
				u.PErr("getValue error: %s\n", err)
Jeromy's avatar
Jeromy committed
498 499 500
				continue
			}
		}
501
		pmes, err := dht.getValueSingle(p, key, timeout, level)
Jeromy's avatar
Jeromy committed
502
		if err != nil {
503
			u.DErr("getFromPeers error: %s\n", err)
Jeromy's avatar
Jeromy committed
504 505
			continue
		}
506
		dht.providers.AddProvider(key, p)
Jeromy's avatar
Jeromy committed
507

508 509 510 511
		// Make sure it was a successful get
		if pmes.GetSuccess() && pmes.Value != nil {
			return pmes.GetValue(), nil
		}
Jeromy's avatar
Jeromy committed
512 513 514 515
	}
	return nil, u.ErrNotFound
}

516
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
517 518
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
519
	v, err := dht.datastore.Get(ds.NewKey(string(key)))
520 521 522 523 524 525
	if err != nil {
		return nil, err
	}
	return v.([]byte), nil
}

526
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
527 528
	return dht.datastore.Put(ds.NewKey(string(key)), value)
}
529

530
// Update TODO(chas) Document this function
531
func (dht *IpfsDHT) Update(p *peer.Peer) {
532
	for _, route := range dht.routingTables {
533
		removed := route.Update(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
534
		// Only close the connection if no tables refer to this peer
535 536
		if removed != nil {
			found := false
537
			for _, r := range dht.routingTables {
538 539 540 541 542 543
				if r.Find(removed.ID) != nil {
					found = true
					break
				}
			}
			if !found {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
544
				dht.network.CloseConnection(removed)
545 546
			}
		}
547 548
	}
}
Jeromy's avatar
Jeromy committed
549

550
// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Jeromy's avatar
Jeromy committed
551
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
552
	for _, table := range dht.routingTables {
Jeromy's avatar
Jeromy committed
553 554 555 556 557 558 559
		p := table.Find(id)
		if p != nil {
			return p, table
		}
	}
	return nil, nil
}
560

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
561
func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Duration, level int) (*Message, error) {
562
	pmes := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
563
		Type:  Message_FIND_NODE,
564
		Key:   string(id),
565
		ID:    swarm.GenerateMessageID(),
566 567 568 569
		Value: []byte{byte(level)},
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())
570
	listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
571
	t := time.Now()
572
	dht.netChan.Outgoing <- mes
573 574 575
	after := time.After(timeout)
	select {
	case <-after:
576
		dht.listener.Unlisten(pmes.ID)
577 578
		return nil, u.ErrTimeout
	case resp := <-listenChan:
579 580
		roundtrip := time.Since(t)
		resp.Peer.SetLatency(roundtrip)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
581
		pmesOut := new(Message)
582
		err := proto.Unmarshal(resp.Data, pmesOut)
583 584 585 586
		if err != nil {
			return nil, err
		}

587
		return pmesOut, nil
588 589
	}
}
590

591 592
func (dht *IpfsDHT) printTables() {
	for _, route := range dht.routingTables {
593 594 595
		route.Print()
	}
}
Jeromy's avatar
Jeromy committed
596

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
597
func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*Message, error) {
598
	pmes := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
599
		Type:  Message_GET_PROVIDERS,
Jeromy's avatar
Jeromy committed
600
		Key:   string(key),
601
		ID:    swarm.GenerateMessageID(),
Jeromy's avatar
Jeromy committed
602 603 604 605 606
		Value: []byte{byte(level)},
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

607
	listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
608
	dht.netChan.Outgoing <- mes
Jeromy's avatar
Jeromy committed
609 610 611
	after := time.After(timeout)
	select {
	case <-after:
612
		dht.listener.Unlisten(pmes.ID)
Jeromy's avatar
Jeromy committed
613 614
		return nil, u.ErrTimeout
	case resp := <-listenChan:
Jeromy's avatar
Jeromy committed
615
		u.DOut("FindProviders: got response.\n")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
616
		pmesOut := new(Message)
617
		err := proto.Unmarshal(resp.Data, pmesOut)
Jeromy's avatar
Jeromy committed
618 619 620 621
		if err != nil {
			return nil, err
		}

622
		return pmesOut, nil
Jeromy's avatar
Jeromy committed
623 624 625
	}
}

626
// TODO: Could be done async
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
627
func (dht *IpfsDHT) addPeerList(key u.Key, peers []*Message_PBPeer) []*peer.Peer {
628
	var provArr []*peer.Peer
Jeromy's avatar
Jeromy committed
629 630 631 632 633 634
	for _, prov := range peers {
		// Dont add outselves to the list
		if peer.ID(prov.GetId()).Equal(dht.self.ID) {
			continue
		}
		// Dont add someone who is already on the list
635
		p := dht.network.GetPeer(u.Key(prov.GetId()))
Jeromy's avatar
Jeromy committed
636
		if p == nil {
637
			u.DOut("given provider %s was not in our network already.\n", peer.ID(prov.GetId()).Pretty())
Jeromy's avatar
Jeromy committed
638 639
			var err error
			p, err = dht.peerFromInfo(prov)
Jeromy's avatar
Jeromy committed
640
			if err != nil {
641
				u.PErr("error connecting to new peer: %s\n", err)
Jeromy's avatar
Jeromy committed
642 643 644
				continue
			}
		}
645
		dht.providers.AddProvider(key, p)
646
		provArr = append(provArr, p)
Jeromy's avatar
Jeromy committed
647
	}
648
	return provArr
Jeromy's avatar
Jeromy committed
649
}
Jeromy's avatar
Jeromy committed
650

651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684
// nearestPeerToQuery returns the peer nearest to the message's key in the
// routing table selected by the message's cluster level.
//
// The cluster level comes from the (remote) message, so it is validated
// against the number of routing tables; an out-of-range level is logged and
// yields nil instead of panicking on the slice index.
func (dht *IpfsDHT) nearestPeerToQuery(pmes *Message) *peer.Peer {
	level := int(pmes.GetClusterLevel())
	if level < 0 || level >= len(dht.routingTables) {
		u.PErr("nearestPeerToQuery: cluster level %d out of range\n", level)
		return nil
	}
	cluster := dht.routingTables[level]

	key := u.Key(pmes.GetKey())
	closer := cluster.NearestPeer(kb.ConvertKey(key))
	return closer
}

// betterPeerToQuery returns nearestPeerToQuery, but iff closer than self.
//
// It returns nil when the routing table yields no peer, when the nearest
// peer is this node itself, or when this node is closer to the message's
// key than the nearest peer is.
func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer {
	closer := dht.nearestPeerToQuery(pmes)

	// no node? nil
	if closer == nil {
		return nil
	}

	// == to self? nil
	if closer.ID.Equal(dht.self.ID) {
		u.DOut("Attempted to return self! this shouldnt happen...\n")
		return nil
	}

	// self is closer? nil
	key := u.Key(pmes.GetKey())
	if kb.Closer(dht.self.ID, closer.ID, key) {
		return nil
	}

	// ok seems like a closer node.
	return closer
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
685
func (dht *IpfsDHT) peerFromInfo(pbp *Message_PBPeer) (*peer.Peer, error) {
Jeromy's avatar
Jeromy committed
686 687 688 689 690 691 692
	maddr, err := ma.NewMultiaddr(pbp.GetAddr())
	if err != nil {
		return nil, err
	}

	return dht.network.GetConnection(peer.ID(pbp.GetId()), maddr)
}
693 694

func (dht *IpfsDHT) loadProvidableKeys() error {
695 696 697 698
	kl, err := dht.datastore.KeyList()
	if err != nil {
		return err
	}
699 700 701 702 703 704 705 706 707 708 709 710
	for _, k := range kl {
		dht.providers.AddProvider(u.Key(k.Bytes()), dht.self)
	}
	return nil
}

// Bootstrap builds up the list of known peers by searching for a random
// 16-byte peer ID, with a ten second timeout. The outcome of the search
// itself is discarded; the lookup is performed for its side effects.
//
// If no random ID can be generated, the failure is logged and no lookup is
// performed, rather than searching for an all-zero ID.
func (dht *IpfsDHT) Bootstrap() {
	id := make([]byte, 16)
	if _, err := rand.Read(id); err != nil {
		u.PErr("Bootstrap: could not generate random peer ID: %v\n", err)
		return
	}
	dht.FindPeer(peer.ID(id), time.Second*10)
}