dht.go 12.6 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
// Package dht implements a distributed hash table that satisfies the ipfs routing
// interface. This DHT is modeled after Kademlia with Coral and S/Kademlia modifications.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4
package dht

5
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"bytes"
7
	"crypto/rand"
8
	"errors"
9
	"fmt"
10 11
	"sync"
	"time"
12

Jeromy's avatar
Jeromy committed
13
	ci "github.com/jbenet/go-ipfs/p2p/crypto"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	host "github.com/jbenet/go-ipfs/p2p/host"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	peer "github.com/jbenet/go-ipfs/p2p/peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16
	protocol "github.com/jbenet/go-ipfs/p2p/protocol"
17
	routing "github.com/jbenet/go-ipfs/routing"
18
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
Jeromy's avatar
Jeromy committed
19
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Jeromy's avatar
Jeromy committed
20
	record "github.com/jbenet/go-ipfs/routing/record"
21
	"github.com/jbenet/go-ipfs/thirdparty/eventlog"
22
	u "github.com/jbenet/go-ipfs/util"
23

24
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
25
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
26 27
	ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
28 29
)

30
var log = eventlog.Logger("dht")
31

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32 33
var ProtocolDHT protocol.ID = "/ipfs/dht"

34
const doPinging = false
35

36 37 38 39
// NumBootstrapQueries defines the number of random dht queries to do to
// collect members of the routing table.
const NumBootstrapQueries = 5

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40 41 42 43 44
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
	host      host.Host      // the network services we need
46 47
	self      peer.ID        // Local peer (yourself)
	peerstore peer.Peerstore // Peer Registry
48

49
	datastore ds.ThreadSafeDatastore // Local data
50

51 52
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
	providers    *ProviderManager
53

54 55
	birth    time.Time  // When this peer started up
	diaglock sync.Mutex // lock to make diagnostics work better
56

57 58 59
	// record validator funcs
	Validators map[string]ValidatorFunc

60
	ctxgroup.ContextGroup
61 62
}

Jeromy's avatar
Jeromy committed
63
// NewDHT creates a new DHT object with the given peer as the 'local' host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64
func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *IpfsDHT {
65
	dht := new(IpfsDHT)
Jeromy's avatar
Jeromy committed
66
	dht.datastore = dstore
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67 68
	dht.self = h.ID()
	dht.peerstore = h.Peerstore()
69
	dht.ContextGroup = ctxgroup.WithContext(ctx)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
	dht.host = h
71

72 73 74 75 76 77
	// sanity check. this should **never** happen
	if len(dht.peerstore.Addresses(dht.self)) < 1 {
		panic("attempt to initialize dht without addresses for self")
	}

	h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78
	dht.providers = NewProviderManager(dht.Context(), dht.self)
79
	dht.AddChildGroup(dht.providers)
80

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81
	dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
82
	dht.birth = time.Now()
83

84
	dht.Validators = make(map[string]ValidatorFunc)
85
	dht.Validators["pk"] = ValidatePublicKeyRecord
86

87
	if doPinging {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88
		dht.Children().Add(1)
89 90
		go dht.PingRoutine(time.Second * 10)
	}
Jeromy's avatar
Jeromy committed
91
	return dht
92 93
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
94 95 96 97 98
// LocalPeer returns the peer.ID this DHT runs as (taken from the host in NewDHT).
func (dht *IpfsDHT) LocalPeer() peer.ID {
	self := dht.self
	return self
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99 100 101 102 103
// log returns the package logger with a "dht(<local peer>)" prefix, so
// messages can be attributed to this DHT instance.
func (dht *IpfsDHT) log() eventlog.EventLogger {
	const prefixFmt = "dht(%s)"
	return log.Prefix(prefixFmt, dht.self)
}

104
// Connect to a new peer at the given address, ping and add to the routing table
105
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
106 107
	// TODO: change interface to accept a PeerInfo as well.
	if err := dht.host.Connect(ctx, peer.PeerInfo{ID: npeer}); err != nil {
108
		return err
109 110
	}

Jeromy's avatar
Jeromy committed
111 112
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
113 114
	if _, err := dht.Ping(ctx, npeer); err != nil {
		return fmt.Errorf("failed to ping newly connected peer: %s", err)
Jeromy's avatar
Jeromy committed
115
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
116
	log.Event(ctx, "connect", dht.self, npeer)
117
	dht.Update(ctx, npeer)
118
	return nil
Jeromy's avatar
Jeromy committed
119 120
}

Jeromy's avatar
Jeromy committed
121 122
// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
123
	key u.Key, rec *pb.Record) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
124

125
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
126
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
128 129 130
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131

132
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
133 134 135
		return errors.New("value not put correctly")
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136 137
}

Jeromy's avatar
Jeromy committed
138 139
// putProvider sends a message to peer 'p' saying that the local node
// can provide the value of 'key'
140
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, key string) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141

142
	// add self as the provider
143
	pi := dht.peerstore.PeerInfo(dht.self)
144 145 146 147 148 149
	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		log.Errorf("%s putProvider: %s for %s error: no wan-friendly addresses", dht.self, p, u.Key(key), pi.Addrs)
		return fmt.Errorf("no known addresses for self. cannot put provider.")
	}
150

151 152
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
	pmes.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), []peer.PeerInfo{pi})
153
	err := dht.sendMessage(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154 155 156
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
	log.Debugf("%s putProvider: %s for %s (%s)", dht.self, p, u.Key(key), pi.Addrs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160 161
}

162 163 164 165 166 167
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
// NOTE: it will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
	key u.Key) ([]byte, []peer.PeerInfo, error) {
168

169
	pmes, err := dht.getValueSingle(ctx, p, key)
170
	if err != nil {
171
		return nil, nil, err
172 173
	}

174
	if record := pmes.GetRecord(); record != nil {
175
		// Success! We were given the value
Jeromy's avatar
Jeromy committed
176
		log.Debug("getValueOrPeers: got value")
177

178 179
		// make sure record is valid.
		err = dht.verifyRecordOnline(ctx, record)
180
		if err != nil {
Jeromy's avatar
Jeromy committed
181
			log.Error("Received invalid record!")
182 183 184
			return nil, nil, err
		}
		return record.GetValue(), nil, nil
185
	}
186

187
	// Perhaps we were given closer peers
188
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
189
	if len(peers) > 0 {
190
		log.Debug("getValueOrPeers: peers")
191 192 193
		return nil, peers, nil
	}

194 195
	log.Warning("getValueOrPeers: routing.ErrNotFound")
	return nil, nil, routing.ErrNotFound
196 197
}

198
// getValueSingle simply performs the get value RPC with the given parameters
199
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
200
	key u.Key) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
201
	defer log.EventBegin(ctx, "getValueSingle", p, &key).Done()
202

203
	pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0)
204
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
205 206
}

207
// getLocal attempts to retrieve the value from the datastore
208
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
209

Jeromy's avatar
Jeromy committed
210
	log.Debug("getLocal %s", key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
211
	v, err := dht.datastore.Get(key.DsKey())
212 213 214
	if err != nil {
		return nil, err
	}
Jeromy's avatar
Jeromy committed
215
	log.Debug("found in db")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
216 217 218

	byt, ok := v.([]byte)
	if !ok {
219
		return nil, errors.New("value stored in datastore not []byte")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220
	}
221 222 223 224 225 226 227 228
	rec := new(pb.Record)
	err = proto.Unmarshal(byt, rec)
	if err != nil {
		return nil, err
	}

	// TODO: 'if paranoid'
	if u.Debug {
229
		err = dht.verifyRecordLocally(rec)
230
		if err != nil {
Jeromy's avatar
Jeromy committed
231
			log.Errorf("local record verify failed: %s", err)
232 233 234 235 236
			return nil, err
		}
	}

	return rec.GetValue(), nil
237 238
}

Jeromy's avatar
Jeromy committed
239 240
// getOwnPrivateKey attempts to load the local peers private
// key from the peerstore.
Jeromy's avatar
Jeromy committed
241 242 243 244 245 246 247 248 249
func (dht *IpfsDHT) getOwnPrivateKey() (ci.PrivKey, error) {
	sk := dht.peerstore.PrivKey(dht.self)
	if sk == nil {
		log.Errorf("%s dht cannot get own private key!", dht.self)
		return nil, fmt.Errorf("cannot get private key to sign record!")
	}
	return sk, nil
}

250
// putLocal stores the key value pair in the datastore
251
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
Jeromy's avatar
Jeromy committed
252 253 254 255 256
	sk, err := dht.getOwnPrivateKey()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
257
	rec, err := record.MakePutRecord(sk, key, value)
258 259 260 261 262 263 264 265 266
	if err != nil {
		return err
	}
	data, err := proto.Marshal(rec)
	if err != nil {
		return err
	}

	return dht.datastore.Put(key.DsKey(), data)
267
}
268

269
// Update signals the routingTable to Update its last-seen status
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
270
// on the given peer.
271
func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
272
	log.Event(ctx, "updatePeer", p)
273
	dht.routingTable.Update(p)
274
}
Jeromy's avatar
Jeromy committed
275

Jeromy's avatar
Jeromy committed
276
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Jeromy's avatar
Jeromy committed
277
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo {
278
	p := dht.routingTable.Find(id)
279
	if p != "" {
Jeromy's avatar
Jeromy committed
280
		return dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
281
	}
Jeromy's avatar
Jeromy committed
282
	return peer.PeerInfo{}
Jeromy's avatar
Jeromy committed
283
}
284

Jeromy's avatar
Jeromy committed
285
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
286
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
287
	defer log.EventBegin(ctx, "findPeerSingle", p, id).Done()
288

289
	pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
290
	return dht.sendRequest(ctx, p, pmes)
291
}
292

293
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Key) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
294
	defer log.EventBegin(ctx, "findProvidersSingle", p, &key).Done()
295

296
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
297
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
298 299
}

300
// nearestPeersToQuery returns the routing tables closest peers.
301
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
302
	key := u.Key(pmes.GetKey())
303
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(key), count)
304 305 306
	return closer
}

307
// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
308
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
309
	closer := dht.nearestPeersToQuery(pmes, count)
310 311 312 313 314 315

	// no node? nil
	if closer == nil {
		return nil
	}

316 317
	// == to self? thats bad
	for _, p := range closer {
318
		if p == dht.self {
319 320 321
			log.Error("Attempted to return self! this shouldnt happen...")
			return nil
		}
322 323
	}

324
	var filtered []peer.ID
325 326 327 328 329 330
	for _, clp := range closer {
		// Dont send a peer back themselves
		if p == clp {
			continue
		}

331 332
		// must all be closer than self
		key := u.Key(pmes.GetKey())
333 334
		if !kb.Closer(dht.self, clp, key) {
			filtered = append(filtered, clp)
335
		}
336 337
	}

338 339
	// ok seems like closer nodes
	return filtered
340 341
}

342 343 344
func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error {
	if p == dht.self {
		return errors.New("attempting to ensure connection to self")
Jeromy's avatar
Jeromy committed
345 346
	}

347
	// dial connection
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
348
	return dht.host.Connect(ctx, peer.PeerInfo{ID: p})
Jeromy's avatar
Jeromy committed
349
}
350

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
351
// PingRoutine periodically pings nearest neighbors.
352
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
353 354
	defer dht.Children().Done()

355 356 357 358 359 360
	tick := time.Tick(t)
	for {
		select {
		case <-tick:
			id := make([]byte, 16)
			rand.Read(id)
361
			peers := dht.routingTable.NearestPeers(kb.ConvertKey(u.Key(id)), 5)
362
			for _, p := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
363
				ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
364
				_, err := dht.Ping(ctx, p)
365
				if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
366
					log.Errorf("Ping error: %s", err)
367 368
				}
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
369
		case <-dht.Closing():
370 371 372 373 374
			return
		}
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
375
// Bootstrap builds up list of peers by requesting random peer IDs
Brian Tiger Chow's avatar
Brian Tiger Chow committed
376
func (dht *IpfsDHT) Bootstrap(ctx context.Context, queries int) error {
377
	var merr u.MultiErr
378

379 380 381
	randomID := func() peer.ID {
		// 16 random bytes is not a valid peer id. it may be fine becuase
		// the dht will rehash to its own keyspace anyway.
382 383
		id := make([]byte, 16)
		rand.Read(id)
384 385 386 387
		return peer.ID(id)
	}

	// bootstrap sequentially, as results will compound
388
	runQuery := func(ctx context.Context, id peer.ID) {
389
		p, err := dht.FindPeer(ctx, id)
390 391 392
		if err == routing.ErrNotFound {
			// this isn't an error. this is precisely what we expect.
		} else if err != nil {
393
			merr = append(merr, err)
394
		} else {
395 396 397 398 399
			// woah, actually found a peer with that ID? this shouldn't happen normally
			// (as the ID we use is not a real ID). this is an odd error worth logging.
			err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
			log.Errorf("%s", err)
			merr = append(merr, err)
400
		}
401
	}
402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434

	sequential := true
	if sequential {
		// these should be parallel normally. but can make them sequential for debugging.
		// note that the core/bootstrap context deadline should be extended too for that.
		for i := 0; i < queries; i++ {
			id := randomID()
			log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
			runQuery(ctx, id)
		}

	} else {
		// note on parallelism here: the context is passed in to the queries, so they
		// **should** exit when it exceeds, making this function exit on ctx cancel.
		// normally, we should be selecting on ctx.Done() here too, but this gets
		// complicated to do with WaitGroup, and doesnt wait for the children to exit.
		var wg sync.WaitGroup
		for i := 0; i < queries; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()

				id := randomID()
				log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
				runQuery(ctx, id)
			}()
		}
		wg.Wait()
	}

	if len(merr) > 0 {
		return merr
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
435
	return nil
436
}