dht.go 12.4 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"crypto/rand"
5
	"errors"
6
	"fmt"
7 8
	"sync"
	"time"
9

10
	inet "github.com/jbenet/go-ipfs/net"
11
	msg "github.com/jbenet/go-ipfs/net/message"
12
	peer "github.com/jbenet/go-ipfs/peer"
Jeromy's avatar
Jeromy committed
13
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
14
	u "github.com/jbenet/go-ipfs/util"
15

16
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
17
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
18
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
19

20
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
21 22
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24 25 26 27
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
28 29
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
30
	routingTables []*kb.RoutingTable
31

32
	// the network interface. service
33
	network inet.Network
34
	sender  inet.Sender
35

Jeromy's avatar
Jeromy committed
36 37 38 39 40
	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore
41
	dslock    sync.Mutex
42

43
	providers *ProviderManager
44

45 46
	// Signal to shutdown dht
	shutdown chan struct{}
47 48 49

	// When this peer started up
	birth time.Time
50 51 52

	//lock to make diagnostics work better
	diaglock sync.Mutex
53 54
}

Jeromy's avatar
Jeromy committed
55
// NewDHT creates a new DHT object with the given peer as the 'local' host
56
func NewDHT(p *peer.Peer, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
57
	dht := new(IpfsDHT)
Jeromy's avatar
Jeromy committed
58
	dht.network = net
59
	dht.sender = sender
Jeromy's avatar
Jeromy committed
60
	dht.datastore = dstore
61
	dht.self = p
62

63
	dht.providers = NewProviderManager(p.ID)
Jeromy's avatar
Jeromy committed
64
	dht.shutdown = make(chan struct{})
65

66 67 68 69
	dht.routingTables = make([]*kb.RoutingTable, 3)
	dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
	dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
	dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
70
	dht.birth = time.Now()
Jeromy's avatar
Jeromy committed
71
	return dht
72 73
}

74
// Start up background goroutines needed by the DHT
75
func (dht *IpfsDHT) Start() {
76
	panic("the service is already started. rmv this method")
77 78
}

79
// Connect to a new peer at the given address, ping and add to the routing table
80
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
81
	maddrstr, _ := addr.String()
82
	u.DOut("Connect to new peer: %s\n", maddrstr)
83 84 85 86 87 88 89 90 91 92 93 94 95

	// TODO(jbenet,whyrusleeping)
	//
	// Connect should take in a Peer (with ID). In a sense, we shouldn't be
	// allowing connections to random multiaddrs without knowing who we're
	// speaking to (i.e. peer.ID). In terms of moving around simple addresses
	// -- instead of an (ID, Addr) pair -- we can use:
	//
	//   /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
	//
	npeer := &peer.Peer{}
	npeer.AddAddress(addr)
	err := dht.network.DialPeer(npeer)
96
	if err != nil {
97
		return nil, err
98 99
	}

Jeromy's avatar
Jeromy committed
100 101
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
102
	err = dht.Ping(npeer, time.Second*2)
Jeromy's avatar
Jeromy committed
103
	if err != nil {
104
		return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
Jeromy's avatar
Jeromy committed
105 106
	}

107 108
	dht.Update(npeer)

109
	return npeer, nil
Jeromy's avatar
Jeromy committed
110 111
}

112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
// HandleMessage implements the inet.Handler interface.
//
// It decodes the protobuf payload of mes, refreshes the sending peer in the
// routing tables, dispatches to the handler registered for the message type
// and returns the handler's response re-encoded as a network message
// addressed to the sender.
//
// An error is returned when the message carries no data or no peer, when the
// payload cannot be decoded, when no handler exists for the message type,
// when the handler itself fails, or when the response cannot be encoded.
func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.NetMessage, error) {

	mData := mes.Data()
	if mData == nil {
		return nil, errors.New("message did not include Data")
	}

	mPeer := mes.Peer()
	if mPeer == nil {
		return nil, errors.New("message did not include a Peer")
	}

	// deserialize msg
	pmes := new(Message)
	err := proto.Unmarshal(mData, pmes)
	if err != nil {
		return nil, fmt.Errorf("Failed to decode protobuf message: %v\n", err)
	}

	// update the peer (on valid msgs only)
	dht.Update(mPeer)

	// Print out diagnostic
	u.DOut("[peer: %s]\nGot message type: '%s' [from = %s]\n",
		dht.self.ID.Pretty(),
		Message_MessageType_name[int32(pmes.GetType())], mPeer.ID.Pretty())

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
		return nil, errors.New("Recieved invalid message type")
	}

	// dispatch handler.
	rpmes, err := handler(mPeer, pmes)
	if err != nil {
		return nil, err
	}

	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
		return nil, fmt.Errorf("Failed to encode protobuf message: %v\n", err)
	}

	return rmes, nil
}

162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
//
// pmes is encoded into a network message addressed to p and handed to the
// sender; the time spent waiting for the reply is recorded as the latency of
// the peer attached to the reply. The decoded reply is returned. Encoding,
// sending and decoding failures are returned as-is, and a missing reply is
// reported as an error rather than dereferenced.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) {

	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return nil, err
	}

	start := time.Now()

	rmes, err := dht.sender.SendRequest(ctx, mes)
	if err != nil {
		return nil, err
	}
	if rmes == nil {
		// Guard against a sender that reports success without a reply.
		return nil, errors.New("no response to request")
	}

	rtt := time.Since(start)
	// A message may come without a peer (HandleMessage checks for exactly
	// that), so only record latency when there is one.
	if rp := rmes.Peer(); rp != nil {
		rp.SetLatency(rtt)
	}

	rpmes := new(Message)
	if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
		return nil, err
	}

	return rpmes, nil
}

func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
	key u.Key, level int) ([]byte, []*peer.Peer, error) {

	pmes, err := dht.getValueSingle(ctx, p, key, level)
193
	if err != nil {
194
		return nil, nil, err
195 196
	}

197
	if value := pmes.GetValue(); value != nil {
198
		// Success! We were given the value
199
		return value, nil, nil
200
	}
201

202 203 204 205 206 207 208 209 210 211
	// TODO decide on providers. This probably shouldn't be happening.
	// if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
	// 	val, err := dht.getFromPeerList(key, timeout,, level)
	// 	if err != nil {
	// 		return nil, nil, err
	// 	}
	// 	return val, nil, nil
	// }

	// Perhaps we were given closer peers
212
	var peers []*peer.Peer
213
	for _, pb := range pmes.GetCloserPeers() {
214 215 216
		if peer.ID(pb.GetId()).Equal(dht.self.ID) {
			continue
		}
217

218 219
		addr, err := ma.NewMultiaddr(pb.GetAddr())
		if err != nil {
220
			u.PErr("%v\n", err.Error())
221 222
			continue
		}
223

224 225
		np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr)
		if err != nil {
226
			u.PErr("%v\n", err.Error())
227
			continue
228
		}
229 230

		peers = append(peers, np)
231
	}
232 233 234 235 236 237

	if len(peers) > 0 {
		return nil, peers, nil
	}

	return nil, nil, errors.New("NotFound. did not get value or closer peers.")
238 239
}

240
// getValueSingle simply performs the get value RPC with the given parameters
241 242 243 244 245 246 247 248 249
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer,
	key u.Key, level int) (*Message, error) {

	typ := Message_GET_VALUE
	skey := string(key)
	pmes := &Message{Type: &typ, Key: &skey}
	pmes.SetClusterLevel(int32(level))

	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
250 251
}

252 253 254 255 256
// TODO: Im not certain on this implementation, we get a list of peers/providers
// from someone what do we do with it? Connect to each of them? randomly pick
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
257
	peerlist []*Message_PBPeer, level int) ([]byte, error) {
258 259 260 261
	for _, pinfo := range peerlist {
		p, _ := dht.Find(peer.ID(pinfo.GetId()))
		if p == nil {
			maddr, err := ma.NewMultiaddr(pinfo.GetAddr())
Jeromy's avatar
Jeromy committed
262
			if err != nil {
263
				u.PErr("getValue error: %s\n", err)
Jeromy's avatar
Jeromy committed
264 265
				continue
			}
266

267
			p, err = dht.network.GetConnection(peer.ID(pinfo.GetId()), maddr)
Jeromy's avatar
Jeromy committed
268
			if err != nil {
269
				u.PErr("getValue error: %s\n", err)
Jeromy's avatar
Jeromy committed
270 271 272
				continue
			}
		}
273
		pmes, err := dht.getValueSingle(p, key, timeout, level)
Jeromy's avatar
Jeromy committed
274
		if err != nil {
275
			u.DErr("getFromPeers error: %s\n", err)
Jeromy's avatar
Jeromy committed
276 277
			continue
		}
278
		dht.providers.AddProvider(key, p)
Jeromy's avatar
Jeromy committed
279

280 281 282 283
		// Make sure it was a successful get
		if pmes.GetSuccess() && pmes.Value != nil {
			return pmes.GetValue(), nil
		}
Jeromy's avatar
Jeromy committed
284 285 286 287
	}
	return nil, u.ErrNotFound
}

288
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
289 290
	dht.dslock.Lock()
	defer dht.dslock.Unlock()
291
	v, err := dht.datastore.Get(ds.NewKey(string(key)))
292 293 294
	if err != nil {
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
295 296 297 298 299 300

	byt, ok := v.([]byte)
	if !ok {
		return byt, errors.New("value stored in datastore not []byte")
	}
	return byt, nil
301 302
}

303
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
304 305
	return dht.datastore.Put(ds.NewKey(string(key)), value)
}
306

307
// Update TODO(chas) Document this function
308
func (dht *IpfsDHT) Update(p *peer.Peer) {
309
	for _, route := range dht.routingTables {
310
		removed := route.Update(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
311
		// Only close the connection if no tables refer to this peer
312 313
		if removed != nil {
			found := false
314
			for _, r := range dht.routingTables {
315 316 317 318 319 320
				if r.Find(removed.ID) != nil {
					found = true
					break
				}
			}
			if !found {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
321
				dht.network.CloseConnection(removed)
322 323
			}
		}
324 325
	}
}
Jeromy's avatar
Jeromy committed
326

327
// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Jeromy's avatar
Jeromy committed
328
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
329
	for _, table := range dht.routingTables {
Jeromy's avatar
Jeromy committed
330 331 332 333 334 335 336
		p := table.Find(id)
		if p != nil {
			return p, table
		}
	}
	return nil, nil
}
337

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
338
func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Duration, level int) (*Message, error) {
339
	pmes := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
340
		Type:  Message_FIND_NODE,
341
		Key:   string(id),
342
		ID:    swarm.GenerateMessageID(),
343 344 345 346
		Value: []byte{byte(level)},
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())
347
	listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
348
	t := time.Now()
349
	dht.netChan.Outgoing <- mes
350 351 352
	after := time.After(timeout)
	select {
	case <-after:
353
		dht.listener.Unlisten(pmes.ID)
354 355
		return nil, u.ErrTimeout
	case resp := <-listenChan:
356 357
		roundtrip := time.Since(t)
		resp.Peer.SetLatency(roundtrip)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
358
		pmesOut := new(Message)
359
		err := proto.Unmarshal(resp.Data, pmesOut)
360 361 362 363
		if err != nil {
			return nil, err
		}

364
		return pmesOut, nil
365 366
	}
}
367

368 369
func (dht *IpfsDHT) printTables() {
	for _, route := range dht.routingTables {
370 371 372
		route.Print()
	}
}
Jeromy's avatar
Jeromy committed
373

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
374
func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*Message, error) {
375
	pmes := Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
376
		Type:  Message_GET_PROVIDERS,
Jeromy's avatar
Jeromy committed
377
		Key:   string(key),
378
		ID:    swarm.GenerateMessageID(),
Jeromy's avatar
Jeromy committed
379 380 381 382 383
		Value: []byte{byte(level)},
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())

384
	listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
385
	dht.netChan.Outgoing <- mes
Jeromy's avatar
Jeromy committed
386 387 388
	after := time.After(timeout)
	select {
	case <-after:
389
		dht.listener.Unlisten(pmes.ID)
Jeromy's avatar
Jeromy committed
390 391
		return nil, u.ErrTimeout
	case resp := <-listenChan:
Jeromy's avatar
Jeromy committed
392
		u.DOut("FindProviders: got response.\n")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
393
		pmesOut := new(Message)
394
		err := proto.Unmarshal(resp.Data, pmesOut)
Jeromy's avatar
Jeromy committed
395 396 397 398
		if err != nil {
			return nil, err
		}

399
		return pmesOut, nil
Jeromy's avatar
Jeromy committed
400 401 402
	}
}

403
// TODO: Could be done async
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
404
func (dht *IpfsDHT) addPeerList(key u.Key, peers []*Message_PBPeer) []*peer.Peer {
405
	var provArr []*peer.Peer
Jeromy's avatar
Jeromy committed
406 407 408 409 410 411
	for _, prov := range peers {
		// Dont add outselves to the list
		if peer.ID(prov.GetId()).Equal(dht.self.ID) {
			continue
		}
		// Dont add someone who is already on the list
412
		p := dht.network.GetPeer(u.Key(prov.GetId()))
Jeromy's avatar
Jeromy committed
413
		if p == nil {
414
			u.DOut("given provider %s was not in our network already.\n", peer.ID(prov.GetId()).Pretty())
Jeromy's avatar
Jeromy committed
415 416
			var err error
			p, err = dht.peerFromInfo(prov)
Jeromy's avatar
Jeromy committed
417
			if err != nil {
418
				u.PErr("error connecting to new peer: %s\n", err)
Jeromy's avatar
Jeromy committed
419 420 421
				continue
			}
		}
422
		dht.providers.AddProvider(key, p)
423
		provArr = append(provArr, p)
Jeromy's avatar
Jeromy committed
424
	}
425
	return provArr
Jeromy's avatar
Jeromy committed
426
}
Jeromy's avatar
Jeromy committed
427

428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
// nearestPeerToQuery returns the peer nearest to the key of pmes, taken from
// the routing table selected by the message's cluster level.
//
// The cluster level comes from the message itself, so it is range-checked
// before being used as an index; nil is returned when it does not designate
// one of our routing tables.
func (dht *IpfsDHT) nearestPeerToQuery(pmes *Message) *peer.Peer {
	level := int(pmes.GetClusterLevel())
	if level < 0 || level >= len(dht.routingTables) {
		u.DOut("message asked for unknown cluster level.\n")
		return nil
	}
	cluster := dht.routingTables[level]

	key := u.Key(pmes.GetKey())
	closer := cluster.NearestPeer(kb.ConvertKey(key))
	return closer
}

// betterPeerToQuery returns nearestPeerToQuery, but iff closer than self.
//
// nil is returned when there is no nearest peer, when the nearest peer is the
// local peer, or when the local peer is closer to the message key than the
// candidate.
func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer {
	closer := dht.nearestPeerToQuery(pmes)

	// no node? nil
	if closer == nil {
		return nil
	}

	// == to self? nil
	if closer.ID.Equal(dht.self.ID) {
		u.DOut("Attempted to return self! this shouldnt happen...\n")
		return nil
	}

	// self is closer? nil
	// The distance comparison is made against the key the message asks for.
	key := u.Key(pmes.GetKey())
	if kb.Closer(dht.self.ID, closer.ID, key) {
		return nil
	}

	// ok seems like a closer node.
	return closer
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
462
func (dht *IpfsDHT) peerFromInfo(pbp *Message_PBPeer) (*peer.Peer, error) {
Jeromy's avatar
Jeromy committed
463 464 465 466 467 468 469
	maddr, err := ma.NewMultiaddr(pbp.GetAddr())
	if err != nil {
		return nil, err
	}

	return dht.network.GetConnection(peer.ID(pbp.GetId()), maddr)
}
470 471

func (dht *IpfsDHT) loadProvidableKeys() error {
472 473 474 475
	kl, err := dht.datastore.KeyList()
	if err != nil {
		return err
	}
476 477 478 479 480 481 482 483 484 485 486 487
	for _, k := range kl {
		dht.providers.AddProvider(u.Key(k.Bytes()), dht.self)
	}
	return nil
}

// Bootstrap builds up the list of peers by requesting a random peer ID.
//
// A 16 byte ID is drawn from crypto/rand and looked up with a ten second
// timeout. The lookup is best effort and its result is ignored. If no random
// ID can be generated the failure is logged and no lookup is made, rather
// than searching for a predictable all-zero ID.
func (dht *IpfsDHT) Bootstrap() {
	id := make([]byte, 16)
	if _, err := rand.Read(id); err != nil {
		u.PErr("Bootstrap: failed to generate random peer id: %s\n", err)
		return
	}
	dht.FindPeer(peer.ID(id), time.Second*10)
}