dht.go 12.1 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
// Package dht implements a distributed hash table that satisfies the ipfs routing
// interface. This DHT is modeled after Kademlia with Coral and S/Kademlia modifications.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4
package dht

5
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"bytes"
7
	"crypto/rand"
8
	"errors"
9
	"fmt"
10 11
	"sync"
	"time"
12

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	host "github.com/jbenet/go-ipfs/p2p/host"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	peer "github.com/jbenet/go-ipfs/p2p/peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	protocol "github.com/jbenet/go-ipfs/p2p/protocol"
16
	routing "github.com/jbenet/go-ipfs/routing"
17
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
Jeromy's avatar
Jeromy committed
18
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
19
	u "github.com/jbenet/go-ipfs/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20
	"github.com/jbenet/go-ipfs/util/eventlog"
21

22
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
23
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
24 25
	ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
26 27
)

28
var log = eventlog.Logger("dht")
29

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30 31
var ProtocolDHT protocol.ID = "/ipfs/dht"

32
const doPinging = false
33

34 35 36 37
// NumBootstrapQueries defines the number of random dht queries to do to
// collect members of the routing table.
const NumBootstrapQueries = 5

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38 39 40 41 42
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43
	host      host.Host      // the network services we need
44 45
	self      peer.ID        // Local peer (yourself)
	peerstore peer.Peerstore // Peer Registry
46

47
	datastore ds.ThreadSafeDatastore // Local data
48

49 50
	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
	providers    *ProviderManager
51

52 53
	birth    time.Time  // When this peer started up
	diaglock sync.Mutex // lock to make diagnostics work better
54

55 56 57
	// record validator funcs
	Validators map[string]ValidatorFunc

58
	ctxgroup.ContextGroup
59 60
}

Jeromy's avatar
Jeromy committed
61
// NewDHT creates a new DHT object with the given peer as the 'local' host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *IpfsDHT {
63
	dht := new(IpfsDHT)
Jeromy's avatar
Jeromy committed
64
	dht.datastore = dstore
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65 66
	dht.self = h.ID()
	dht.peerstore = h.Peerstore()
67
	dht.ContextGroup = ctxgroup.WithContext(ctx)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68
	dht.host = h
69

70 71 72 73 74 75
	// sanity check. this should **never** happen
	if len(dht.peerstore.Addresses(dht.self)) < 1 {
		panic("attempt to initialize dht without addresses for self")
	}

	h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
76
	dht.providers = NewProviderManager(dht.Context(), dht.self)
77
	dht.AddChildGroup(dht.providers)
78

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79
	dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
80
	dht.birth = time.Now()
81

82
	dht.Validators = make(map[string]ValidatorFunc)
83
	dht.Validators["pk"] = ValidatePublicKeyRecord
84

85
	if doPinging {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
		dht.Children().Add(1)
87 88
		go dht.PingRoutine(time.Second * 10)
	}
Jeromy's avatar
Jeromy committed
89
	return dht
90 91
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
92 93 94 95 96
// LocalPeer returns the peer.ID of the local node this dht was created for.
func (dht *IpfsDHT) LocalPeer() peer.ID {
	local := dht.self
	return local
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
97 98 99 100 101
// log returns the dht's logger: the package logger passed through Prefix
// with "dht(%s)" and the local peer ID.
func (dht *IpfsDHT) log() eventlog.EventLogger {
	prefixed := log.Prefix("dht(%s)", dht.self)
	return prefixed
}

102
// Connect to a new peer at the given address, ping and add to the routing table
103
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105
	// TODO: change interface to accept a PeerInfo as well.
	if err := dht.host.Connect(ctx, peer.PeerInfo{ID: npeer}); err != nil {
106
		return err
107 108
	}

Jeromy's avatar
Jeromy committed
109 110
	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
111 112
	if _, err := dht.Ping(ctx, npeer); err != nil {
		return fmt.Errorf("failed to ping newly connected peer: %s", err)
Jeromy's avatar
Jeromy committed
113
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
114
	log.Event(ctx, "connect", dht.self, npeer)
115
	dht.Update(ctx, npeer)
116
	return nil
Jeromy's avatar
Jeromy committed
117 118
}

Jeromy's avatar
Jeromy committed
119 120
// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
121
	key u.Key, rec *pb.Record) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
122

123
	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
124
	pmes.Record = rec
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125
	rpmes, err := dht.sendRequest(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126 127 128
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129

130
	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131 132 133
		return errors.New("value not put correctly")
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
134 135
}

Jeromy's avatar
Jeromy committed
136 137
// putProvider sends a message to peer 'p' saying that the local node
// can provide the value of 'key'
138
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, key string) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
139

140
	// add self as the provider
141
	pi := dht.peerstore.PeerInfo(dht.self)
142 143 144 145 146 147
	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		log.Errorf("%s putProvider: %s for %s error: no wan-friendly addresses", dht.self, p, u.Key(key), pi.Addrs)
		return fmt.Errorf("no known addresses for self. cannot put provider.")
	}
148

149 150
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
	pmes.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), []peer.PeerInfo{pi})
151
	err := dht.sendMessage(ctx, p, pmes)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152 153 154
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
155

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156
	log.Debugf("%s putProvider: %s for %s (%s)", dht.self, p, u.Key(key), pi.Addrs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158 159
}

160 161 162 163 164 165
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
// NOTE: it will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
	key u.Key) ([]byte, []peer.PeerInfo, error) {
166

167
	pmes, err := dht.getValueSingle(ctx, p, key)
168
	if err != nil {
169
		return nil, nil, err
170 171
	}

172
	if record := pmes.GetRecord(); record != nil {
173
		// Success! We were given the value
Jeromy's avatar
Jeromy committed
174
		log.Debug("getValueOrPeers: got value")
175

176 177
		// make sure record is valid.
		err = dht.verifyRecordOnline(ctx, record)
178
		if err != nil {
Jeromy's avatar
Jeromy committed
179
			log.Error("Received invalid record!")
180 181 182
			return nil, nil, err
		}
		return record.GetValue(), nil, nil
183
	}
184

185
	// Perhaps we were given closer peers
186
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
187
	if len(peers) > 0 {
188
		log.Debug("getValueOrPeers: peers")
189 190 191
		return nil, peers, nil
	}

192 193
	log.Warning("getValueOrPeers: routing.ErrNotFound")
	return nil, nil, routing.ErrNotFound
194 195
}

196
// getValueSingle simply performs the get value RPC with the given parameters
197
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
198
	key u.Key) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
199
	defer log.EventBegin(ctx, "getValueSingle", p, &key).Done()
200

201
	pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0)
202
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
203 204
}

205
// getLocal attempts to retrieve the value from the datastore
206
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
207

Jeromy's avatar
Jeromy committed
208
	log.Debug("getLocal %s", key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209
	v, err := dht.datastore.Get(key.DsKey())
210 211 212
	if err != nil {
		return nil, err
	}
Jeromy's avatar
Jeromy committed
213
	log.Debug("found in db")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214 215 216

	byt, ok := v.([]byte)
	if !ok {
217
		return nil, errors.New("value stored in datastore not []byte")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
218
	}
219 220 221 222 223 224 225 226
	rec := new(pb.Record)
	err = proto.Unmarshal(byt, rec)
	if err != nil {
		return nil, err
	}

	// TODO: 'if paranoid'
	if u.Debug {
227
		err = dht.verifyRecordLocally(rec)
228
		if err != nil {
Jeromy's avatar
Jeromy committed
229
			log.Errorf("local record verify failed: %s", err)
230 231 232 233 234
			return nil, err
		}
	}

	return rec.GetValue(), nil
235 236
}

237
// putLocal stores the key value pair in the datastore
238
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
239 240 241 242 243 244 245 246 247 248
	rec, err := dht.makePutRecord(key, value)
	if err != nil {
		return err
	}
	data, err := proto.Marshal(rec)
	if err != nil {
		return err
	}

	return dht.datastore.Put(key.DsKey(), data)
249
}
250

251
// Update signals the routingTable to Update its last-seen status
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252
// on the given peer.
253
func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
254
	log.Event(ctx, "updatePeer", p)
255
	dht.routingTable.Update(p)
256
}
Jeromy's avatar
Jeromy committed
257

Jeromy's avatar
Jeromy committed
258
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Jeromy's avatar
Jeromy committed
259
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo {
260
	p := dht.routingTable.Find(id)
261
	if p != "" {
Jeromy's avatar
Jeromy committed
262
		return dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
263
	}
Jeromy's avatar
Jeromy committed
264
	return peer.PeerInfo{}
Jeromy's avatar
Jeromy committed
265
}
266

Jeromy's avatar
Jeromy committed
267
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
268
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
269
	defer log.EventBegin(ctx, "findPeerSingle", p, id).Done()
270

271
	pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
272
	return dht.sendRequest(ctx, p, pmes)
273
}
274

275
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Key) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
276
	defer log.EventBegin(ctx, "findProvidersSingle", p, &key).Done()
277

278
	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
279
	return dht.sendRequest(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
280 281
}

282
// nearestPeersToQuery returns the routing tables closest peers.
283
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
284
	key := u.Key(pmes.GetKey())
285
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(key), count)
286 287 288
	return closer
}

289
// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
290
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
291
	closer := dht.nearestPeersToQuery(pmes, count)
292 293 294 295 296 297

	// no node? nil
	if closer == nil {
		return nil
	}

298 299
	// == to self? thats bad
	for _, p := range closer {
300
		if p == dht.self {
301 302 303
			log.Error("Attempted to return self! this shouldnt happen...")
			return nil
		}
304 305
	}

306
	var filtered []peer.ID
307 308 309 310 311 312
	for _, clp := range closer {
		// Dont send a peer back themselves
		if p == clp {
			continue
		}

313 314
		// must all be closer than self
		key := u.Key(pmes.GetKey())
315 316
		if !kb.Closer(dht.self, clp, key) {
			filtered = append(filtered, clp)
317
		}
318 319
	}

320 321
	// ok seems like closer nodes
	return filtered
322 323
}

324 325 326
func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error {
	if p == dht.self {
		return errors.New("attempting to ensure connection to self")
Jeromy's avatar
Jeromy committed
327 328
	}

329
	// dial connection
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
330
	return dht.host.Connect(ctx, peer.PeerInfo{ID: p})
Jeromy's avatar
Jeromy committed
331
}
332

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
333
// PingRoutine periodically pings nearest neighbors.
334
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
335 336
	defer dht.Children().Done()

337 338 339 340 341 342
	tick := time.Tick(t)
	for {
		select {
		case <-tick:
			id := make([]byte, 16)
			rand.Read(id)
343
			peers := dht.routingTable.NearestPeers(kb.ConvertKey(u.Key(id)), 5)
344
			for _, p := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
345
				ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
346
				_, err := dht.Ping(ctx, p)
347
				if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
348
					log.Errorf("Ping error: %s", err)
349 350
				}
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
351
		case <-dht.Closing():
352 353 354 355 356
			return
		}
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
357
// Bootstrap builds up list of peers by requesting random peer IDs
Brian Tiger Chow's avatar
Brian Tiger Chow committed
358
func (dht *IpfsDHT) Bootstrap(ctx context.Context, queries int) error {
359
	var merr u.MultiErr
360

361 362 363
	randomID := func() peer.ID {
		// 16 random bytes is not a valid peer id. it may be fine becuase
		// the dht will rehash to its own keyspace anyway.
364 365
		id := make([]byte, 16)
		rand.Read(id)
366 367 368 369
		return peer.ID(id)
	}

	// bootstrap sequentially, as results will compound
370
	runQuery := func(ctx context.Context, id peer.ID) {
371
		p, err := dht.FindPeer(ctx, id)
372 373 374
		if err == routing.ErrNotFound {
			// this isn't an error. this is precisely what we expect.
		} else if err != nil {
375
			merr = append(merr, err)
376
		} else {
377 378 379 380 381
			// woah, actually found a peer with that ID? this shouldn't happen normally
			// (as the ID we use is not a real ID). this is an odd error worth logging.
			err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
			log.Errorf("%s", err)
			merr = append(merr, err)
382
		}
383
	}
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416

	sequential := true
	if sequential {
		// these should be parallel normally. but can make them sequential for debugging.
		// note that the core/bootstrap context deadline should be extended too for that.
		for i := 0; i < queries; i++ {
			id := randomID()
			log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
			runQuery(ctx, id)
		}

	} else {
		// note on parallelism here: the context is passed in to the queries, so they
		// **should** exit when it exceeds, making this function exit on ctx cancel.
		// normally, we should be selecting on ctx.Done() here too, but this gets
		// complicated to do with WaitGroup, and doesnt wait for the children to exit.
		var wg sync.WaitGroup
		for i := 0; i < queries; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()

				id := randomID()
				log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
				runQuery(ctx, id)
			}()
		}
		wg.Wait()
	}

	if len(merr) > 0 {
		return merr
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
417
	return nil
418
}