dht.go 12.3 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"crypto/rand"
5
	"errors"
6
	"fmt"
7 8
	"sync"
	"time"
9

10
	inet "github.com/jbenet/go-ipfs/net"
11
	msg "github.com/jbenet/go-ipfs/net/message"
12
	peer "github.com/jbenet/go-ipfs/peer"
Jeromy's avatar
Jeromy committed
13
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
14
	u "github.com/jbenet/go-ipfs/util"
15

16
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
17
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
18
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
19

20
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
21 22
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24 25 26 27
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
	routingTables []*kb.RoutingTable

	// the network interface. service
	network inet.Network
	sender  inet.Sender

	// Local peer (yourself)
	self *peer.Peer

	// Other peers
	peerstore peer.Peerstore

	// Local data store; dslock serializes getLocal/putLocal access to it.
	datastore ds.Datastore
	dslock    sync.Mutex

	// providers tracks which peers provide which keys (see addProviders).
	providers *ProviderManager

	// When this peer started up
	birth time.Time

	//lock to make diagnostics work better
	diaglock sync.Mutex
}

Jeromy's avatar
Jeromy committed
55
// NewDHT creates a new DHT object with the given peer as the 'local' host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
56
func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
57
	dht := new(IpfsDHT)
Jeromy's avatar
Jeromy committed
58
	dht.network = net
59
	dht.sender = sender
Jeromy's avatar
Jeromy committed
60
	dht.datastore = dstore
61
	dht.self = p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
	dht.peerstore = ps
63

64
	dht.providers = NewProviderManager(p.ID)
65

66 67 68 69
	dht.routingTables = make([]*kb.RoutingTable, 3)
	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
	dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
	dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
70
	dht.birth = time.Now()
Jeromy's avatar
Jeromy committed
71
	return dht
72 73
}

74
// Connect to a new peer at the given address, ping and add to the routing table
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
75 76
func (dht *IpfsDHT) Connect(npeer *peer.Peer) (*peer.Peer, error) {
	u.DOut("Connect to new peer: %s\n", npeer.ID.Pretty())
77 78 79 80 81 82 83 84 85 86 87

	// TODO(jbenet,whyrusleeping)
	//
	// Connect should take in a Peer (with ID). In a sense, we shouldn't be
	// allowing connections to random multiaddrs without knowing who we're
	// speaking to (i.e. peer.ID). In terms of moving around simple addresses
	// -- instead of an (ID, Addr) pair -- we can use:
	//
	//   /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
	//
	err := dht.network.DialPeer(npeer)
88
	if err != nil {
89
		return nil, err
90 91
	}

Jeromy's avatar
Jeromy committed
92 93
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
94
	err = dht.Ping(npeer, time.Second*2)
Jeromy's avatar
Jeromy committed
95
	if err != nil {
96
		return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
Jeromy's avatar
Jeromy committed
97 98
	}

99 100
	dht.Update(npeer)

101
	return npeer, nil
Jeromy's avatar
Jeromy committed
102 103
}

104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
// HandleMessage implements the inet.Handler interface.
// It decodes an incoming protobuf Message, updates the routing tables with
// the sender, dispatches to the handler for the message type, and returns
// the serialized response message.
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.NetMessage, error) {

	mData := mes.Data()
	if mData == nil {
		return nil, errors.New("message did not include Data")
	}

	mPeer := mes.Peer()
	if mPeer == nil {
		return nil, errors.New("message did not include a Peer")
	}

	// deserialize msg
	pmes := new(Message)
	err := proto.Unmarshal(mData, pmes)
	if err != nil {
		// FIX: error strings are lowercase and carry no trailing newline.
		return nil, fmt.Errorf("failed to decode protobuf message: %v", err)
	}

	// update the peer (on valid msgs only)
	dht.Update(mPeer)

	// Print out diagnostic
	u.DOut("[peer: %s]\nGot message type: '%s' [from = %s]\n",
		dht.self.ID.Pretty(),
		Message_MessageType_name[int32(pmes.GetType())], mPeer.ID.Pretty())

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
		// FIX: "Recieved" typo corrected.
		return nil, errors.New("received invalid message type")
	}

	// dispatch handler.
	rpmes, err := handler(mPeer, pmes)
	if err != nil {
		return nil, err
	}

	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
		return nil, fmt.Errorf("failed to encode protobuf message: %v", err)
	}

	return rmes, nil
}

153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) {

	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return nil, err
	}

	start := time.Now()

	rmes, err := dht.sender.SendRequest(ctx, mes)
	if err != nil {
		return nil, err
	}
168 169 170
	if rmes == nil {
		return nil, errors.New("no response to request")
	}
171 172 173 174 175 176 177 178 179 180 181 182

	rtt := time.Since(start)
	rmes.Peer().SetLatency(rtt)

	rpmes := new(Message)
	if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
		return nil, err
	}

	return rpmes, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
// putValueToNetwork sends a PUT_VALUE message carrying key/value to peer p.
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer, key string, value []byte) error {
	pmes := newMessage(Message_PUT_VALUE, string(key), 0)
	pmes.Value = value

	netmes, err := msg.FromObject(p, pmes)
	if err != nil {
		return err
	}
	return dht.sender.SendMessage(ctx, netmes)
}

// putProvider sends an ADD_PROVIDER message for key to peer p.
func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) error {
	netmes, err := msg.FromObject(p, newMessage(Message_ADD_PROVIDER, string(key), 0))
	if err != nil {
		return err
	}
	return dht.sender.SendMessage(ctx, netmes)
}

204 205 206 207
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
	key u.Key, level int) ([]byte, []*peer.Peer, error) {

	pmes, err := dht.getValueSingle(ctx, p, key, level)
208
	if err != nil {
209
		return nil, nil, err
210 211
	}

212
	u.POut("pmes.GetValue() %v\n", pmes.GetValue())
213
	if value := pmes.GetValue(); value != nil {
214
		// Success! We were given the value
215
		u.POut("getValueOrPeers: got value\n")
216
		return value, nil, nil
217
	}
218

219
	// TODO decide on providers. This probably shouldn't be happening.
220 221 222 223 224 225 226 227
	if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
		val, err := dht.getFromPeerList(ctx, key, prv, level)
		if err != nil {
			return nil, nil, err
		}
		u.POut("getValueOrPeers: get from providers\n")
		return val, nil, nil
	}
228 229

	// Perhaps we were given closer peers
230
	var peers []*peer.Peer
231
	for _, pb := range pmes.GetCloserPeers() {
232 233 234
		if peer.ID(pb.GetId()).Equal(dht.self.ID) {
			continue
		}
235

236 237
		addr, err := ma.NewMultiaddr(pb.GetAddr())
		if err != nil {
238
			u.PErr("%v\n", err.Error())
239 240
			continue
		}
241

242 243 244 245 246
		// check if we already have this peer.
		pr, _ := dht.peerstore.Get(peer.ID(pb.GetId()))
		if pr == nil {
			pr = &peer.Peer{ID: peer.ID(pb.GetId())}
			dht.peerstore.Put(pr)
247
		}
248 249
		pr.AddAddress(addr) // idempotent
		peers = append(peers, pr)
250
	}
251 252

	if len(peers) > 0 {
253
		u.POut("getValueOrPeers: peers\n")
254 255 256
		return nil, peers, nil
	}

257 258
	u.POut("getValueOrPeers: u.ErrNotFound\n")
	return nil, nil, u.ErrNotFound
259 260
}

261
// getValueSingle simply performs the get value RPC with the given parameters
262 263 264
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer,
	key u.Key, level int) (*Message, error) {

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
265
	pmes := newMessage(Message_GET_VALUE, string(key), level)
266
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
267 268
}

269 270 271 272
// TODO: Im not certain on this implementation, we get a list of peers/providers
// from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
273 274
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
	peerlist []*Message_Peer, level int) ([]byte, error) {
275

276
	for _, pinfo := range peerlist {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
277
		p, err := dht.ensureConnectedToPeer(pinfo)
278 279 280
		if err != nil {
			u.DErr("getFromPeers error: %s\n", err)
			continue
Jeromy's avatar
Jeromy committed
281
		}
282 283

		pmes, err := dht.getValueSingle(ctx, p, key, level)
Jeromy's avatar
Jeromy committed
284
		if err != nil {
285
			u.DErr("getFromPeers error: %s\n", err)
Jeromy's avatar
Jeromy committed
286 287 288
			continue
		}

289 290 291 292
		if value := pmes.GetValue(); value != nil {
			// Success! We were given the value
			dht.providers.AddProvider(key, p)
			return value, nil
293
		}
Jeromy's avatar
Jeromy committed
294 295 296 297
	}
	return nil, u.ErrNotFound
}

298
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
299 300
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
301
	v, err := dht.datastore.Get(ds.NewKey(string(key)))
302 303 304
	if err != nil {
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
305 306 307 308 309 310

	byt, ok := v.([]byte)
	if !ok {
		return byt, errors.New("value stored in datastore not []byte")
	}
	return byt, nil
311 312
}

313
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
314 315
	return dht.datastore.Put(ds.NewKey(string(key)), value)
}
316

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
317 318
// Update signals to all routingTables to Update their last-seen status
// on the given peer.
319
func (dht *IpfsDHT) Update(p *peer.Peer) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
320
	removedCount := 0
321
	for _, route := range dht.routingTables {
322
		removed := route.Update(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
323
		// Only close the connection if no tables refer to this peer
324
		if removed != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
325
			removedCount++
326
		}
327
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
328 329 330 331 332 333 334 335

	// Only close the connection if no tables refer to this peer
	// if removedCount == len(dht.routingTables) {
	// 	dht.network.ClosePeer(p)
	// }
	// ACTUALLY, no, let's not just close the connection. it may be connected
	// due to other things. it seems that we just need connection timeouts
	// after some deadline of inactivity.
336
}
Jeromy's avatar
Jeromy committed
337

338
// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Jeromy's avatar
Jeromy committed
339
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
340
	for _, table := range dht.routingTables {
Jeromy's avatar
Jeromy committed
341 342 343 344 345 346 347
		p := table.Find(id)
		if p != nil {
			return p, table
		}
	}
	return nil, nil
}
348

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
349 350 351
// findPeerSingle performs a single FIND_NODE RPC against peer p.
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p *peer.Peer, id peer.ID, level int) (*Message, error) {
	req := newMessage(Message_FIND_NODE, string(id), level)
	return dht.sendRequest(ctx, p, req)
}
353

354 355
func (dht *IpfsDHT) printTables() {
	for _, route := range dht.routingTables {
356 357 358
		route.Print()
	}
}
Jeromy's avatar
Jeromy committed
359

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
360 361 362
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p *peer.Peer, key u.Key, level int) (*Message, error) {
	pmes := newMessage(Message_GET_PROVIDERS, string(key), level)
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
363 364
}

365
// TODO: Could be done async
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
366
func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer {
367
	var provArr []*peer.Peer
Jeromy's avatar
Jeromy committed
368
	for _, prov := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
369 370 371
		p, err := dht.peerFromInfo(prov)
		if err != nil {
			u.PErr("error getting peer from info: %v\n", err)
Jeromy's avatar
Jeromy committed
372 373
			continue
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
374 375 376 377

		// Dont add outselves to the list
		if p.ID.Equal(dht.self.ID) {
			continue
Jeromy's avatar
Jeromy committed
378
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
379 380

		// TODO(jbenet) ensure providers is idempotent
381
		dht.providers.AddProvider(key, p)
382
		provArr = append(provArr, p)
Jeromy's avatar
Jeromy committed
383
	}
384
	return provArr
Jeromy's avatar
Jeromy committed
385
}
Jeromy's avatar
Jeromy committed
386

387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
// nearestPeerToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeerToQuery(pmes *Message) *peer.Peer {
	// the message's cluster level selects which routing table to consult
	table := dht.routingTables[pmes.GetClusterLevel()]
	return table.NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
}

// betterPeerToQuery returns nearestPeerToQuery, but iff closer than self.
func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer {
	closer := dht.nearestPeerToQuery(pmes)

	// no node? nil
	if closer == nil {
		return nil
	}

	// == to self? nil
	if closer.ID.Equal(dht.self.ID) {
		u.DOut("Attempted to return self! this shouldnt happen...\n")
		return nil
	}

	// self is closer? nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
413
	key := u.Key(pmes.GetKey())
414 415 416 417 418 419 420 421
	if kb.Closer(dht.self.ID, closer.ID, key) {
		return nil
	}

	// ok seems like a closer node.
	return closer
}

422 423 424
func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {

	id := peer.ID(pbp.GetId())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
425 426 427 428 429 430

	// continue if it's ourselves
	if id.Equal(dht.self.ID) {
		return nil, errors.New("found self")
	}

431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447
	p, _ := dht.peerstore.Get(id)
	if p == nil {
		p, _ = dht.Find(id)
		if p != nil {
			panic("somehow peer not getting into peerstore")
		}
	}

	if p == nil {
		maddr, err := ma.NewMultiaddr(pbp.GetAddr())
		if err != nil {
			return nil, err
		}

		// create new Peer
		p := &peer.Peer{ID: id}
		p.AddAddress(maddr)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
448 449 450 451 452 453 454 455 456
		dht.peerstore.Put(p)
	}
	return p, nil
}

func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (*peer.Peer, error) {
	p, err := dht.peerFromInfo(pbp)
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
457 458
	}

459
	// dial connection
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
460
	err = dht.network.DialPeer(p)
461
	return p, err
Jeromy's avatar
Jeromy committed
462
}
463 464

func (dht *IpfsDHT) loadProvidableKeys() error {
465 466 467 468
	kl, err := dht.datastore.KeyList()
	if err != nil {
		return err
	}
469 470 471 472 473 474
	for _, k := range kl {
		dht.providers.AddProvider(u.Key(k.Bytes()), dht.self)
	}
	return nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
475
// Bootstrap builds up list of peers by requesting random peer IDs
476 477 478 479 480
func (dht *IpfsDHT) Bootstrap() {
	id := make([]byte, 16)
	rand.Read(id)
	dht.FindPeer(peer.ID(id), time.Second*10)
}