dht.go 12.4 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"crypto/rand"
5
	"errors"
6
	"fmt"
7 8
	"sync"
	"time"
9

10
	inet "github.com/jbenet/go-ipfs/net"
11
	msg "github.com/jbenet/go-ipfs/net/message"
12
	peer "github.com/jbenet/go-ipfs/peer"
Jeromy's avatar
Jeromy committed
13
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
14
	u "github.com/jbenet/go-ipfs/util"
15

16
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
17
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
18
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
19

20
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
21 22
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24 25 26 27
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
28 29
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
30
	routingTables []*kb.RoutingTable
31

32
	// the network interface. service
33
	network inet.Network
34
	sender  inet.Sender
35

Jeromy's avatar
Jeromy committed
36 37 38
	// Local peer (yourself)
	self *peer.Peer

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39 40 41
	// Other peers
	peerstore peer.Peerstore

Jeromy's avatar
Jeromy committed
42 43
	// Local data
	datastore ds.Datastore
44
	dslock    sync.Mutex
45

46
	providers *ProviderManager
47

48 49
	// Signal to shutdown dht
	shutdown chan struct{}
50 51 52

	// When this peer started up
	birth time.Time
53 54 55

	//lock to make diagnostics work better
	diaglock sync.Mutex
56 57
}

Jeromy's avatar
Jeromy committed
58
// NewDHT creates a new DHT object with the given peer as the 'local' host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59
func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
60
	dht := new(IpfsDHT)
Jeromy's avatar
Jeromy committed
61
	dht.network = net
62
	dht.sender = sender
Jeromy's avatar
Jeromy committed
63
	dht.datastore = dstore
64
	dht.self = p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65
	dht.peerstore = ps
66

67
	dht.providers = NewProviderManager(p.ID)
Jeromy's avatar
Jeromy committed
68
	dht.shutdown = make(chan struct{})
69

70 71 72 73
	dht.routingTables = make([]*kb.RoutingTable, 3)
	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
	dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
	dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
74
	dht.birth = time.Now()
Jeromy's avatar
Jeromy committed
75
	return dht
76 77
}

78
// Start up background goroutines needed by the DHT
79
func (dht *IpfsDHT) Start() {
80
	panic("the service is already started. rmv this method")
81 82
}

83
// Connect to a new peer at the given address, ping and add to the routing table
84
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
85
	maddrstr, _ := addr.String()
86
	u.DOut("Connect to new peer: %s\n", maddrstr)
87 88 89 90 91 92 93 94 95 96 97 98 99

	// TODO(jbenet,whyrusleeping)
	//
	// Connect should take in a Peer (with ID). In a sense, we shouldn't be
	// allowing connections to random multiaddrs without knowing who we're
	// speaking to (i.e. peer.ID). In terms of moving around simple addresses
	// -- instead of an (ID, Addr) pair -- we can use:
	//
	//   /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
	//
	npeer := &peer.Peer{}
	npeer.AddAddress(addr)
	err := dht.network.DialPeer(npeer)
100
	if err != nil {
101
		return nil, err
102 103
	}

Jeromy's avatar
Jeromy committed
104 105
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
106
	err = dht.Ping(npeer, time.Second*2)
Jeromy's avatar
Jeromy committed
107
	if err != nil {
108
		return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
Jeromy's avatar
Jeromy committed
109 110
	}

111 112
	dht.Update(npeer)

113
	return npeer, nil
Jeromy's avatar
Jeromy committed
114 115
}

116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
// HandleMessage implements the inet.Handler interface.
//
// It decodes the protobuf payload of mes, refreshes the sending peer in the
// routing tables, dispatches to the handler registered for the message type
// and serializes that handler's reply, addressed to the sender.
//
// An error is returned when the message carries no data or no peer, cannot
// be decoded, has no handler for its type, or when the handler or the
// encoding of its reply fails. When the handler produces no reply, both
// return values are nil.
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.NetMessage, error) {

	mData := mes.Data()
	if mData == nil {
		return nil, errors.New("message did not include Data")
	}

	mPeer := mes.Peer()
	if mPeer == nil {
		return nil, errors.New("message did not include a Peer")
	}

	// deserialize msg
	pmes := new(Message)
	if err := proto.Unmarshal(mData, pmes); err != nil {
		return nil, fmt.Errorf("failed to decode protobuf message: %v", err)
	}

	// update the peer (on valid msgs only)
	dht.Update(mPeer)

	// Print out diagnostic
	u.DOut("[peer: %s]\nGot message type: '%s' [from = %s]\n",
		dht.self.ID.Pretty(),
		Message_MessageType_name[int32(pmes.GetType())], mPeer.ID.Pretty())

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
		return nil, errors.New("received invalid message type")
	}

	// dispatch handler.
	rpmes, err := handler(mPeer, pmes)
	if err != nil {
		return nil, err
	}

	// A handler may have nothing to send back (this file sends PUT_VALUE and
	// ADD_PROVIDER with SendMessage, i.e. without waiting for a reply).
	// Return before serializing rather than trying to encode a nil message.
	// NOTE(review): confirm the caller treats a nil message with a nil error
	// as "no response".
	if rpmes == nil {
		return nil, nil
	}

	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
		return nil, fmt.Errorf("failed to encode protobuf message: %v", err)
	}

	return rmes, nil
}

165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
//
// pmes is serialized and addressed to p. The elapsed time between sending
// and receiving the response is recorded on the response's peer, and the
// decoded response message is returned. An error is returned when encoding,
// sending or decoding fails, or when the sender yields no response.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) {

	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return nil, err
	}

	start := time.Now()

	rmes, err := dht.sender.SendRequest(ctx, mes)
	if err != nil {
		return nil, err
	}
	// Guard against a sender that reports success without a response; the
	// calls below would otherwise dereference a nil message.
	if rmes == nil {
		return nil, errors.New("no response to request")
	}

	rtt := time.Since(start)
	// A message may carry no peer (HandleMessage rejects such messages), so
	// only record the latency when there is a peer to record it on.
	if rp := rmes.Peer(); rp != nil {
		rp.SetLatency(rtt)
	}

	rpmes := new(Message)
	if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
		return nil, err
	}

	return rpmes, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
// putValueToNetwork builds a PUT_VALUE message carrying key and value,
// addresses it to p and hands it to dht.sender with SendMessage (no reply is
// awaited). It returns the encoding error or whatever SendMessage returns.
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer, key string, value []byte) error {
	req := newMessage(Message_PUT_VALUE, key, 0)
	req.Value = value

	outgoing, encErr := msg.FromObject(p, req)
	if encErr == nil {
		return dht.sender.SendMessage(ctx, outgoing)
	}
	return encErr
}

// putProvider builds an ADD_PROVIDER message for key, addresses it to p and
// hands it to dht.sender with SendMessage (no reply is awaited). It returns
// the encoding error or whatever SendMessage returns.
func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) error {
	announce := newMessage(Message_ADD_PROVIDER, key, 0)

	outgoing, encErr := msg.FromObject(p, announce)
	if encErr == nil {
		return dht.sender.SendMessage(ctx, outgoing)
	}
	return encErr
}

213 214 215 216
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
	key u.Key, level int) ([]byte, []*peer.Peer, error) {

	pmes, err := dht.getValueSingle(ctx, p, key, level)
217
	if err != nil {
218
		return nil, nil, err
219 220
	}

221
	if value := pmes.GetValue(); value != nil {
222
		// Success! We were given the value
223
		return value, nil, nil
224
	}
225

226 227 228 229 230 231 232 233 234 235
	// TODO decide on providers. This probably shouldn't be happening.
	// if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
	// 	val, err := dht.getFromPeerList(key, timeout,, level)
	// 	if err != nil {
	// 		return nil, nil, err
	// 	}
	// 	return val, nil, nil
	// }

	// Perhaps we were given closer peers
236
	var peers []*peer.Peer
237
	for _, pb := range pmes.GetCloserPeers() {
238 239 240
		if peer.ID(pb.GetId()).Equal(dht.self.ID) {
			continue
		}
241

242 243
		addr, err := ma.NewMultiaddr(pb.GetAddr())
		if err != nil {
244
			u.PErr("%v\n", err.Error())
245 246
			continue
		}
247

248 249 250 251 252
		// check if we already have this peer.
		pr, _ := dht.peerstore.Get(peer.ID(pb.GetId()))
		if pr == nil {
			pr = &peer.Peer{ID: peer.ID(pb.GetId())}
			dht.peerstore.Put(pr)
253
		}
254 255
		pr.AddAddress(addr) // idempotent
		peers = append(peers, pr)
256
	}
257 258 259 260 261 262

	if len(peers) > 0 {
		return nil, peers, nil
	}

	return nil, nil, errors.New("NotFound. did not get value or closer peers.")
263 264
}

265
// getValueSingle simply performs the get value RPC with the given parameters
266 267 268
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer,
	key u.Key, level int) (*Message, error) {

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
269
	pmes := newMessage(Message_GET_VALUE, string(key), level)
270
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
271 272
}

273 274 275 276
// TODO: Im not certain on this implementation, we get a list of peers/providers
// from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
277 278
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
	peerlist []*Message_Peer, level int) ([]byte, error) {
279

280
	for _, pinfo := range peerlist {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
281
		p, err := dht.ensureConnectedToPeer(pinfo)
282 283 284
		if err != nil {
			u.DErr("getFromPeers error: %s\n", err)
			continue
Jeromy's avatar
Jeromy committed
285
		}
286 287

		pmes, err := dht.getValueSingle(ctx, p, key, level)
Jeromy's avatar
Jeromy committed
288
		if err != nil {
289
			u.DErr("getFromPeers error: %s\n", err)
Jeromy's avatar
Jeromy committed
290 291 292
			continue
		}

293 294 295 296
		if value := pmes.GetValue(); value != nil {
			// Success! We were given the value
			dht.providers.AddProvider(key, p)
			return value, nil
297
		}
Jeromy's avatar
Jeromy committed
298 299 300 301
	}
	return nil, u.ErrNotFound
}

302
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
303 304
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
305
	v, err := dht.datastore.Get(ds.NewKey(string(key)))
306 307 308
	if err != nil {
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
309 310 311 312 313 314

	byt, ok := v.([]byte)
	if !ok {
		return byt, errors.New("value stored in datastore not []byte")
	}
	return byt, nil
315 316
}

317
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
318 319
	return dht.datastore.Put(ds.NewKey(string(key)), value)
}
320

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
321 322
// Update signals to all routingTables to Update their last-seen status
// on the given peer.
323
func (dht *IpfsDHT) Update(p *peer.Peer) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
324
	removedCount := 0
325
	for _, route := range dht.routingTables {
326
		removed := route.Update(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
327
		// Only close the connection if no tables refer to this peer
328
		if removed != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
329
			removedCount++
330
		}
331
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
332 333 334 335 336 337 338 339

	// Only close the connection if no tables refer to this peer
	// if removedCount == len(dht.routingTables) {
	// 	dht.network.ClosePeer(p)
	// }
	// ACTUALLY, no, let's not just close the connection. it may be connected
	// due to other things. it seems that we just need connection timeouts
	// after some deadline of inactivity.
340
}
Jeromy's avatar
Jeromy committed
341

342
// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Jeromy's avatar
Jeromy committed
343
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
344
	for _, table := range dht.routingTables {
Jeromy's avatar
Jeromy committed
345 346 347 348 349 350 351
		p := table.Find(id)
		if p != nil {
			return p, table
		}
	}
	return nil, nil
}
352

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
353 354 355
// findPeerSingle performs a single find-node RPC: it sends a FIND_NODE
// message for id at the given cluster level to p and returns p's decoded
// response.
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p *peer.Peer, id peer.ID, level int) (*Message, error) {
	pmes := newMessage(Message_FIND_NODE, string(id), level)
	return dht.sendRequest(ctx, p, pmes)
}
357

358 359
func (dht *IpfsDHT) printTables() {
	for _, route := range dht.routingTables {
360 361 362
		route.Print()
	}
}
Jeromy's avatar
Jeromy committed
363

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
364 365 366
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p *peer.Peer, key u.Key, level int) (*Message, error) {
	pmes := newMessage(Message_GET_PROVIDERS, string(key), level)
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
367 368
}

369
// TODO: Could be done async
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
370
func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer {
371
	var provArr []*peer.Peer
Jeromy's avatar
Jeromy committed
372
	for _, prov := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
373 374 375
		p, err := dht.peerFromInfo(prov)
		if err != nil {
			u.PErr("error getting peer from info: %v\n", err)
Jeromy's avatar
Jeromy committed
376 377
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
378 379 380 381

		// Dont add outselves to the list
		if p.ID.Equal(dht.self.ID) {
			continue
Jeromy's avatar
Jeromy committed
382
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
383 384

		// TODO(jbenet) ensure providers is idempotent
385
		dht.providers.AddProvider(key, p)
386
		provArr = append(provArr, p)
Jeromy's avatar
Jeromy committed
387
	}
388
	return provArr
Jeromy's avatar
Jeromy committed
389
}
Jeromy's avatar
Jeromy committed
390

391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416
// nearestPeerToQuery returns the peer nearest to the message's key in the
// routing table selected by the message's cluster level.
//
// It returns nil when the cluster level does not select an existing routing
// table; otherwise it returns whatever the table's NearestPeer returns
// (callers in this file treat nil as "no node").
func (dht *IpfsDHT) nearestPeerToQuery(pmes *Message) *peer.Peer {
	// The level comes from the message and must not be trusted as an index:
	// an out-of-range value would otherwise panic.
	level := int(pmes.GetClusterLevel())
	if level < 0 || level >= len(dht.routingTables) {
		u.DOut("nearestPeerToQuery: cluster level %d out of range\n", level)
		return nil
	}
	cluster := dht.routingTables[level]

	key := u.Key(pmes.GetKey())
	return cluster.NearestPeer(kb.ConvertKey(key))
}

// betterPeerToQuery returns nearestPeerToQuery, but iff closer than self.
func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer {
	closer := dht.nearestPeerToQuery(pmes)

	// no node? nil
	if closer == nil {
		return nil
	}

	// == to self? nil
	if closer.ID.Equal(dht.self.ID) {
		u.DOut("Attempted to return self! this shouldnt happen...\n")
		return nil
	}

	// self is closer? nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
417
	key := u.Key(pmes.GetKey())
418 419 420 421 422 423 424 425
	if kb.Closer(dht.self.ID, closer.ID, key) {
		return nil
	}

	// ok seems like a closer node.
	return closer
}

426 427 428
func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {

	id := peer.ID(pbp.GetId())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
429 430 431 432 433 434

	// continue if it's ourselves
	if id.Equal(dht.self.ID) {
		return nil, errors.New("found self")
	}

435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451
	p, _ := dht.peerstore.Get(id)
	if p == nil {
		p, _ = dht.Find(id)
		if p != nil {
			panic("somehow peer not getting into peerstore")
		}
	}

	if p == nil {
		maddr, err := ma.NewMultiaddr(pbp.GetAddr())
		if err != nil {
			return nil, err
		}

		// create new Peer
		p := &peer.Peer{ID: id}
		p.AddAddress(maddr)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
452 453 454 455 456 457 458 459 460
		dht.peerstore.Put(p)
	}
	return p, nil
}

func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (*peer.Peer, error) {
	p, err := dht.peerFromInfo(pbp)
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
461 462
	}

463
	// dial connection
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
464
	err = dht.network.DialPeer(p)
465
	return p, err
Jeromy's avatar
Jeromy committed
466
}
467 468

func (dht *IpfsDHT) loadProvidableKeys() error {
469 470 471 472
	kl, err := dht.datastore.KeyList()
	if err != nil {
		return err
	}
473 474 475 476 477 478
	for _, k := range kl {
		dht.providers.AddProvider(u.Key(k.Bytes()), dht.self)
	}
	return nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
479
// Bootstrap builds up list of peers by requesting random peer IDs
480 481 482 483 484
func (dht *IpfsDHT) Bootstrap() {
	id := make([]byte, 16)
	rand.Read(id)
	dht.FindPeer(peer.ID(id), time.Second*10)
}