routing.go 11.7 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"sync"
6

7
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
8
	key "github.com/ipfs/go-ipfs/blocks/key"
9 10 11 12 13 14 15 16
	notif "github.com/ipfs/go-ipfs/notifications"
	inet "github.com/ipfs/go-ipfs/p2p/net"
	peer "github.com/ipfs/go-ipfs/p2p/peer"
	"github.com/ipfs/go-ipfs/routing"
	pb "github.com/ipfs/go-ipfs/routing/dht/pb"
	kb "github.com/ipfs/go-ipfs/routing/kbucket"
	record "github.com/ipfs/go-ipfs/routing/record"
	pset "github.com/ipfs/go-ipfs/util/peerset"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18
)

19 20 21 22 23 24
// asyncQueryBuffer is the capacity of the buffered result channels handed
// out by asynchronous queries. The buffer lets several in-flight queries
// deliver their results and carry on querying closer peers without waiting
// for the consumer. Once the buffer is full, further results block until
// the consumer drains the channel.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25 26 27 28 29
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
30
// This is the top level "Store" operation of the DHT
31
func (dht *IpfsDHT) PutValue(ctx context.Context, key key.Key, value []byte) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
33
	sk, err := dht.getOwnPrivateKey()
Jeromy's avatar
Jeromy committed
34 35 36
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37

38 39 40 41 42
	sign, err := dht.Validator.IsSigned(key)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
43
	rec, err := record.MakePutRecord(sk, key, value, sign)
Jeromy's avatar
Jeromy committed
44
	if err != nil {
Jeromy's avatar
Jeromy committed
45
		log.Debug("Creation of record failed!")
Jeromy's avatar
Jeromy committed
46 47 48
		return err
	}

Jeromy's avatar
Jeromy committed
49
	err = dht.putLocal(key, rec)
50 51 52 53
	if err != nil {
		return err
	}

54
	pchan, err := dht.GetClosestPeers(ctx, key)
55 56 57
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58

59 60 61 62 63
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Jeromy's avatar
Jeromy committed
64 65 66 67 68
			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
				Type: notif.Value,
				ID:   p,
			})

Jeromy's avatar
Jeromy committed
69
			err := dht.putValueToPeer(ctx, p, key, rec)
70
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
				log.Debugf("failed putting value to peer: %s", err)
72 73 74 75 76
			}
		}(p)
	}
	wg.Wait()
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77 78 79
}

// GetValue searches for the value corresponding to given Key.
80
func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) {
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
	vals, err := dht.GetValues(ctx, key, 3)
	if err != nil {
		return nil, err
	}

	var recs [][]byte
	for _, v := range vals {
		recs = append(recs, v.Val)
	}

	i, err := dht.Selector.BestRecord(key, recs)
	if err != nil {
		return nil, err
	}

	best := recs[i]
	log.Debugf("GetValue %v %v", key, best)
	if best == nil {
		log.Errorf("GetValue yielded correct record with nil value.")
		return nil, routing.ErrNotFound
	}

	fixupRec, err := record.MakePutRecord(dht.peerstore.PrivKey(dht.self), key, best, true)
	if err != nil {
		// probably shouldnt actually 'error' here as we have found a value we like,
		// but this call failing probably isnt something we want to ignore
		return nil, err
	}

	for _, v := range vals {
		// if someone sent us a different 'less-valid' record, lets correct them
		if !bytes.Equal(v.Val, best) {
			go func(v routing.RecvdVal) {
				err := dht.putValueToPeer(ctx, v.From, key, fixupRec)
				if err != nil {
					log.Error("Error correcting DHT entry: ", err)
				}
			}(v)
		}
	}

	return best, nil
}

func (dht *IpfsDHT) GetValues(ctx context.Context, key key.Key, nvals int) ([]routing.RecvdVal, error) {
	var vals []routing.RecvdVal
	var valslock sync.Mutex

Jeromy's avatar
Jeromy committed
129
	// If we have it local, dont bother doing an RPC!
130
	lrec, err := dht.getLocal(key)
Jeromy's avatar
Jeromy committed
131
	if err == nil {
132 133
		// TODO: this is tricky, we dont always want to trust our own value
		// what if the authoritative source updated it?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
134
		log.Debug("have it locally")
135 136 137 138 139 140 141 142 143 144
		vals = append(vals, routing.RecvdVal{
			Val:  lrec.GetValue(),
			From: dht.self,
		})

		if nvals <= 1 {
			return vals, nil
		}
	} else if nvals == 0 {
		return nil, err
Jeromy's avatar
Jeromy committed
145 146
	}

147
	// get closest peers in the routing table
Jeromy's avatar
Jeromy committed
148
	rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
Jeromy's avatar
Jeromy committed
149
	log.Debugf("peers in rt: %s", len(rtp), rtp)
150
	if len(rtp) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151
		log.Warning("No peers from routing table!")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152
		return nil, kb.ErrLookupFailure
153 154
	}

155
	// setup the Query
Jeromy's avatar
Jeromy committed
156
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
158
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
159 160 161 162
			Type: notif.SendingQuery,
			ID:   p,
		})

163
		rec, peers, err := dht.getValueOrPeers(ctx, p, key)
164 165 166
		if err != nil {
			return nil, err
		}
167

168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
		res := &dhtQueryResult{closerPeers: peers}

		if rec.GetValue() != nil {
			rv := routing.RecvdVal{
				Val:  rec.GetValue(),
				From: p,
			}
			valslock.Lock()
			vals = append(vals, rv)

			// If weve collected enough records, we're done
			if len(vals) >= nvals {
				res.success = true
			}
			valslock.Unlock()
183 184
		}

Jeromy's avatar
Jeromy committed
185
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
186 187 188 189 190
			Type:      notif.PeerResponse,
			ID:        p,
			Responses: pointerizePeerInfos(peers),
		})

191 192
		return res, nil
	})
193

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
194
	// run it!
195 196 197 198 199
	_, err = query.Run(ctx, rtp)
	if len(vals) == 0 {
		if err != nil {
			return nil, err
		}
200 201
	}

202
	return vals, nil
203

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204 205 206 207 208
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

209
// Provide makes this node announce that it can provide a value for the given key
210
func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
Jeromy's avatar
Jeromy committed
211
	defer log.EventBegin(ctx, "provide", &key).Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212 213

	// add self locally
214
	dht.providers.AddProvider(ctx, key, dht.self)
215

216
	peers, err := dht.GetClosestPeers(ctx, key)
217 218
	if err != nil {
		return err
219 220
	}

Jeromy's avatar
Jeromy committed
221
	wg := sync.WaitGroup{}
222
	for p := range peers {
Jeromy's avatar
Jeromy committed
223 224 225
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226
			log.Debugf("putProvider(%s, %s)", key, p)
Jeromy's avatar
Jeromy committed
227 228
			err := dht.putProvider(ctx, p, string(key))
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
229
				log.Debug(err)
Jeromy's avatar
Jeromy committed
230 231
			}
		}(p)
232
	}
Jeromy's avatar
Jeromy committed
233
	wg.Wait()
234
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
235 236
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
237
// FindProviders searches until the context expires.
238
func (dht *IpfsDHT) FindProviders(ctx context.Context, key key.Key) ([]peer.PeerInfo, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
239
	var providers []peer.PeerInfo
Jeromy's avatar
Jeromy committed
240
	for p := range dht.FindProvidersAsync(ctx, key, KValue) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
241 242 243 244 245
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
246 247 248
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
249
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key key.Key, count int) <-chan peer.PeerInfo {
250
	log.Event(ctx, "findProviders", &key)
251
	peerOut := make(chan peer.PeerInfo, count)
Jeromy's avatar
Jeromy committed
252 253 254 255
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

256
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key key.Key, count int, peerOut chan peer.PeerInfo) {
Jeromy's avatar
Jeromy committed
257
	defer log.EventBegin(ctx, "findProvidersAsync", &key).Done()
Jeromy's avatar
Jeromy committed
258 259
	defer close(peerOut)

Jeromy's avatar
Jeromy committed
260
	ps := pset.NewLimited(count)
Jeromy's avatar
Jeromy committed
261 262 263
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
		// NOTE: assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
264
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
265
			select {
266
			case peerOut <- dht.peerstore.PeerInfo(p):
Jeromy's avatar
Jeromy committed
267 268 269
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
270
		}
Jeromy's avatar
Jeromy committed
271 272 273

		// If we have enough peers locally, dont bother with remote RPC
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
274 275 276 277 278
			return
		}
	}

	// setup the Query
Jeromy's avatar
Jeromy committed
279
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
280
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
281
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
282 283 284
			Type: notif.SendingQuery,
			ID:   p,
		})
285
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
286 287 288 289
		if err != nil {
			return nil, err
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
290
		log.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
291
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
292
		log.Debugf("%d provider entries decoded", len(provs))
Jeromy's avatar
Jeromy committed
293 294 295

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
296
			log.Debugf("got provider: %s", prov)
297
			if ps.TryAdd(prov.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
298
				log.Debugf("using provider: %s", prov)
Jeromy's avatar
Jeromy committed
299 300 301
				select {
				case peerOut <- prov:
				case <-ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
302
					log.Debug("Context timed out sending more providers")
Jeromy's avatar
Jeromy committed
303 304
					return nil, ctx.Err()
				}
305
			}
Jeromy's avatar
Jeromy committed
306
			if ps.Size() >= count {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
307
				log.Debugf("got enough providers (%d/%d)", ps.Size(), count)
Jeromy's avatar
Jeromy committed
308
				return &dhtQueryResult{success: true}, nil
309 310 311
			}
		}

Jeromy's avatar
Jeromy committed
312 313
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
314
		clpeers := pb.PBPeersToPeerInfos(closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
315
		log.Debugf("got closer peers: %d %s", len(clpeers), clpeers)
316

Jeromy's avatar
Jeromy committed
317
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
318 319 320 321
			Type:      notif.PeerResponse,
			ID:        p,
			Responses: pointerizePeerInfos(clpeers),
		})
Jeromy's avatar
Jeromy committed
322 323 324
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

Jeromy's avatar
Jeromy committed
325
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
Jeromy's avatar
Jeromy committed
326 327
	_, err := query.Run(ctx, peers)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
328
		log.Debugf("Query error: %s", err)
329 330 331 332
		notif.PublishQueryEvent(ctx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
		})
Jeromy's avatar
Jeromy committed
333
	}
334 335
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
336
// FindPeer searches for a peer with given ID.
337
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) {
Jeromy's avatar
Jeromy committed
338
	defer log.EventBegin(ctx, "FindPeer", id).Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
339

340
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
341
	if pi := dht.FindLocal(id); pi.ID != "" {
342
		return pi, nil
343 344
	}

Jeromy's avatar
Jeromy committed
345
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue)
346
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
347
		return peer.PeerInfo{}, kb.ErrLookupFailure
348
	}
349

Jeromy's avatar
Jeromy committed
350
	// Sanity...
351
	for _, p := range peers {
352
		if p == id {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
353
			log.Debug("Found target peer in list of closest peers...")
354
			return dht.peerstore.PeerInfo(p), nil
355
		}
356
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
357

Jeromy's avatar
Jeromy committed
358
	// setup the Query
Jeromy's avatar
Jeromy committed
359
	parent := ctx
360
	query := dht.newQuery(key.Key(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
361
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
362 363 364
			Type: notif.SendingQuery,
			ID:   p,
		})
Jeromy's avatar
Jeromy committed
365

366
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
367
		if err != nil {
368
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
369
		}
370

Jeromy's avatar
Jeromy committed
371
		closer := pmes.GetCloserPeers()
372
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
373

374
		// see it we got the peer here
375 376
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
377
				return &dhtQueryResult{
378
					peer:    npi,
Jeromy's avatar
Jeromy committed
379 380 381
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
382 383
		}

Jeromy's avatar
Jeromy committed
384
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
385 386 387 388
			Type:      notif.PeerResponse,
			Responses: pointerizePeerInfos(clpeerInfos),
		})

389
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
390
	})
391

Jeromy's avatar
Jeromy committed
392
	// run it!
393
	result, err := query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
394
	if err != nil {
395
		return peer.PeerInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
396 397
	}

398
	log.Debugf("FindPeer %v %v", id, result.success)
399 400
	if result.peer.ID == "" {
		return peer.PeerInfo{}, routing.ErrNotFound
401
	}
Jeromy's avatar
Jeromy committed
402

403
	return result.peer, nil
404 405
}

406
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
407
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan peer.PeerInfo, error) {
408

409 410
	peerchan := make(chan peer.PeerInfo, asyncQueryBuffer)
	peersSeen := peer.Set{}
411

Jeromy's avatar
Jeromy committed
412
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue)
413
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
414
		return nil, kb.ErrLookupFailure
415 416 417
	}

	// setup the Query
418
	query := dht.newQuery(key.Key(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
419

420
		pmes, err := dht.findPeerSingle(ctx, p, id)
421 422 423 424
		if err != nil {
			return nil, err
		}

425
		var clpeers []peer.PeerInfo
426 427
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
428
			pi := pb.PBPeerToPeerInfo(pbp)
429

430 431
			// skip peers already seen
			if _, found := peersSeen[pi.ID]; found {
432 433
				continue
			}
434
			peersSeen[pi.ID] = struct{}{}
435 436 437 438 439 440

			// if peer is connected, send it to our client.
			if pb.Connectedness(*pbp.Connection) == inet.Connected {
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
441
				case peerchan <- pi:
442 443 444 445
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
446
			// TODO maybe query it?
447
			if pb.Connectedness(*pbp.Connection) != inet.Connected {
448
				clpeers = append(clpeers, pi)
449 450 451 452 453 454 455 456 457
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
458
		if _, err := query.Run(ctx, peers); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
459
			log.Debug(err)
460 461 462 463 464 465 466 467
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}