dht.go 12.5 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
// Package dht implements a distributed hash table that satisfies the ipfs routing
2
// interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4
package dht

5
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"bytes"
7
	"crypto/rand"
8
	"errors"
9
	"fmt"
10 11
	"sync"
	"time"
12

Jeromy's avatar
Jeromy committed
13
	ci "github.com/jbenet/go-ipfs/p2p/crypto"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	host "github.com/jbenet/go-ipfs/p2p/host"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	peer "github.com/jbenet/go-ipfs/p2p/peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16
	protocol "github.com/jbenet/go-ipfs/p2p/protocol"
17
	routing "github.com/jbenet/go-ipfs/routing"
18
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
Jeromy's avatar
Jeromy committed
19
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
20
	"github.com/jbenet/go-ipfs/thirdparty/eventlog"
21
	u "github.com/jbenet/go-ipfs/util"
22

23
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
24
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
25 26
	ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
27 28
)

29
var log = eventlog.Logger("dht")
30

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31 32
var ProtocolDHT protocol.ID = "/ipfs/dht"

33
const doPinging = false
34

35 36 37 38
// NumBootstrapQueries defines the number of random dht queries to do to
// collect members of the routing table.
const NumBootstrapQueries = 5

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39 40 41 42 43
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44
	host      host.Host      // the network services we need
45 46
	self      peer.ID        // Local peer (yourself)
	peerstore peer.Peerstore // Peer Registry
47

48
	datastore ds.ThreadSafeDatastore // Local data
49

50 51
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
	providers    *ProviderManager
52

53 54
	birth    time.Time  // When this peer started up
	diaglock sync.Mutex // lock to make diagnostics work better
55

56 57 58
	// record validator funcs
	Validators map[string]ValidatorFunc

59
	ctxgroup.ContextGroup
60 61
}

Jeromy's avatar
Jeromy committed
62
// NewDHT creates a new DHT object with the given peer as the 'local' host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63
func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *IpfsDHT {
64
	dht := new(IpfsDHT)
Jeromy's avatar
Jeromy committed
65
	dht.datastore = dstore
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66 67
	dht.self = h.ID()
	dht.peerstore = h.Peerstore()
68
	dht.ContextGroup = ctxgroup.WithContext(ctx)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69
	dht.host = h
70

71 72 73 74 75 76
	// sanity check. this should **never** happen
	if len(dht.peerstore.Addresses(dht.self)) < 1 {
		panic("attempt to initialize dht without addresses for self")
	}

	h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
	dht.providers = NewProviderManager(dht.Context(), dht.self)
78
	dht.AddChildGroup(dht.providers)
79

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80
	dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
81
	dht.birth = time.Now()
82

83
	dht.Validators = make(map[string]ValidatorFunc)
84
	dht.Validators["pk"] = ValidatePublicKeyRecord
85

86
	if doPinging {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87
		dht.Children().Add(1)
88 89
		go dht.PingRoutine(time.Second * 10)
	}
Jeromy's avatar
Jeromy committed
90
	return dht
91 92
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93 94 95 96 97
// LocalPeer returns the peer.ID of the local peer this dht runs as.
func (dht *IpfsDHT) LocalPeer() peer.ID {
	return dht.self
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98 99 100 101 102
// log returns the package logger wrapped with a "dht(<local peer ID>)" prefix,
// so messages can be attributed to this dht instance.
func (dht *IpfsDHT) log() eventlog.EventLogger {
	return log.Prefix("dht(%s)", dht.self)
}

103
// Connect to a new peer at the given address, ping and add to the routing table
104
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105 106
	// TODO: change interface to accept a PeerInfo as well.
	if err := dht.host.Connect(ctx, peer.PeerInfo{ID: npeer}); err != nil {
107
		return err
108 109
	}

Jeromy's avatar
Jeromy committed
110 111
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
112 113
	if _, err := dht.Ping(ctx, npeer); err != nil {
		return fmt.Errorf("failed to ping newly connected peer: %s", err)
Jeromy's avatar
Jeromy committed
114
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
115
	log.Event(ctx, "connect", dht.self, npeer)
116
	dht.Update(ctx, npeer)
117
	return nil
Jeromy's avatar
Jeromy committed
118 119
}

Jeromy's avatar
Jeromy committed
120 121
// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
122
	key u.Key, rec *pb.Record) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
123

124
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
125
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127 128 129
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130

131
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132 133 134
		return errors.New("value not put correctly")
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135 136
}

Jeromy's avatar
Jeromy committed
137 138
// putProvider sends a message to peer 'p' saying that the local node
// can provide the value of 'key'
139
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, key string) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
140

141
	// add self as the provider
142
	pi := dht.peerstore.PeerInfo(dht.self)
143 144 145 146 147 148
	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		log.Errorf("%s putProvider: %s for %s error: no wan-friendly addresses", dht.self, p, u.Key(key), pi.Addrs)
		return fmt.Errorf("no known addresses for self. cannot put provider.")
	}
149

150 151
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
	pmes.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), []peer.PeerInfo{pi})
152
	err := dht.sendMessage(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153 154 155
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
	log.Debugf("%s putProvider: %s for %s (%s)", dht.self, p, u.Key(key), pi.Addrs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159 160
}

161 162 163 164 165 166
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
// NOTE: it will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
	key u.Key) ([]byte, []peer.PeerInfo, error) {
167

168
	pmes, err := dht.getValueSingle(ctx, p, key)
169
	if err != nil {
170
		return nil, nil, err
171 172
	}

173
	if record := pmes.GetRecord(); record != nil {
174
		// Success! We were given the value
Jeromy's avatar
Jeromy committed
175
		log.Debug("getValueOrPeers: got value")
176

177 178
		// make sure record is valid.
		err = dht.verifyRecordOnline(ctx, record)
179
		if err != nil {
Jeromy's avatar
Jeromy committed
180
			log.Error("Received invalid record!")
181 182 183
			return nil, nil, err
		}
		return record.GetValue(), nil, nil
184
	}
185

186
	// Perhaps we were given closer peers
187
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
188
	if len(peers) > 0 {
189
		log.Debug("getValueOrPeers: peers")
190 191 192
		return nil, peers, nil
	}

193 194
	log.Warning("getValueOrPeers: routing.ErrNotFound")
	return nil, nil, routing.ErrNotFound
195 196
}

197
// getValueSingle simply performs the get value RPC with the given parameters
198
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
199
	key u.Key) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
200
	defer log.EventBegin(ctx, "getValueSingle", p, &key).Done()
201

202
	pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0)
203
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
204 205
}

206
// getLocal attempts to retrieve the value from the datastore
207
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
208

Jeromy's avatar
Jeromy committed
209
	log.Debug("getLocal %s", key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
210
	v, err := dht.datastore.Get(key.DsKey())
211 212 213
	if err != nil {
		return nil, err
	}
Jeromy's avatar
Jeromy committed
214
	log.Debug("found in db")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
215 216 217

	byt, ok := v.([]byte)
	if !ok {
218
		return nil, errors.New("value stored in datastore not []byte")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219
	}
220 221 222 223 224 225 226 227
	rec := new(pb.Record)
	err = proto.Unmarshal(byt, rec)
	if err != nil {
		return nil, err
	}

	// TODO: 'if paranoid'
	if u.Debug {
228
		err = dht.verifyRecordLocally(rec)
229
		if err != nil {
Jeromy's avatar
Jeromy committed
230
			log.Errorf("local record verify failed: %s", err)
231 232 233 234 235
			return nil, err
		}
	}

	return rec.GetValue(), nil
236 237
}

Jeromy's avatar
Jeromy committed
238 239
// getOwnPrivateKey attempts to load the local peers private
// key from the peerstore.
Jeromy's avatar
Jeromy committed
240 241 242 243 244 245 246 247 248
func (dht *IpfsDHT) getOwnPrivateKey() (ci.PrivKey, error) {
	sk := dht.peerstore.PrivKey(dht.self)
	if sk == nil {
		log.Errorf("%s dht cannot get own private key!", dht.self)
		return nil, fmt.Errorf("cannot get private key to sign record!")
	}
	return sk, nil
}

249
// putLocal stores the key value pair in the datastore
250
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
Jeromy's avatar
Jeromy committed
251 252 253 254 255 256
	sk, err := dht.getOwnPrivateKey()
	if err != nil {
		return err
	}

	rec, err := MakePutRecord(sk, key, value)
257 258 259 260 261 262 263 264 265
	if err != nil {
		return err
	}
	data, err := proto.Marshal(rec)
	if err != nil {
		return err
	}

	return dht.datastore.Put(key.DsKey(), data)
266
}
267

268
// Update signals the routingTable to Update its last-seen status
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
269
// on the given peer.
270
func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
271
	log.Event(ctx, "updatePeer", p)
272
	dht.routingTable.Update(p)
273
}
Jeromy's avatar
Jeromy committed
274

Jeromy's avatar
Jeromy committed
275
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Jeromy's avatar
Jeromy committed
276
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo {
277
	p := dht.routingTable.Find(id)
278
	if p != "" {
Jeromy's avatar
Jeromy committed
279
		return dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
280
	}
Jeromy's avatar
Jeromy committed
281
	return peer.PeerInfo{}
Jeromy's avatar
Jeromy committed
282
}
283

Jeromy's avatar
Jeromy committed
284
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
285
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
286
	defer log.EventBegin(ctx, "findPeerSingle", p, id).Done()
287

288
	pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
289
	return dht.sendRequest(ctx, p, pmes)
290
}
291

292
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Key) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
293
	defer log.EventBegin(ctx, "findProvidersSingle", p, &key).Done()
294

295
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
296
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
297 298
}

299
// nearestPeersToQuery returns the routing tables closest peers.
300
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
301
	key := u.Key(pmes.GetKey())
302
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(key), count)
303 304 305
	return closer
}

306
// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
307
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
308
	closer := dht.nearestPeersToQuery(pmes, count)
309 310 311 312 313 314

	// no node? nil
	if closer == nil {
		return nil
	}

315 316
	// == to self? thats bad
	for _, p := range closer {
317
		if p == dht.self {
318 319 320
			log.Error("Attempted to return self! this shouldnt happen...")
			return nil
		}
321 322
	}

323
	var filtered []peer.ID
324 325 326 327 328 329
	for _, clp := range closer {
		// Dont send a peer back themselves
		if p == clp {
			continue
		}

330 331
		// must all be closer than self
		key := u.Key(pmes.GetKey())
332 333
		if !kb.Closer(dht.self, clp, key) {
			filtered = append(filtered, clp)
334
		}
335 336
	}

337 338
	// ok seems like closer nodes
	return filtered
339 340
}

341 342 343
func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error {
	if p == dht.self {
		return errors.New("attempting to ensure connection to self")
Jeromy's avatar
Jeromy committed
344 345
	}

346
	// dial connection
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
347
	return dht.host.Connect(ctx, peer.PeerInfo{ID: p})
Jeromy's avatar
Jeromy committed
348
}
349

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
350
// PingRoutine periodically pings nearest neighbors.
351
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
352 353
	defer dht.Children().Done()

354 355 356 357 358 359
	tick := time.Tick(t)
	for {
		select {
		case <-tick:
			id := make([]byte, 16)
			rand.Read(id)
360
			peers := dht.routingTable.NearestPeers(kb.ConvertKey(u.Key(id)), 5)
361
			for _, p := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
362
				ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
363
				_, err := dht.Ping(ctx, p)
364
				if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
365
					log.Errorf("Ping error: %s", err)
366 367
				}
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
368
		case <-dht.Closing():
369 370 371 372 373
			return
		}
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
374
// Bootstrap builds up list of peers by requesting random peer IDs
Brian Tiger Chow's avatar
Brian Tiger Chow committed
375
func (dht *IpfsDHT) Bootstrap(ctx context.Context, queries int) error {
376
	var merr u.MultiErr
377

378 379 380
	randomID := func() peer.ID {
		// 16 random bytes is not a valid peer id. it may be fine becuase
		// the dht will rehash to its own keyspace anyway.
381 382
		id := make([]byte, 16)
		rand.Read(id)
383 384 385 386
		return peer.ID(id)
	}

	// bootstrap sequentially, as results will compound
387
	runQuery := func(ctx context.Context, id peer.ID) {
388
		p, err := dht.FindPeer(ctx, id)
389 390 391
		if err == routing.ErrNotFound {
			// this isn't an error. this is precisely what we expect.
		} else if err != nil {
392
			merr = append(merr, err)
393
		} else {
394 395 396 397 398
			// woah, actually found a peer with that ID? this shouldn't happen normally
			// (as the ID we use is not a real ID). this is an odd error worth logging.
			err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
			log.Errorf("%s", err)
			merr = append(merr, err)
399
		}
400
	}
401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433

	sequential := true
	if sequential {
		// these should be parallel normally. but can make them sequential for debugging.
		// note that the core/bootstrap context deadline should be extended too for that.
		for i := 0; i < queries; i++ {
			id := randomID()
			log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
			runQuery(ctx, id)
		}

	} else {
		// note on parallelism here: the context is passed in to the queries, so they
		// **should** exit when it exceeds, making this function exit on ctx cancel.
		// normally, we should be selecting on ctx.Done() here too, but this gets
		// complicated to do with WaitGroup, and doesnt wait for the children to exit.
		var wg sync.WaitGroup
		for i := 0; i < queries; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()

				id := randomID()
				log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
				runQuery(ctx, id)
			}()
		}
		wg.Wait()
	}

	if len(merr) > 0 {
		return merr
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
434
	return nil
435
}