routing.go 12.4 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"sync"
Jeromy's avatar
Jeromy committed
6
	"time"
7

8
	key "github.com/ipfs/go-ipfs/blocks/key"
9 10 11 12 13
	notif "github.com/ipfs/go-ipfs/notifications"
	"github.com/ipfs/go-ipfs/routing"
	pb "github.com/ipfs/go-ipfs/routing/dht/pb"
	kb "github.com/ipfs/go-ipfs/routing/kbucket"
	record "github.com/ipfs/go-ipfs/routing/record"
14
	pset "github.com/ipfs/go-ipfs/thirdparty/peerset"
Jeromy's avatar
Jeromy committed
15
	inet "gx/ipfs/QmXDvxcXUYn2DDnGKJwdQPxkJgG83jBTp5UmmNzeHzqbj5/go-libp2p/p2p/net"
16
	peer "gx/ipfs/QmZwZjMVGss5rqYsJVGy18gNbkTJffFyq2x1uJ4e4p3ZAt/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
17
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
)

20 21 22 23 24 25
// asyncQueryBuffer is the size of buffered channels in async queries. This
// buffer allows multiple queries to execute simultaneously, return their
// results and continue querying closer peers. Note that different query
// results will wait for the channel to drain.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26 27 28 29 30
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
31
// This is the top level "Store" operation of the DHT
32
func (dht *IpfsDHT) PutValue(ctx context.Context, key key.Key, value []byte) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
34
	sk, err := dht.getOwnPrivateKey()
Jeromy's avatar
Jeromy committed
35 36 37
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38

39 40 41 42 43
	sign, err := dht.Validator.IsSigned(key)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
44
	rec, err := record.MakePutRecord(sk, key, value, sign)
Jeromy's avatar
Jeromy committed
45
	if err != nil {
Jeromy's avatar
Jeromy committed
46
		log.Debug("Creation of record failed!")
Jeromy's avatar
Jeromy committed
47 48 49
		return err
	}

Jeromy's avatar
Jeromy committed
50
	err = dht.putLocal(key, rec)
51 52 53 54
	if err != nil {
		return err
	}

55
	pchan, err := dht.GetClosestPeers(ctx, key)
56 57 58
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59

60 61 62 63
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
64 65
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
66
			defer wg.Done()
Jeromy's avatar
Jeromy committed
67 68 69 70 71
			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
				Type: notif.Value,
				ID:   p,
			})

Jeromy's avatar
Jeromy committed
72
			err := dht.putValueToPeer(ctx, p, key, rec)
73
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74
				log.Debugf("failed putting value to peer: %s", err)
75 76 77 78 79
			}
		}(p)
	}
	wg.Wait()
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80 81 82
}

// GetValue searches for the value corresponding to given Key.
83
func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) {
Jeromy's avatar
Jeromy committed
84 85 86 87
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()

	vals, err := dht.GetValues(ctx, key, 16)
88 89 90 91 92 93
	if err != nil {
		return nil, err
	}

	var recs [][]byte
	for _, v := range vals {
94 95 96
		if v.Val != nil {
			recs = append(recs, v.Val)
		}
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
	}

	i, err := dht.Selector.BestRecord(key, recs)
	if err != nil {
		return nil, err
	}

	best := recs[i]
	log.Debugf("GetValue %v %v", key, best)
	if best == nil {
		log.Errorf("GetValue yielded correct record with nil value.")
		return nil, routing.ErrNotFound
	}

	fixupRec, err := record.MakePutRecord(dht.peerstore.PrivKey(dht.self), key, best, true)
	if err != nil {
		// probably shouldnt actually 'error' here as we have found a value we like,
		// but this call failing probably isnt something we want to ignore
		return nil, err
	}

	for _, v := range vals {
		// if someone sent us a different 'less-valid' record, lets correct them
		if !bytes.Equal(v.Val, best) {
			go func(v routing.RecvdVal) {
Jeromy's avatar
Jeromy committed
122 123
				ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
				defer cancel()
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
				err := dht.putValueToPeer(ctx, v.From, key, fixupRec)
				if err != nil {
					log.Error("Error correcting DHT entry: ", err)
				}
			}(v)
		}
	}

	return best, nil
}

func (dht *IpfsDHT) GetValues(ctx context.Context, key key.Key, nvals int) ([]routing.RecvdVal, error) {
	var vals []routing.RecvdVal
	var valslock sync.Mutex

Jeromy's avatar
Jeromy committed
139
	// If we have it local, dont bother doing an RPC!
140
	lrec, err := dht.getLocal(key)
Jeromy's avatar
Jeromy committed
141
	if err == nil {
142 143
		// TODO: this is tricky, we dont always want to trust our own value
		// what if the authoritative source updated it?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144
		log.Debug("have it locally")
145 146 147 148 149 150 151 152 153 154
		vals = append(vals, routing.RecvdVal{
			Val:  lrec.GetValue(),
			From: dht.self,
		})

		if nvals <= 1 {
			return vals, nil
		}
	} else if nvals == 0 {
		return nil, err
Jeromy's avatar
Jeromy committed
155 156
	}

157
	// get closest peers in the routing table
Jeromy's avatar
Jeromy committed
158
	rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
Jeromy's avatar
Jeromy committed
159
	log.Debugf("peers in rt: %s", len(rtp), rtp)
160
	if len(rtp) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161
		log.Warning("No peers from routing table!")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162
		return nil, kb.ErrLookupFailure
163 164
	}

165
	// setup the Query
Jeromy's avatar
Jeromy committed
166
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
168
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
169 170 171 172
			Type: notif.SendingQuery,
			ID:   p,
		})

173
		rec, peers, err := dht.getValueOrPeers(ctx, p, key)
174 175 176 177 178 179 180 181 182
		switch err {
		case routing.ErrNotFound:
			// in this case, they responded with nothing,
			// still send a notification so listeners can know the
			// request has completed 'successfully'
			notif.PublishQueryEvent(parent, &notif.QueryEvent{
				Type: notif.PeerResponse,
				ID:   p,
			})
183
			return nil, err
184 185 186 187 188
		default:
			return nil, err

		case nil, errInvalidRecord:
			// in either of these cases, we want to keep going
189
		}
190

191 192
		res := &dhtQueryResult{closerPeers: peers}

193
		if rec.GetValue() != nil || err == errInvalidRecord {
194 195 196 197 198 199 200 201 202 203 204 205
			rv := routing.RecvdVal{
				Val:  rec.GetValue(),
				From: p,
			}
			valslock.Lock()
			vals = append(vals, rv)

			// If weve collected enough records, we're done
			if len(vals) >= nvals {
				res.success = true
			}
			valslock.Unlock()
206 207
		}

Jeromy's avatar
Jeromy committed
208
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
209 210 211 212 213
			Type:      notif.PeerResponse,
			ID:        p,
			Responses: pointerizePeerInfos(peers),
		})

214 215
		return res, nil
	})
216

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
217
	// run it!
218 219 220 221 222
	_, err = query.Run(ctx, rtp)
	if len(vals) == 0 {
		if err != nil {
			return nil, err
		}
223 224
	}

225
	return vals, nil
226

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227 228 229 230 231
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

232
// Provide makes this node announce that it can provide a value for the given key
233
func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
Jeromy's avatar
Jeromy committed
234
	defer log.EventBegin(ctx, "provide", &key).Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
235 236

	// add self locally
237
	dht.providers.AddProvider(ctx, key, dht.self)
238

239
	peers, err := dht.GetClosestPeers(ctx, key)
240 241
	if err != nil {
		return err
242 243
	}

Jeromy's avatar
Jeromy committed
244
	wg := sync.WaitGroup{}
245
	for p := range peers {
Jeromy's avatar
Jeromy committed
246 247 248
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
249
			log.Debugf("putProvider(%s, %s)", key, p)
Jeromy's avatar
Jeromy committed
250 251
			err := dht.putProvider(ctx, p, string(key))
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252
				log.Debug(err)
Jeromy's avatar
Jeromy committed
253 254
			}
		}(p)
255
	}
Jeromy's avatar
Jeromy committed
256
	wg.Wait()
257
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
258 259
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
260
// FindProviders searches until the context expires.
261
func (dht *IpfsDHT) FindProviders(ctx context.Context, key key.Key) ([]peer.PeerInfo, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
262
	var providers []peer.PeerInfo
Jeromy's avatar
Jeromy committed
263
	for p := range dht.FindProvidersAsync(ctx, key, KValue) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
264 265 266 267 268
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
269 270 271
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
272
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key key.Key, count int) <-chan peer.PeerInfo {
273
	log.Event(ctx, "findProviders", &key)
274
	peerOut := make(chan peer.PeerInfo, count)
Jeromy's avatar
Jeromy committed
275 276 277 278
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

279
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key key.Key, count int, peerOut chan peer.PeerInfo) {
Jeromy's avatar
Jeromy committed
280
	defer log.EventBegin(ctx, "findProvidersAsync", &key).Done()
Jeromy's avatar
Jeromy committed
281 282
	defer close(peerOut)

Jeromy's avatar
Jeromy committed
283
	ps := pset.NewLimited(count)
Jeromy's avatar
Jeromy committed
284 285
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
286
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
287
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
288
			select {
289
			case peerOut <- dht.peerstore.PeerInfo(p):
Jeromy's avatar
Jeromy committed
290 291 292
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
293
		}
Jeromy's avatar
Jeromy committed
294 295 296

		// If we have enough peers locally, dont bother with remote RPC
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
297 298 299 300 301
			return
		}
	}

	// setup the Query
Jeromy's avatar
Jeromy committed
302
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
303
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
304
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
305 306 307
			Type: notif.SendingQuery,
			ID:   p,
		})
308
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
309 310 311 312
		if err != nil {
			return nil, err
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
313
		log.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
314
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
315
		log.Debugf("%d provider entries decoded", len(provs))
Jeromy's avatar
Jeromy committed
316 317 318

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
319
			log.Debugf("got provider: %s", prov)
320
			if ps.TryAdd(prov.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
321
				log.Debugf("using provider: %s", prov)
Jeromy's avatar
Jeromy committed
322 323 324
				select {
				case peerOut <- prov:
				case <-ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
325
					log.Debug("Context timed out sending more providers")
Jeromy's avatar
Jeromy committed
326 327
					return nil, ctx.Err()
				}
328
			}
Jeromy's avatar
Jeromy committed
329
			if ps.Size() >= count {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
330
				log.Debugf("got enough providers (%d/%d)", ps.Size(), count)
Jeromy's avatar
Jeromy committed
331
				return &dhtQueryResult{success: true}, nil
332 333 334
			}
		}

Jeromy's avatar
Jeromy committed
335 336
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
337
		clpeers := pb.PBPeersToPeerInfos(closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
338
		log.Debugf("got closer peers: %d %s", len(clpeers), clpeers)
339

Jeromy's avatar
Jeromy committed
340
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
341 342 343 344
			Type:      notif.PeerResponse,
			ID:        p,
			Responses: pointerizePeerInfos(clpeers),
		})
Jeromy's avatar
Jeromy committed
345 346 347
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

Jeromy's avatar
Jeromy committed
348
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
Jeromy's avatar
Jeromy committed
349 350
	_, err := query.Run(ctx, peers)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
351
		log.Debugf("Query error: %s", err)
352 353 354 355
		notif.PublishQueryEvent(ctx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
		})
Jeromy's avatar
Jeromy committed
356
	}
357 358
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
359
// FindPeer searches for a peer with given ID.
360
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) {
Jeromy's avatar
Jeromy committed
361
	defer log.EventBegin(ctx, "FindPeer", id).Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
362

363
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
364
	if pi := dht.FindLocal(id); pi.ID != "" {
365
		return pi, nil
366 367
	}

Jeromy's avatar
Jeromy committed
368
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue)
369
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
370
		return peer.PeerInfo{}, kb.ErrLookupFailure
371
	}
372

Jeromy's avatar
Jeromy committed
373
	// Sanity...
374
	for _, p := range peers {
375
		if p == id {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
376
			log.Debug("Found target peer in list of closest peers...")
377
			return dht.peerstore.PeerInfo(p), nil
378
		}
379
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
380

Jeromy's avatar
Jeromy committed
381
	// setup the Query
Jeromy's avatar
Jeromy committed
382
	parent := ctx
383
	query := dht.newQuery(key.Key(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
384
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
385 386 387
			Type: notif.SendingQuery,
			ID:   p,
		})
Jeromy's avatar
Jeromy committed
388

389
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
390
		if err != nil {
391
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
392
		}
393

Jeromy's avatar
Jeromy committed
394
		closer := pmes.GetCloserPeers()
395
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
396

397
		// see it we got the peer here
398 399
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
400
				return &dhtQueryResult{
401
					peer:    npi,
Jeromy's avatar
Jeromy committed
402 403 404
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
405 406
		}

Jeromy's avatar
Jeromy committed
407
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
408 409 410 411
			Type:      notif.PeerResponse,
			Responses: pointerizePeerInfos(clpeerInfos),
		})

412
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
413
	})
414

Jeromy's avatar
Jeromy committed
415
	// run it!
416
	result, err := query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
417
	if err != nil {
418
		return peer.PeerInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
419 420
	}

421
	log.Debugf("FindPeer %v %v", id, result.success)
422 423
	if result.peer.ID == "" {
		return peer.PeerInfo{}, routing.ErrNotFound
424
	}
Jeromy's avatar
Jeromy committed
425

426
	return result.peer, nil
427 428
}

429
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
430
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan peer.PeerInfo, error) {
431

432 433
	peerchan := make(chan peer.PeerInfo, asyncQueryBuffer)
	peersSeen := peer.Set{}
434

Jeromy's avatar
Jeromy committed
435
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue)
436
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
437
		return nil, kb.ErrLookupFailure
438 439 440
	}

	// setup the Query
441
	query := dht.newQuery(key.Key(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
442

443
		pmes, err := dht.findPeerSingle(ctx, p, id)
444 445 446 447
		if err != nil {
			return nil, err
		}

448
		var clpeers []peer.PeerInfo
449 450
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
451
			pi := pb.PBPeerToPeerInfo(pbp)
452

453 454
			// skip peers already seen
			if _, found := peersSeen[pi.ID]; found {
455 456
				continue
			}
457
			peersSeen[pi.ID] = struct{}{}
458 459 460 461 462 463

			// if peer is connected, send it to our client.
			if pb.Connectedness(*pbp.Connection) == inet.Connected {
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
464
				case peerchan <- pi:
465 466 467 468
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
469
			// TODO maybe query it?
470
			if pb.Connectedness(*pbp.Connection) != inet.Connected {
471
				clpeers = append(clpeers, pi)
472 473 474 475 476 477 478 479 480
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
481
		if _, err := query.Run(ctx, peers); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
482
			log.Debug(err)
483 484 485 486 487 488 489 490
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}