dht.go 12.6 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"crypto/rand"
5
	"errors"
6
	"fmt"
7 8
	"sync"
	"time"
9

10
	inet "github.com/jbenet/go-ipfs/net"
11
	msg "github.com/jbenet/go-ipfs/net/message"
12
	peer "github.com/jbenet/go-ipfs/peer"
Jeromy's avatar
Jeromy committed
13
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
14
	u "github.com/jbenet/go-ipfs/util"
15

16
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
17
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
18
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
19

20
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
21 22
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24 25 26 27
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
28 29
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
30
	routingTables []*kb.RoutingTable
31

32
	// the network interface. service
33
	network inet.Network
34
	sender  inet.Sender
35

Jeromy's avatar
Jeromy committed
36 37 38
	// Local peer (yourself)
	self *peer.Peer

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39 40 41
	// Other peers
	peerstore peer.Peerstore

Jeromy's avatar
Jeromy committed
42 43
	// Local data
	datastore ds.Datastore
44
	dslock    sync.Mutex
45

46
	providers *ProviderManager
47

48 49
	// Signal to shutdown dht
	shutdown chan struct{}
50 51 52

	// When this peer started up
	birth time.Time
53 54 55

	//lock to make diagnostics work better
	diaglock sync.Mutex
56 57
}

Jeromy's avatar
Jeromy committed
58
// NewDHT creates a new DHT object with the given peer as the 'local' host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59
func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
60
	dht := new(IpfsDHT)
Jeromy's avatar
Jeromy committed
61
	dht.network = net
62
	dht.sender = sender
Jeromy's avatar
Jeromy committed
63
	dht.datastore = dstore
64
	dht.self = p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65
	dht.peerstore = ps
66

67
	dht.providers = NewProviderManager(p.ID)
Jeromy's avatar
Jeromy committed
68
	dht.shutdown = make(chan struct{})
69

70 71 72 73
	dht.routingTables = make([]*kb.RoutingTable, 3)
	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
	dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
	dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
74
	dht.birth = time.Now()
Jeromy's avatar
Jeromy committed
75
	return dht
76 77
}

78
// Start up background goroutines needed by the DHT
79
func (dht *IpfsDHT) Start() {
80
	panic("the service is already started. rmv this method")
81 82
}

83
// Connect to a new peer at the given address, ping and add to the routing table
84
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
85
	maddrstr, _ := addr.String()
86
	u.DOut("Connect to new peer: %s\n", maddrstr)
87 88 89 90 91 92 93 94 95 96 97 98 99

	// TODO(jbenet,whyrusleeping)
	//
	// Connect should take in a Peer (with ID). In a sense, we shouldn't be
	// allowing connections to random multiaddrs without knowing who we're
	// speaking to (i.e. peer.ID). In terms of moving around simple addresses
	// -- instead of an (ID, Addr) pair -- we can use:
	//
	//   /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
	//
	npeer := &peer.Peer{}
	npeer.AddAddress(addr)
	err := dht.network.DialPeer(npeer)
100
	if err != nil {
101
		return nil, err
102 103
	}

Jeromy's avatar
Jeromy committed
104 105
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
106
	err = dht.Ping(npeer, time.Second*2)
Jeromy's avatar
Jeromy committed
107
	if err != nil {
108
		return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
Jeromy's avatar
Jeromy committed
109 110
	}

111 112
	dht.Update(npeer)

113
	return npeer, nil
Jeromy's avatar
Jeromy committed
114 115
}

116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
// HandleMessage implements the inet.Handler interface.
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.NetMessage, error) {

	mData := mes.Data()
	if mData == nil {
		return nil, errors.New("message did not include Data")
	}

	mPeer := mes.Peer()
	if mPeer == nil {
		return nil, errors.New("message did not include a Peer")
	}

	// deserialize msg
	pmes := new(Message)
	err := proto.Unmarshal(mData, pmes)
	if err != nil {
		return nil, fmt.Errorf("Failed to decode protobuf message: %v\n", err)
	}

	// update the peer (on valid msgs only)
	dht.Update(mPeer)

	// Print out diagnostic
	u.DOut("[peer: %s]\nGot message type: '%s' [from = %s]\n",
		dht.self.ID.Pretty(),
		Message_MessageType_name[int32(pmes.GetType())], mPeer.ID.Pretty())

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
		return nil, errors.New("Recieved invalid message type")
	}

	// dispatch handler.
	rpmes, err := handler(mPeer, pmes)
	if err != nil {
		return nil, err
	}

	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
		return nil, fmt.Errorf("Failed to encode protobuf message: %v\n", err)
	}

	return rmes, nil
}

165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) {

	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return nil, err
	}

	start := time.Now()

	rmes, err := dht.sender.SendRequest(ctx, mes)
	if err != nil {
		return nil, err
	}
180 181 182
	if rmes == nil {
		return nil, errors.New("no response to request")
	}
183 184 185 186 187 188 189 190 191 192 193 194

	rtt := time.Since(start)
	rmes.Peer().SetLatency(rtt)

	rpmes := new(Message)
	if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
		return nil, err
	}

	return rpmes, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer, key string, value []byte) error {
	pmes := newMessage(Message_PUT_VALUE, string(key), 0)
	pmes.Value = value

	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return err
	}
	return dht.sender.SendMessage(ctx, mes)
}

func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) error {
	pmes := newMessage(Message_ADD_PROVIDER, string(key), 0)

	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return err
	}
	return dht.sender.SendMessage(ctx, mes)
}

216 217 218 219
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
	key u.Key, level int) ([]byte, []*peer.Peer, error) {

	pmes, err := dht.getValueSingle(ctx, p, key, level)
220
	if err != nil {
221
		return nil, nil, err
222 223
	}

224
	u.POut("pmes.GetValue() %v\n", pmes.GetValue())
225
	if value := pmes.GetValue(); value != nil {
226
		// Success! We were given the value
227
		u.POut("getValueOrPeers: got value\n")
228
		return value, nil, nil
229
	}
230

231
	// TODO decide on providers. This probably shouldn't be happening.
232 233 234 235 236 237 238 239
	if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
		val, err := dht.getFromPeerList(ctx, key, prv, level)
		if err != nil {
			return nil, nil, err
		}
		u.POut("getValueOrPeers: get from providers\n")
		return val, nil, nil
	}
240 241

	// Perhaps we were given closer peers
242
	var peers []*peer.Peer
243
	for _, pb := range pmes.GetCloserPeers() {
244 245 246
		if peer.ID(pb.GetId()).Equal(dht.self.ID) {
			continue
		}
247

248 249
		addr, err := ma.NewMultiaddr(pb.GetAddr())
		if err != nil {
250
			u.PErr("%v\n", err.Error())
251 252
			continue
		}
253

254 255 256 257 258
		// check if we already have this peer.
		pr, _ := dht.peerstore.Get(peer.ID(pb.GetId()))
		if pr == nil {
			pr = &peer.Peer{ID: peer.ID(pb.GetId())}
			dht.peerstore.Put(pr)
259
		}
260 261
		pr.AddAddress(addr) // idempotent
		peers = append(peers, pr)
262
	}
263 264

	if len(peers) > 0 {
265
		u.POut("getValueOrPeers: peers\n")
266 267 268
		return nil, peers, nil
	}

269 270
	u.POut("getValueOrPeers: u.ErrNotFound\n")
	return nil, nil, u.ErrNotFound
271 272
}

273
// getValueSingle simply performs the get value RPC with the given parameters
274 275 276
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer,
	key u.Key, level int) (*Message, error) {

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
277
	pmes := newMessage(Message_GET_VALUE, string(key), level)
278
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
279 280
}

281 282 283 284
// TODO: Im not certain on this implementation, we get a list of peers/providers
// from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
285 286
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
	peerlist []*Message_Peer, level int) ([]byte, error) {
287

288
	for _, pinfo := range peerlist {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
289
		p, err := dht.ensureConnectedToPeer(pinfo)
290 291 292
		if err != nil {
			u.DErr("getFromPeers error: %s\n", err)
			continue
Jeromy's avatar
Jeromy committed
293
		}
294 295

		pmes, err := dht.getValueSingle(ctx, p, key, level)
Jeromy's avatar
Jeromy committed
296
		if err != nil {
297
			u.DErr("getFromPeers error: %s\n", err)
Jeromy's avatar
Jeromy committed
298 299 300
			continue
		}

301 302 303 304
		if value := pmes.GetValue(); value != nil {
			// Success! We were given the value
			dht.providers.AddProvider(key, p)
			return value, nil
305
		}
Jeromy's avatar
Jeromy committed
306 307 308 309
	}
	return nil, u.ErrNotFound
}

310
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
311 312
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
313
	v, err := dht.datastore.Get(ds.NewKey(string(key)))
314 315 316
	if err != nil {
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
317 318 319 320 321 322

	byt, ok := v.([]byte)
	if !ok {
		return byt, errors.New("value stored in datastore not []byte")
	}
	return byt, nil
323 324
}

325
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
326 327
	return dht.datastore.Put(ds.NewKey(string(key)), value)
}
328

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
329 330
// Update signals to all routingTables to Update their last-seen status
// on the given peer.
331
func (dht *IpfsDHT) Update(p *peer.Peer) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
332
	removedCount := 0
333
	for _, route := range dht.routingTables {
334
		removed := route.Update(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
335
		// Only close the connection if no tables refer to this peer
336
		if removed != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
337
			removedCount++
338
		}
339
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
340 341 342 343 344 345 346 347

	// Only close the connection if no tables refer to this peer
	// if removedCount == len(dht.routingTables) {
	// 	dht.network.ClosePeer(p)
	// }
	// ACTUALLY, no, let's not just close the connection. it may be connected
	// due to other things. it seems that we just need connection timeouts
	// after some deadline of inactivity.
348
}
Jeromy's avatar
Jeromy committed
349

350
// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Jeromy's avatar
Jeromy committed
351
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
352
	for _, table := range dht.routingTables {
Jeromy's avatar
Jeromy committed
353 354 355 356 357 358 359
		p := table.Find(id)
		if p != nil {
			return p, table
		}
	}
	return nil, nil
}
360

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
361 362 363
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p *peer.Peer, id peer.ID, level int) (*Message, error) {
	pmes := newMessage(Message_FIND_NODE, string(id), level)
	return dht.sendRequest(ctx, p, pmes)
364
}
365

366 367
func (dht *IpfsDHT) printTables() {
	for _, route := range dht.routingTables {
368 369 370
		route.Print()
	}
}
Jeromy's avatar
Jeromy committed
371

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
372 373 374
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p *peer.Peer, key u.Key, level int) (*Message, error) {
	pmes := newMessage(Message_GET_PROVIDERS, string(key), level)
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
375 376
}

377
// TODO: Could be done async
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
378
func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer {
379
	var provArr []*peer.Peer
Jeromy's avatar
Jeromy committed
380
	for _, prov := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
381 382 383
		p, err := dht.peerFromInfo(prov)
		if err != nil {
			u.PErr("error getting peer from info: %v\n", err)
Jeromy's avatar
Jeromy committed
384 385
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
386 387 388 389

		// Dont add outselves to the list
		if p.ID.Equal(dht.self.ID) {
			continue
Jeromy's avatar
Jeromy committed
390
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
391 392

		// TODO(jbenet) ensure providers is idempotent
393
		dht.providers.AddProvider(key, p)
394
		provArr = append(provArr, p)
Jeromy's avatar
Jeromy committed
395
	}
396
	return provArr
Jeromy's avatar
Jeromy committed
397
}
Jeromy's avatar
Jeromy committed
398

399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
// nearestPeerToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeerToQuery(pmes *Message) *peer.Peer {
	level := pmes.GetClusterLevel()
	cluster := dht.routingTables[level]

	key := u.Key(pmes.GetKey())
	closer := cluster.NearestPeer(kb.ConvertKey(key))
	return closer
}

// betterPeerToQuery returns nearestPeerToQuery, but iff closer than self.
func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer {
	closer := dht.nearestPeerToQuery(pmes)

	// no node? nil
	if closer == nil {
		return nil
	}

	// == to self? nil
	if closer.ID.Equal(dht.self.ID) {
		u.DOut("Attempted to return self! this shouldnt happen...\n")
		return nil
	}

	// self is closer? nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
425
	key := u.Key(pmes.GetKey())
426 427 428 429 430 431 432 433
	if kb.Closer(dht.self.ID, closer.ID, key) {
		return nil
	}

	// ok seems like a closer node.
	return closer
}

434 435 436
func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {

	id := peer.ID(pbp.GetId())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
437 438 439 440 441 442

	// continue if it's ourselves
	if id.Equal(dht.self.ID) {
		return nil, errors.New("found self")
	}

443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
	p, _ := dht.peerstore.Get(id)
	if p == nil {
		p, _ = dht.Find(id)
		if p != nil {
			panic("somehow peer not getting into peerstore")
		}
	}

	if p == nil {
		maddr, err := ma.NewMultiaddr(pbp.GetAddr())
		if err != nil {
			return nil, err
		}

		// create new Peer
		p := &peer.Peer{ID: id}
		p.AddAddress(maddr)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
460 461 462 463 464 465 466 467 468
		dht.peerstore.Put(p)
	}
	return p, nil
}

func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (*peer.Peer, error) {
	p, err := dht.peerFromInfo(pbp)
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
469 470
	}

471
	// dial connection
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
472
	err = dht.network.DialPeer(p)
473
	return p, err
Jeromy's avatar
Jeromy committed
474
}
475 476

func (dht *IpfsDHT) loadProvidableKeys() error {
477 478 479 480
	kl, err := dht.datastore.KeyList()
	if err != nil {
		return err
	}
481 482 483 484 485 486
	for _, k := range kl {
		dht.providers.AddProvider(u.Key(k.Bytes()), dht.self)
	}
	return nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
487
// Bootstrap builds up list of peers by requesting random peer IDs
488 489 490 491 492
func (dht *IpfsDHT) Bootstrap() {
	id := make([]byte, 16)
	rand.Read(id)
	dht.FindPeer(peer.ID(id), time.Second*10)
}