dht.go 12.5 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"crypto/rand"
5
	"errors"
6
	"fmt"
7 8
	"sync"
	"time"
9

10
	inet "github.com/jbenet/go-ipfs/net"
11
	msg "github.com/jbenet/go-ipfs/net/message"
12
	peer "github.com/jbenet/go-ipfs/peer"
Jeromy's avatar
Jeromy committed
13
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
14
	u "github.com/jbenet/go-ipfs/util"
15

16
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
17
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
18
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
19

20
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
21 22
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24 25 26 27
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
28 29
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
30
	routingTables []*kb.RoutingTable
31

32
	// the network interface. service
33
	network inet.Network
34
	sender  inet.Sender
35

Jeromy's avatar
Jeromy committed
36 37 38
	// Local peer (yourself)
	self *peer.Peer

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39 40 41
	// Other peers
	peerstore peer.Peerstore

Jeromy's avatar
Jeromy committed
42 43
	// Local data
	datastore ds.Datastore
44
	dslock    sync.Mutex
45

46
	providers *ProviderManager
47

48 49
	// Signal to shutdown dht
	shutdown chan struct{}
50 51 52

	// When this peer started up
	birth time.Time
53 54 55

	//lock to make diagnostics work better
	diaglock sync.Mutex
56 57
}

Jeromy's avatar
Jeromy committed
58
// NewDHT creates a new DHT object with the given peer as the 'local' host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59
func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
60
	dht := new(IpfsDHT)
Jeromy's avatar
Jeromy committed
61
	dht.network = net
62
	dht.sender = sender
Jeromy's avatar
Jeromy committed
63
	dht.datastore = dstore
64
	dht.self = p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65
	dht.peerstore = ps
66

67
	dht.providers = NewProviderManager(p.ID)
Jeromy's avatar
Jeromy committed
68
	dht.shutdown = make(chan struct{})
69

70 71 72 73
	dht.routingTables = make([]*kb.RoutingTable, 3)
	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
	dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
	dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
74
	dht.birth = time.Now()
Jeromy's avatar
Jeromy committed
75
	return dht
76 77
}

78
// Start up background goroutines needed by the DHT
79
func (dht *IpfsDHT) Start() {
80
	panic("the service is already started. rmv this method")
81 82
}

83
// Connect to a new peer at the given address, ping and add to the routing table
84
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
85
	maddrstr, _ := addr.String()
86
	u.DOut("Connect to new peer: %s\n", maddrstr)
87 88 89 90 91 92 93 94 95 96 97 98 99

	// TODO(jbenet,whyrusleeping)
	//
	// Connect should take in a Peer (with ID). In a sense, we shouldn't be
	// allowing connections to random multiaddrs without knowing who we're
	// speaking to (i.e. peer.ID). In terms of moving around simple addresses
	// -- instead of an (ID, Addr) pair -- we can use:
	//
	//   /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
	//
	npeer := &peer.Peer{}
	npeer.AddAddress(addr)
	err := dht.network.DialPeer(npeer)
100
	if err != nil {
101
		return nil, err
102 103
	}

Jeromy's avatar
Jeromy committed
104 105
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
106
	err = dht.Ping(npeer, time.Second*2)
Jeromy's avatar
Jeromy committed
107
	if err != nil {
108
		return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
Jeromy's avatar
Jeromy committed
109 110
	}

111 112
	dht.Update(npeer)

113
	return npeer, nil
Jeromy's avatar
Jeromy committed
114 115
}

116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
// HandleMessage implements the inet.Handler interface.
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.NetMessage, error) {

	mData := mes.Data()
	if mData == nil {
		return nil, errors.New("message did not include Data")
	}

	mPeer := mes.Peer()
	if mPeer == nil {
		return nil, errors.New("message did not include a Peer")
	}

	// deserialize msg
	pmes := new(Message)
	err := proto.Unmarshal(mData, pmes)
	if err != nil {
		return nil, fmt.Errorf("Failed to decode protobuf message: %v\n", err)
	}

	// update the peer (on valid msgs only)
	dht.Update(mPeer)

	// Print out diagnostic
	u.DOut("[peer: %s]\nGot message type: '%s' [from = %s]\n",
		dht.self.ID.Pretty(),
		Message_MessageType_name[int32(pmes.GetType())], mPeer.ID.Pretty())

	// get handler for this msg type.
	var resp *Message
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
		return nil, errors.New("Recieved invalid message type")
	}

	// dispatch handler.
	rpmes, err := handler(mPeer, pmes)
	if err != nil {
		return nil, err
	}

	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
		return nil, fmt.Errorf("Failed to encode protobuf message: %v\n", err)
	}

	return rmes, nil
}

166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) {

	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return nil, err
	}

	start := time.Now()

	rmes, err := dht.sender.SendRequest(ctx, mes)
	if err != nil {
		return nil, err
	}

	rtt := time.Since(start)
	rmes.Peer().SetLatency(rtt)

	rpmes := new(Message)
	if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
		return nil, err
	}

	return rpmes, nil
}

func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
	key u.Key, level int) ([]byte, []*peer.Peer, error) {

	pmes, err := dht.getValueSingle(ctx, p, key, level)
197
	if err != nil {
198
		return nil, nil, err
199 200
	}

201
	if value := pmes.GetValue(); value != nil {
202
		// Success! We were given the value
203
		return value, nil, nil
204
	}
205

206 207 208 209 210 211 212 213 214 215
	// TODO decide on providers. This probably shouldn't be happening.
	// if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
	// 	val, err := dht.getFromPeerList(key, timeout,, level)
	// 	if err != nil {
	// 		return nil, nil, err
	// 	}
	// 	return val, nil, nil
	// }

	// Perhaps we were given closer peers
216
	var peers []*peer.Peer
217
	for _, pb := range pmes.GetCloserPeers() {
218 219 220
		if peer.ID(pb.GetId()).Equal(dht.self.ID) {
			continue
		}
221

222 223
		addr, err := ma.NewMultiaddr(pb.GetAddr())
		if err != nil {
224
			u.PErr("%v\n", err.Error())
225 226
			continue
		}
227

228 229
		np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr)
		if err != nil {
230
			u.PErr("%v\n", err.Error())
231
			continue
232
		}
233 234

		peers = append(peers, np)
235
	}
236 237 238 239 240 241

	if len(peers) > 0 {
		return nil, peers, nil
	}

	return nil, nil, errors.New("NotFound. did not get value or closer peers.")
242 243
}

244
// getValueSingle simply performs the get value RPC with the given parameters
245 246 247 248 249 250 251 252 253
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer,
	key u.Key, level int) (*Message, error) {

	typ := Message_GET_VALUE
	skey := string(key)
	pmes := &Message{Type: &typ, Key: &skey}
	pmes.SetClusterLevel(int32(level))

	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
254 255
}

256 257 258 259 260
// TODO: Im not certain on this implementation, we get a list of peers/providers
// from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
261
	peerlist []*Message_PBPeer, level int) ([]byte, error) {
262 263 264 265
	for _, pinfo := range peerlist {
		p, _ := dht.Find(peer.ID(pinfo.GetId()))
		if p == nil {
			maddr, err := ma.NewMultiaddr(pinfo.GetAddr())
Jeromy's avatar
Jeromy committed
266
			if err != nil {
267
				u.PErr("getValue error: %s\n", err)
Jeromy's avatar
Jeromy committed
268 269
				continue
			}
270

271
			p, err = dht.network.GetConnection(peer.ID(pinfo.GetId()), maddr)
Jeromy's avatar
Jeromy committed
272
			if err != nil {
273
				u.PErr("getValue error: %s\n", err)
Jeromy's avatar
Jeromy committed
274 275 276
				continue
			}
		}
277
		pmes, err := dht.getValueSingle(p, key, timeout, level)
Jeromy's avatar
Jeromy committed
278
		if err != nil {
279
			u.DErr("getFromPeers error: %s\n", err)
Jeromy's avatar
Jeromy committed
280 281
			continue
		}
282
		dht.providers.AddProvider(key, p)
Jeromy's avatar
Jeromy committed
283

284 285 286 287
		// Make sure it was a successful get
		if pmes.GetSuccess() && pmes.Value != nil {
			return pmes.GetValue(), nil
		}
Jeromy's avatar
Jeromy committed
288 289 290 291
	}
	return nil, u.ErrNotFound
}

292
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
293 294
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
295
	v, err := dht.datastore.Get(ds.NewKey(string(key)))
296 297 298
	if err != nil {
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
299 300 301 302 303 304

	byt, ok := v.([]byte)
	if !ok {
		return byt, errors.New("value stored in datastore not []byte")
	}
	return byt, nil
305 306
}

307
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
308 309
	return dht.datastore.Put(ds.NewKey(string(key)), value)
}
310

311
// Update TODO(chas) Document this function
312
func (dht *IpfsDHT) Update(p *peer.Peer) {
313
	for _, route := range dht.routingTables {
314
		removed := route.Update(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
315
		// Only close the connection if no tables refer to this peer
316 317
		if removed != nil {
			found := false
318
			for _, r := range dht.routingTables {
319 320 321 322 323 324
				if r.Find(removed.ID) != nil {
					found = true
					break
				}
			}
			if !found {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
325
				dht.network.CloseConnection(removed)
326 327
			}
		}
328 329
	}
}
Jeromy's avatar
Jeromy committed
330

331
// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Jeromy's avatar
Jeromy committed
332
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
333
	for _, table := range dht.routingTables {
Jeromy's avatar
Jeromy committed
334 335 336 337 338 339 340
		p := table.Find(id)
		if p != nil {
			return p, table
		}
	}
	return nil, nil
}
341

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
342
func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Duration, level int) (*Message, error) {
343
	pmes := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
344
		Type:  Message_FIND_NODE,
345
		Key:   string(id),
346
		ID:    swarm.GenerateMessageID(),
347 348 349 350
		Value: []byte{byte(level)},
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())
351
	listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
352
	t := time.Now()
353
	dht.netChan.Outgoing <- mes
354 355 356
	after := time.After(timeout)
	select {
	case <-after:
357
		dht.listener.Unlisten(pmes.ID)
358 359
		return nil, u.ErrTimeout
	case resp := <-listenChan:
360 361
		roundtrip := time.Since(t)
		resp.Peer.SetLatency(roundtrip)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
362
		pmesOut := new(Message)
363
		err := proto.Unmarshal(resp.Data, pmesOut)
364 365 366 367
		if err != nil {
			return nil, err
		}

368
		return pmesOut, nil
369 370
	}
}
371

372 373
func (dht *IpfsDHT) printTables() {
	for _, route := range dht.routingTables {
374 375 376
		route.Print()
	}
}
Jeromy's avatar
Jeromy committed
377

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
378
func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*Message, error) {
379
	pmes := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
380
		Type:  Message_GET_PROVIDERS,
Jeromy's avatar
Jeromy committed
381
		Key:   string(key),
382
		ID:    swarm.GenerateMessageID(),
Jeromy's avatar
Jeromy committed
383 384 385 386 387
		Value: []byte{byte(level)},
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

388
	listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
389
	dht.netChan.Outgoing <- mes
Jeromy's avatar
Jeromy committed
390 391 392
	after := time.After(timeout)
	select {
	case <-after:
393
		dht.listener.Unlisten(pmes.ID)
Jeromy's avatar
Jeromy committed
394 395
		return nil, u.ErrTimeout
	case resp := <-listenChan:
Jeromy's avatar
Jeromy committed
396
		u.DOut("FindProviders: got response.\n")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
397
		pmesOut := new(Message)
398
		err := proto.Unmarshal(resp.Data, pmesOut)
Jeromy's avatar
Jeromy committed
399 400 401 402
		if err != nil {
			return nil, err
		}

403
		return pmesOut, nil
Jeromy's avatar
Jeromy committed
404 405 406
	}
}

407
// TODO: Could be done async
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
408
func (dht *IpfsDHT) addPeerList(key u.Key, peers []*Message_PBPeer) []*peer.Peer {
409
	var provArr []*peer.Peer
Jeromy's avatar
Jeromy committed
410 411 412 413 414 415
	for _, prov := range peers {
		// Dont add outselves to the list
		if peer.ID(prov.GetId()).Equal(dht.self.ID) {
			continue
		}
		// Dont add someone who is already on the list
416
		p := dht.network.GetPeer(u.Key(prov.GetId()))
Jeromy's avatar
Jeromy committed
417
		if p == nil {
418
			u.DOut("given provider %s was not in our network already.\n", peer.ID(prov.GetId()).Pretty())
Jeromy's avatar
Jeromy committed
419 420
			var err error
			p, err = dht.peerFromInfo(prov)
Jeromy's avatar
Jeromy committed
421
			if err != nil {
422
				u.PErr("error connecting to new peer: %s\n", err)
Jeromy's avatar
Jeromy committed
423 424 425
				continue
			}
		}
426
		dht.providers.AddProvider(key, p)
427
		provArr = append(provArr, p)
Jeromy's avatar
Jeromy committed
428
	}
429
	return provArr
Jeromy's avatar
Jeromy committed
430
}
Jeromy's avatar
Jeromy committed
431

432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465
// nearestPeerToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeerToQuery(pmes *Message) *peer.Peer {
	level := pmes.GetClusterLevel()
	cluster := dht.routingTables[level]

	key := u.Key(pmes.GetKey())
	closer := cluster.NearestPeer(kb.ConvertKey(key))
	return closer
}

// betterPeerToQuery returns nearestPeerToQuery, but iff closer than self.
func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer {
	closer := dht.nearestPeerToQuery(pmes)

	// no node? nil
	if closer == nil {
		return nil
	}

	// == to self? nil
	if closer.ID.Equal(dht.self.ID) {
		u.DOut("Attempted to return self! this shouldnt happen...\n")
		return nil
	}

	// self is closer? nil
	if kb.Closer(dht.self.ID, closer.ID, key) {
		return nil
	}

	// ok seems like a closer node.
	return closer
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
466
func (dht *IpfsDHT) peerFromInfo(pbp *Message_PBPeer) (*peer.Peer, error) {
Jeromy's avatar
Jeromy committed
467 468 469 470 471 472 473
	maddr, err := ma.NewMultiaddr(pbp.GetAddr())
	if err != nil {
		return nil, err
	}

	return dht.network.GetConnection(peer.ID(pbp.GetId()), maddr)
}
474 475

func (dht *IpfsDHT) loadProvidableKeys() error {
476 477 478 479
	kl, err := dht.datastore.KeyList()
	if err != nil {
		return err
	}
480 481 482 483 484 485 486 487 488 489 490 491
	for _, k := range kl {
		dht.providers.AddProvider(u.Key(k.Bytes()), dht.self)
	}
	return nil
}

// Builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) Bootstrap() {
	id := make([]byte, 16)
	rand.Read(id)
	dht.FindPeer(peer.ID(id), time.Second*10)
}