routing.go 11.5 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
Brian Tiger Chow's avatar
Brian Tiger Chow committed
4
	"math"
Jeromy's avatar
Jeromy committed
5
	"sync"
6

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
8

9
	inet "github.com/jbenet/go-ipfs/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	peer "github.com/jbenet/go-ipfs/peer"
11
	"github.com/jbenet/go-ipfs/routing"
12
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
13
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	u "github.com/jbenet/go-ipfs/util"
Jeromy's avatar
Jeromy committed
15
	pset "github.com/jbenet/go-ipfs/util/peerset"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

18 19 20 21 22 23
// asyncQueryBuffer is the capacity of the buffered result channels used by
// async queries (e.g. FindPeersConnectedToPeer). The buffer allows multiple
// queries to execute simultaneously, return their results and continue
// querying closer peers. Note that once the buffer is full, further query
// results will wait for the channel to drain.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25 26 27 28
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
29
// This is the top level "Store" operation of the DHT
30
func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
32 33 34 35
	err := dht.putLocal(key, value)
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36

37 38 39 40 41 42
	rec, err := dht.makePutRecord(key, value)
	if err != nil {
		log.Error("Creation of record failed!")
		return err
	}

43 44 45 46
	pchan, err := dht.getClosestPeers(ctx, key, KValue)
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47

48 49 50 51 52 53 54 55 56 57 58 59 60
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
			err := dht.putValueToNetwork(ctx, p, key, rec)
			if err != nil {
				log.Errorf("failed putting value to peer: %s", err)
			}
		}(p)
	}
	wg.Wait()
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61 62 63
}

// GetValue searches for the value corresponding to given Key.
Jeromy's avatar
Jeromy committed
64 65
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66
func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67
	log.Debugf("Get Value [%s]", key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68

Jeromy's avatar
Jeromy committed
69
	// If we have it local, dont bother doing an RPC!
70
	val, err := dht.getLocal(key)
Jeromy's avatar
Jeromy committed
71
	if err == nil {
72
		log.Debug("Got value locally!")
Jeromy's avatar
Jeromy committed
73 74 75
		return val, nil
	}

76 77
	// get closest peers in the routing table
	closest := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize)
78
	if closest == nil || len(closest) == 0 {
79
		log.Warning("Got no peers back from routing table!")
Jeromy's avatar
Jeromy committed
80
		return nil, kb.ErrLookupFailure
81 82
	}

83
	// setup the Query
84
	query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
85

86
		val, peers, err := dht.getValueOrPeers(ctx, p, key)
87 88 89
		if err != nil {
			return nil, err
		}
90

91 92 93 94 95 96 97
		res := &dhtQueryResult{value: val, closerPeers: peers}
		if val != nil {
			res.success = true
		}

		return res, nil
	})
98

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99
	// run it!
100
	result, err := query.Run(ctx, closest)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101 102
	if err != nil {
		return nil, err
103 104
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105
	log.Debugf("GetValue %v %v", key, result.value)
106
	if result.value == nil {
107
		return nil, routing.ErrNotFound
108
	}
109 110

	return result.value, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
111 112 113 114 115
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

116
// Provide makes this node announce that it can provide a value for the given key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117
func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118

119 120
	log.Event(ctx, "Provide Value start", &key)
	defer log.Event(ctx, "Provide Value end", &key)
121
	dht.providers.AddProvider(key, dht.self)
122 123 124 125

	peers, err := dht.getClosestPeers(ctx, key, KValue)
	if err != nil {
		return err
126 127
	}

128
	for p := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129 130
		err := dht.putProvider(ctx, p, string(key))
		if err != nil {
131
			log.Error(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132
		}
133 134
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135 136
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
137 138 139 140 141 142 143 144 145
// FindProviders collects providers for key by draining the channel returned
// by FindProvidersAsync (asked for up to math.MaxInt32 providers) until that
// channel is closed. It returns everything received; the slice is nil when
// nothing was received, and the returned error is always nil.
func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerInfo, error) {
	found := dht.FindProvidersAsync(ctx, key, math.MaxInt32)

	var collected []peer.PeerInfo
	for {
		info, open := <-found
		if !open {
			return collected, nil
		}
		collected = append(collected, info)
	}
}

146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
// getClosestPeers starts a recursive lookup for peers close to key and returns
// a channel on which the discovered peer IDs are delivered. The lookup is
// seeded with up to AlphaValue peers from the routing table; each seed is
// emitted immediately and then expanded by getClosestPeersRecurse in its own
// goroutine. The channel is closed once every seed's recursion has finished.
//
// count bounds both the channel buffer and the shared peer set, so at most
// count peers are ever emitted. kb.ErrLookupFailure is returned when the
// routing table has no peers to start from.
func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key, count int) (<-chan peer.ID, error) {
	log.Debug("Get Closest Peers")
	tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
	if len(tablepeers) == 0 {
		return nil, kb.ErrLookupFailure
	}

	// The seeds are sent below before any reader exists, so they must all fit
	// in the channel buffer or this call would block forever. Never seed more
	// than count peers.
	// NOTE(review): assumes NearestPeers returns the nearest peers first, so
	// truncating keeps the best seeds — confirm against kbucket.
	if count >= 0 && len(tablepeers) > count {
		tablepeers = tablepeers[:count]
	}

	out := make(chan peer.ID, count)
	peerset := pset.NewLimited(count)

	for _, p := range tablepeers {
		out <- p
		peerset.Add(p)
	}

	var wg sync.WaitGroup
	for _, p := range tablepeers {
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
			dht.getClosestPeersRecurse(ctx, key, p, peerset, out)
		}(p)
	}

	// Close the channel only after every recursion has returned, so that no
	// sender can still be using it.
	go func() {
		wg.Wait()
		close(out)
		log.Debug("Closing closest peer chan")
	}()

	return out, nil
}

// getClosestPeersRecurse asks peer p for peers closer to key and, for every
// returned peer that kb.Closer accepts relative to this node and that is newly
// added to the shared set peers, emits it on peerOut and recurses into it in a
// new goroutine. An error from closerPeersSingle is logged and ends this
// branch of the lookup.
//
// The function does not return until every goroutine it started has finished,
// including when ctx is cancelled. The caller closes peerOut once the
// recursion returns, so a child that outlived its parent could otherwise send
// on a closed channel and panic.
func (dht *IpfsDHT) getClosestPeersRecurse(ctx context.Context, key u.Key, p peer.ID, peers *pset.PeerSet, peerOut chan<- peer.ID) {
	log.Debug("closest peers recurse")
	defer log.Debug("closest peers recurse end")
	closer, err := dht.closerPeersSingle(ctx, key, p)
	if err != nil {
		log.Errorf("error getting closer peers: %s", err)
		return
	}

	var wg sync.WaitGroup
	// Wait on every exit path, including the early return on cancellation.
	defer wg.Wait()

	for _, next := range closer {
		// Same short-circuit order as a && b: TryAdd is only attempted for
		// peers that pass the kb.Closer check.
		if !kb.Closer(next, dht.self, key) || !peers.TryAdd(next) {
			continue
		}

		select {
		case peerOut <- next:
		case <-ctx.Done():
			return
		}

		wg.Add(1)
		go func(next peer.ID) {
			defer wg.Done()
			dht.getClosestPeersRecurse(ctx, key, next, peers, peerOut)
		}(next)
	}
}

// closerPeersSingle asks peer p for peers closer to key and returns the IDs of
// those this node could connect to. The addresses reported for each peer are
// added to the peerstore before the connection attempt.
//
// A peer that cannot be connected to is skipped rather than failing the whole
// response, so one unreachable peer does not discard the others. If the
// context has ended, the connection error is returned immediately instead of
// trying the remaining peers. An error from findPeerSingle is returned as is.
func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) ([]peer.ID, error) {
	log.Debugf("closest peers single %s %s", p, key)
	defer log.Debugf("closest peers single end %s %s", p, key)
	pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
	if err != nil {
		return nil, err
	}

	var out []peer.ID
	for _, pbp := range pmes.GetCloserPeers() {
		pid := peer.ID(pbp.GetId())
		dht.peerstore.AddAddresses(pid, pbp.Addresses())
		if err := dht.ensureConnectedToPeer(ctx, pid); err != nil {
			// Once the context is done there is no point trying the rest.
			if ctx.Err() != nil {
				return nil, err
			}
			log.Debugf("skipping unreachable closer peer %s: %s", pid, err)
			continue
		}
		out = append(out, pid)
	}
	return out, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227 228 229
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
230
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.PeerInfo {
231
	log.Event(ctx, "findProviders", &key)
232
	peerOut := make(chan peer.PeerInfo, count)
Jeromy's avatar
Jeromy committed
233 234 235 236
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

237
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) {
Jeromy's avatar
Jeromy committed
238
	defer close(peerOut)
239
	log.Debugf("%s FindProviders %s", dht.self, key)
Jeromy's avatar
Jeromy committed
240

Jeromy's avatar
Jeromy committed
241
	ps := pset.NewLimited(count)
Jeromy's avatar
Jeromy committed
242 243 244
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
		// NOTE: assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
245
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
246
			select {
247
			case peerOut <- dht.peerstore.PeerInfo(p):
Jeromy's avatar
Jeromy committed
248 249 250
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
251
		}
Jeromy's avatar
Jeromy committed
252 253 254

		// If we have enough peers locally, dont bother with remote RPC
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
255 256 257 258 259
			return
		}
	}

	// setup the Query
260
	query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
261

262
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
263 264 265 266
		if err != nil {
			return nil, err
		}

267
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Jeromy's avatar
Jeromy committed
268 269 270

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
271
			if ps.TryAdd(prov.ID) {
Jeromy's avatar
Jeromy committed
272 273 274 275 276 277
				select {
				case peerOut <- prov:
				case <-ctx.Done():
					log.Error("Context timed out sending more providers")
					return nil, ctx.Err()
				}
278
			}
Jeromy's avatar
Jeromy committed
279 280
			if ps.Size() >= count {
				return &dhtQueryResult{success: true}, nil
281 282 283
			}
		}

Jeromy's avatar
Jeromy committed
284 285
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
286
		clpeers := pb.PBPeersToPeerInfos(closer)
Jeromy's avatar
Jeromy committed
287 288 289
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

290
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
Jeromy's avatar
Jeromy committed
291 292 293 294
	_, err := query.Run(ctx, peers)
	if err != nil {
		log.Errorf("FindProviders Query error: %s", err)
	}
295 296
}

297
func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *pset.PeerSet, count int, out chan peer.PeerInfo) {
298
	var wg sync.WaitGroup
299 300
	peerInfos := pb.PBPeersToPeerInfos(peers)
	for _, pi := range peerInfos {
301
		wg.Add(1)
302
		go func(pi peer.PeerInfo) {
303
			defer wg.Done()
304 305 306

			p := pi.ID
			if err := dht.ensureConnectedToPeer(ctx, p); err != nil {
Jeromy's avatar
Jeromy committed
307
				log.Errorf("%s", err)
Jeromy's avatar
Jeromy committed
308 309
				return
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
310

Jeromy's avatar
Jeromy committed
311
			dht.providers.AddProvider(k, p)
Jeromy's avatar
Jeromy committed
312
			if ps.TryAdd(p) {
313
				select {
314
				case out <- pi:
315 316 317
				case <-ctx.Done():
					return
				}
Jeromy's avatar
Jeromy committed
318 319 320
			} else if ps.Size() >= count {
				return
			}
321
		}(pi)
Jeromy's avatar
Jeromy committed
322
	}
323
	wg.Wait()
324 325
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
326
// FindPeer searches for a peer with given ID.
327
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
328

329
	// Check if were already connected to them
330 331
	if pi, _ := dht.FindLocal(id); pi.ID != "" {
		return pi, nil
332 333
	}

334
	closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
Jeromy's avatar
Jeromy committed
335
	if closest == nil || len(closest) == 0 {
336
		return peer.PeerInfo{}, kb.ErrLookupFailure
337
	}
338

Jeromy's avatar
Jeromy committed
339 340
	// Sanity...
	for _, p := range closest {
341
		if p == id {
Jeromy's avatar
Jeromy committed
342
			log.Error("Found target peer in list of closest peers...")
343
			return dht.peerstore.PeerInfo(p), nil
344
		}
345
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
346

Jeromy's avatar
Jeromy committed
347
	// setup the Query
348
	query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
349

350
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
351
		if err != nil {
352
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
353
		}
354

Jeromy's avatar
Jeromy committed
355
		closer := pmes.GetCloserPeers()
356
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
357

358
		// see it we got the peer here
359 360
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
361
				return &dhtQueryResult{
362
					peer:    npi,
Jeromy's avatar
Jeromy committed
363 364 365
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
366 367
		}

368
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
369
	})
370

Jeromy's avatar
Jeromy committed
371 372
	// run it!
	result, err := query.Run(ctx, closest)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
373
	if err != nil {
374
		return peer.PeerInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
375 376
	}

377
	log.Debugf("FindPeer %v %v", id, result.success)
378 379
	if result.peer.ID == "" {
		return peer.PeerInfo{}, routing.ErrNotFound
380
	}
Jeromy's avatar
Jeromy committed
381

382
	return result.peer, nil
383 384
}

385
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
386
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan peer.PeerInfo, error) {
387

388 389
	peerchan := make(chan peer.PeerInfo, asyncQueryBuffer)
	peersSeen := peer.Set{}
390

391
	closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
392 393 394 395 396
	if closest == nil || len(closest) == 0 {
		return nil, kb.ErrLookupFailure
	}

	// setup the Query
397
	query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
398

399
		pmes, err := dht.findPeerSingle(ctx, p, id)
400 401 402 403
		if err != nil {
			return nil, err
		}

404
		var clpeers []peer.PeerInfo
405 406
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
407
			pi := pb.PBPeerToPeerInfo(pbp)
408

409 410
			// skip peers already seen
			if _, found := peersSeen[pi.ID]; found {
411 412
				continue
			}
413
			peersSeen[pi.ID] = struct{}{}
414 415 416 417 418 419

			// if peer is connected, send it to our client.
			if pb.Connectedness(*pbp.Connection) == inet.Connected {
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
420
				case peerchan <- pi:
421 422 423 424
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
425
			// TODO maybe query it?
426
			if pb.Connectedness(*pbp.Connection) != inet.Connected {
427
				clpeers = append(clpeers, pi)
428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
		if _, err := query.Run(ctx, closest); err != nil {
			log.Error(err)
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}

448
// Ping a peer, log the time it took
449
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
450
	// Thoughts: maybe this should accept an ID and do a peer lookup?
Brian Tiger Chow's avatar
Brian Tiger Chow committed
451
	log.Debugf("ping %s start", p)
452

453
	pmes := pb.NewMessage(pb.Message_PING, "", 0)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
454
	_, err := dht.sendRequest(ctx, p, pmes)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
455
	log.Debugf("ping %s end (err = %s)", p, err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
456
	return err
457
}