dht.go 11.4 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
// Package dht implements a distributed hash table that satisfies the ipfs routing
2
// interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4
package dht

5
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"bytes"
7
	"crypto/rand"
8
	"errors"
9
	"fmt"
10 11
	"sync"
	"time"
12

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	host "github.com/jbenet/go-ipfs/p2p/host"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	peer "github.com/jbenet/go-ipfs/p2p/peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	protocol "github.com/jbenet/go-ipfs/p2p/protocol"
16
	routing "github.com/jbenet/go-ipfs/routing"
17
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
Jeromy's avatar
Jeromy committed
18
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
19
	u "github.com/jbenet/go-ipfs/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20
	"github.com/jbenet/go-ipfs/util/eventlog"
21

22
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
23
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
24 25
	ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
26 27
)

28
var log = eventlog.Logger("dht")
29

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30 31
var ProtocolDHT protocol.ID = "/ipfs/dht"

32
const doPinging = false
33

34 35 36 37
// NumBootstrapQueries defines the number of random dht queries to do to
// collect members of the routing table.
const NumBootstrapQueries = 5

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38 39 40 41 42
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43
	host      host.Host      // the network services we need
44 45
	self      peer.ID        // Local peer (yourself)
	peerstore peer.Peerstore // Peer Registry
46

47
	datastore ds.ThreadSafeDatastore // Local data
48

49 50
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
	providers    *ProviderManager
51

52 53
	birth    time.Time  // When this peer started up
	diaglock sync.Mutex // lock to make diagnostics work better
54

55 56 57
	// record validator funcs
	Validators map[string]ValidatorFunc

58
	ctxgroup.ContextGroup
59 60
}

Jeromy's avatar
Jeromy committed
61
// NewDHT creates a new DHT object with the given peer as the 'local' host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *IpfsDHT {
63
	dht := new(IpfsDHT)
Jeromy's avatar
Jeromy committed
64
	dht.datastore = dstore
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65 66
	dht.self = h.ID()
	dht.peerstore = h.Peerstore()
67
	dht.ContextGroup = ctxgroup.WithContext(ctx)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68 69
	dht.host = h
	h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
70

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
	dht.providers = NewProviderManager(dht.Context(), dht.self)
72
	dht.AddChildGroup(dht.providers)
73

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74
	dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
75
	dht.birth = time.Now()
76

77
	dht.Validators = make(map[string]ValidatorFunc)
78
	dht.Validators["pk"] = ValidatePublicKeyRecord
79

80
	if doPinging {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81
		dht.Children().Add(1)
82 83
		go dht.PingRoutine(time.Second * 10)
	}
Jeromy's avatar
Jeromy committed
84
	return dht
85 86
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87 88 89 90 91
// LocalPeer returns the peer.Peer of the dht.
func (dht *IpfsDHT) LocalPeer() peer.ID {
	return dht.self
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
92 93 94 95 96
// log returns the dht's logger
func (dht *IpfsDHT) log() eventlog.EventLogger {
	return log.Prefix("dht(%s)", dht.self)
}

97
// Connect to a new peer at the given address, ping and add to the routing table
98
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99 100
	// TODO: change interface to accept a PeerInfo as well.
	if err := dht.host.Connect(ctx, peer.PeerInfo{ID: npeer}); err != nil {
101
		return err
102 103
	}

Jeromy's avatar
Jeromy committed
104 105
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
106 107
	if _, err := dht.Ping(ctx, npeer); err != nil {
		return fmt.Errorf("failed to ping newly connected peer: %s", err)
Jeromy's avatar
Jeromy committed
108
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
109
	log.Event(ctx, "connect", dht.self, npeer)
110
	dht.Update(ctx, npeer)
111
	return nil
Jeromy's avatar
Jeromy committed
112 113
}

Jeromy's avatar
Jeromy committed
114 115
// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
116
	key u.Key, rec *pb.Record) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117

118
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
119
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121 122 123
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
124

125
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126 127 128
		return errors.New("value not put correctly")
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129 130
}

Jeromy's avatar
Jeromy committed
131 132
// putProvider sends a message to peer 'p' saying that the local node
// can provide the value of 'key'
133
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, key string) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
134

135
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
136 137

	// add self as the provider
138
	pi := dht.peerstore.PeerInfo(dht.self)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
139
	pmes.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), []peer.PeerInfo{pi})
140

141
	err := dht.sendMessage(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
142 143 144
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
145

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146
	log.Debugf("%s putProvider: %s for %s (%s)", dht.self, p, u.Key(key), pi.Addrs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147 148

	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
149 150
}

151 152 153 154 155 156
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
// NOTE: it will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
	key u.Key) ([]byte, []peer.PeerInfo, error) {
157

158
	pmes, err := dht.getValueSingle(ctx, p, key)
159
	if err != nil {
160
		return nil, nil, err
161 162
	}

163
	if record := pmes.GetRecord(); record != nil {
164
		// Success! We were given the value
Jeromy's avatar
Jeromy committed
165
		log.Debug("getValueOrPeers: got value")
166

167 168
		// make sure record is valid.
		err = dht.verifyRecordOnline(ctx, record)
169
		if err != nil {
Jeromy's avatar
Jeromy committed
170
			log.Error("Received invalid record!")
171 172 173
			return nil, nil, err
		}
		return record.GetValue(), nil, nil
174
	}
175

176
	// Perhaps we were given closer peers
177
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
178
	if len(peers) > 0 {
179
		log.Debug("getValueOrPeers: peers")
180 181 182
		return nil, peers, nil
	}

183 184
	log.Warning("getValueOrPeers: routing.ErrNotFound")
	return nil, nil, routing.ErrNotFound
185 186
}

187
// getValueSingle simply performs the get value RPC with the given parameters
188
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
189
	key u.Key) (*pb.Message, error) {
190

191
	pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0)
192
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
193 194
}

195
// getLocal attempts to retrieve the value from the datastore
196
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
197

Jeromy's avatar
Jeromy committed
198
	log.Debug("getLocal %s", key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
199
	v, err := dht.datastore.Get(key.DsKey())
200 201 202
	if err != nil {
		return nil, err
	}
Jeromy's avatar
Jeromy committed
203
	log.Debug("found in db")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204 205 206

	byt, ok := v.([]byte)
	if !ok {
207
		return nil, errors.New("value stored in datastore not []byte")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208
	}
209 210 211 212 213 214 215 216
	rec := new(pb.Record)
	err = proto.Unmarshal(byt, rec)
	if err != nil {
		return nil, err
	}

	// TODO: 'if paranoid'
	if u.Debug {
217
		err = dht.verifyRecordLocally(rec)
218
		if err != nil {
Jeromy's avatar
Jeromy committed
219
			log.Errorf("local record verify failed: %s", err)
220 221 222 223 224
			return nil, err
		}
	}

	return rec.GetValue(), nil
225 226
}

227
// putLocal stores the key value pair in the datastore
228
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
229 230 231 232 233 234 235 236 237 238
	rec, err := dht.makePutRecord(key, value)
	if err != nil {
		return err
	}
	data, err := proto.Marshal(rec)
	if err != nil {
		return err
	}

	return dht.datastore.Put(key.DsKey(), data)
239
}
240

241
// Update signals the routingTable to Update its last-seen status
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
242
// on the given peer.
243
func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
244
	log.Event(ctx, "updatePeer", p)
245
	dht.routingTable.Update(p)
246
}
Jeromy's avatar
Jeromy committed
247

Jeromy's avatar
Jeromy committed
248
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Jeromy's avatar
Jeromy committed
249
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo {
250
	p := dht.routingTable.Find(id)
251
	if p != "" {
Jeromy's avatar
Jeromy committed
252
		return dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
253
	}
Jeromy's avatar
Jeromy committed
254
	return peer.PeerInfo{}
Jeromy's avatar
Jeromy committed
255
}
256

Jeromy's avatar
Jeromy committed
257
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
258
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
259
	pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
260
	return dht.sendRequest(ctx, p, pmes)
261
}
262

263
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Key) (*pb.Message, error) {
264
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
265
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
266 267
}

268
// nearestPeersToQuery returns the routing tables closest peers.
269
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
270
	key := u.Key(pmes.GetKey())
271
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(key), count)
272 273 274
	return closer
}

275
// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
276
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
277
	closer := dht.nearestPeersToQuery(pmes, count)
278 279 280 281 282 283

	// no node? nil
	if closer == nil {
		return nil
	}

284 285
	// == to self? thats bad
	for _, p := range closer {
286
		if p == dht.self {
287 288 289
			log.Error("Attempted to return self! this shouldnt happen...")
			return nil
		}
290 291
	}

292
	var filtered []peer.ID
293 294 295 296 297 298
	for _, clp := range closer {
		// Dont send a peer back themselves
		if p == clp {
			continue
		}

299 300
		// must all be closer than self
		key := u.Key(pmes.GetKey())
301 302
		if !kb.Closer(dht.self, clp, key) {
			filtered = append(filtered, clp)
303
		}
304 305
	}

306 307
	// ok seems like closer nodes
	return filtered
308 309
}

310 311 312
func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error {
	if p == dht.self {
		return errors.New("attempting to ensure connection to self")
Jeromy's avatar
Jeromy committed
313 314
	}

315
	// dial connection
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
316
	return dht.host.Connect(ctx, peer.PeerInfo{ID: p})
Jeromy's avatar
Jeromy committed
317
}
318

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
319
// PingRoutine periodically pings nearest neighbors.
320
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
321 322
	defer dht.Children().Done()

323 324 325 326 327 328
	tick := time.Tick(t)
	for {
		select {
		case <-tick:
			id := make([]byte, 16)
			rand.Read(id)
329
			peers := dht.routingTable.NearestPeers(kb.ConvertKey(u.Key(id)), 5)
330
			for _, p := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
331
				ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
332
				_, err := dht.Ping(ctx, p)
333
				if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
334
					log.Errorf("Ping error: %s", err)
335 336
				}
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
337
		case <-dht.Closing():
338 339 340 341 342
			return
		}
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
343
// Bootstrap builds up list of peers by requesting random peer IDs
Brian Tiger Chow's avatar
Brian Tiger Chow committed
344
func (dht *IpfsDHT) Bootstrap(ctx context.Context, queries int) error {
345
	var merr u.MultiErr
346

347 348 349
	randomID := func() peer.ID {
		// 16 random bytes is not a valid peer id. it may be fine becuase
		// the dht will rehash to its own keyspace anyway.
350 351
		id := make([]byte, 16)
		rand.Read(id)
352 353 354 355
		return peer.ID(id)
	}

	// bootstrap sequentially, as results will compound
356
	runQuery := func(ctx context.Context, id peer.ID) {
357
		p, err := dht.FindPeer(ctx, id)
358 359 360
		if err == routing.ErrNotFound {
			// this isn't an error. this is precisely what we expect.
		} else if err != nil {
361
			merr = append(merr, err)
362
		} else {
363 364 365 366 367
			// woah, actually found a peer with that ID? this shouldn't happen normally
			// (as the ID we use is not a real ID). this is an odd error worth logging.
			err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
			log.Errorf("%s", err)
			merr = append(merr, err)
368
		}
369
	}
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402

	sequential := true
	if sequential {
		// these should be parallel normally. but can make them sequential for debugging.
		// note that the core/bootstrap context deadline should be extended too for that.
		for i := 0; i < queries; i++ {
			id := randomID()
			log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
			runQuery(ctx, id)
		}

	} else {
		// note on parallelism here: the context is passed in to the queries, so they
		// **should** exit when it exceeds, making this function exit on ctx cancel.
		// normally, we should be selecting on ctx.Done() here too, but this gets
		// complicated to do with WaitGroup, and doesnt wait for the children to exit.
		var wg sync.WaitGroup
		for i := 0; i < queries; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()

				id := randomID()
				log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
				runQuery(ctx, id)
			}()
		}
		wg.Wait()
	}

	if len(merr) > 0 {
		return merr
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
403
	return nil
404
}