routing.go 9.85 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"sync"
5

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
7

8
	inet "github.com/jbenet/go-ipfs/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	peer "github.com/jbenet/go-ipfs/peer"
10
	"github.com/jbenet/go-ipfs/routing"
11
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
12
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	u "github.com/jbenet/go-ipfs/util"
Jeromy's avatar
Jeromy committed
14
	pset "github.com/jbenet/go-ipfs/util/peerset"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

17 18 19 20 21 22
// asyncQueryBuffer is the size of buffered channels in async queries. This
// buffer allows multiple queries to execute simultaneously, return their
// results and continue querying closer peers. Once the buffer is full,
// further results block until the consumer drains the channel.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24 25 26 27
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
28
// This is the top level "Store" operation of the DHT
29
func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
31 32 33 34
	err := dht.putLocal(key, value)
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35

36 37 38 39 40 41
	rec, err := dht.makePutRecord(key, value)
	if err != nil {
		log.Error("Creation of record failed!")
		return err
	}

42
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43

44
	query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
		log.Debugf("%s PutValue qry part %v", dht.self, p)
46
		err := dht.putValueToNetwork(ctx, p, string(key), rec)
47 48 49 50 51
		if err != nil {
			return nil, err
		}
		return &dhtQueryResult{success: true}, nil
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52

53
	_, err = query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54
	return err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55 56 57
}

// GetValue searches for the value corresponding to given Key.
Jeromy's avatar
Jeromy committed
58 59
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60
func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61
	log.Debugf("Get Value [%s]", key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62

Jeromy's avatar
Jeromy committed
63 64
	// If we have it local, dont bother doing an RPC!
	// NOTE: this might not be what we want to do...
65
	val, err := dht.getLocal(key)
Jeromy's avatar
Jeromy committed
66
	if err == nil {
67
		log.Debug("Got value locally!")
Jeromy's avatar
Jeromy committed
68 69 70
		return val, nil
	}

71 72
	// get closest peers in the routing table
	closest := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize)
73
	if closest == nil || len(closest) == 0 {
74
		log.Warning("Got no peers back from routing table!")
Jeromy's avatar
Jeromy committed
75
		return nil, kb.ErrLookupFailure
76 77
	}

78
	// setup the Query
79
	query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
80

81
		val, peers, err := dht.getValueOrPeers(ctx, p, key)
82 83 84
		if err != nil {
			return nil, err
		}
85

86 87 88 89 90 91 92
		res := &dhtQueryResult{value: val, closerPeers: peers}
		if val != nil {
			res.success = true
		}

		return res, nil
	})
93

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
94
	// run it!
95
	result, err := query.Run(ctx, closest)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
96 97
	if err != nil {
		return nil, err
98 99
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
100
	log.Debugf("GetValue %v %v", key, result.value)
101
	if result.value == nil {
102
		return nil, routing.ErrNotFound
103
	}
104 105

	return result.value, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
106 107 108 109 110
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

111
// Provide makes this node announce that it can provide a value for the given key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113

114
	dht.providers.AddProvider(key, dht.self)
115
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize)
116
	if len(peers) == 0 {
117
		return nil
118 119
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120 121
	//TODO FIX: this doesn't work! it needs to be sent to the actual nearest peers.
	// `peers` are the closest peers we have, not the ones that should get the value.
122
	for _, p := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
123 124 125 126
		err := dht.putProvider(ctx, p, string(key))
		if err != nil {
			return err
		}
127 128
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129 130
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131 132 133
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
134
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer {
135
	log.Event(ctx, "findProviders", &key)
136
	peerOut := make(chan peer.Peer, count)
Jeromy's avatar
Jeromy committed
137 138 139 140 141 142 143
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.Peer) {
	defer close(peerOut)

Jeromy's avatar
Jeromy committed
144
	ps := pset.NewPeerSet()
Jeromy's avatar
Jeromy committed
145 146 147
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
		// NOTE: assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
148 149 150 151 152 153
		if ps.AddIfSmallerThan(p, count) {
			select {
			case peerOut <- p:
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
154
		}
Jeromy's avatar
Jeromy committed
155 156 157

		// If we have enough peers locally, dont bother with remote RPC
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
158 159 160 161 162
			return
		}
	}

	// setup the Query
163
	query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
164

165
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
166 167 168 169 170 171 172 173 174 175 176 177 178
		if err != nil {
			return nil, err
		}

		provs, errs := pb.PBPeersToPeers(dht.peerstore, pmes.GetProviderPeers())
		for _, err := range errs {
			if err != nil {
				log.Warning(err)
			}
		}

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
Jeromy's avatar
Jeromy committed
179 180 181 182 183 184 185
			if ps.AddIfSmallerThan(prov, count) {
				select {
				case peerOut <- prov:
				case <-ctx.Done():
					log.Error("Context timed out sending more providers")
					return nil, ctx.Err()
				}
186
			}
Jeromy's avatar
Jeromy committed
187 188
			if ps.Size() >= count {
				return &dhtQueryResult{success: true}, nil
189 190 191
			}
		}

Jeromy's avatar
Jeromy committed
192 193 194 195 196 197 198
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
		clpeers, errs := pb.PBPeersToPeers(dht.peerstore, closer)
		for _, err := range errs {
			if err != nil {
				log.Warning(err)
			}
199
		}
Jeromy's avatar
Jeromy committed
200 201 202 203

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

204
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
Jeromy's avatar
Jeromy committed
205 206 207 208
	_, err := query.Run(ctx, peers)
	if err != nil {
		log.Errorf("FindProviders Query error: %s", err)
	}
209 210
}

Jeromy's avatar
Jeromy committed
211
func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *pset.PeerSet, count int, out chan peer.Peer) {
212
	var wg sync.WaitGroup
213
	for _, pbp := range peers {
214
		wg.Add(1)
215
		go func(mp *pb.Message_Peer) {
216
			defer wg.Done()
Jeromy's avatar
Jeromy committed
217
			// construct new peer
218
			p, err := dht.ensureConnectedToPeer(ctx, mp)
Jeromy's avatar
Jeromy committed
219
			if err != nil {
Jeromy's avatar
Jeromy committed
220
				log.Errorf("%s", err)
Jeromy's avatar
Jeromy committed
221 222 223 224 225 226
				return
			}
			if p == nil {
				log.Error("Got nil peer from ensureConnectedToPeer")
				return
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227

Jeromy's avatar
Jeromy committed
228 229
			dht.providers.AddProvider(k, p)
			if ps.AddIfSmallerThan(p, count) {
230 231 232 233 234
				select {
				case out <- p:
				case <-ctx.Done():
					return
				}
Jeromy's avatar
Jeromy committed
235 236 237 238 239
			} else if ps.Size() >= count {
				return
			}
		}(pbp)
	}
240
	wg.Wait()
241 242
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
243
// FindPeer searches for a peer with given ID.
244
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
245

246
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
247
	p, _ := dht.FindLocal(id)
248 249 250 251
	if p != nil {
		return p, nil
	}

252
	closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
Jeromy's avatar
Jeromy committed
253 254
	if closest == nil || len(closest) == 0 {
		return nil, kb.ErrLookupFailure
255
	}
256

Jeromy's avatar
Jeromy committed
257 258 259 260 261
	// Sanity...
	for _, p := range closest {
		if p.ID().Equal(id) {
			log.Error("Found target peer in list of closest peers...")
			return p, nil
262
		}
263
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
264

Jeromy's avatar
Jeromy committed
265
	// setup the Query
266
	query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
267

268
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
269
		if err != nil {
270
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
271
		}
272

Jeromy's avatar
Jeromy committed
273
		closer := pmes.GetCloserPeers()
274 275
		clpeers, errs := pb.PBPeersToPeers(dht.peerstore, closer)
		for _, err := range errs {
276
			if err != nil {
277
				log.Warning(err)
278
			}
279
		}
280

281 282 283
		// see it we got the peer here
		for _, np := range clpeers {
			if string(np.ID()) == string(id) {
Jeromy's avatar
Jeromy committed
284 285 286 287 288
				return &dhtQueryResult{
					peer:    np,
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
289 290
		}

Jeromy's avatar
Jeromy committed
291
		return &dhtQueryResult{closerPeers: clpeers}, nil
292
	})
293

Jeromy's avatar
Jeromy committed
294 295
	// run it!
	result, err := query.Run(ctx, closest)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
296 297 298 299
	if err != nil {
		return nil, err
	}

300
	log.Debugf("FindPeer %v %v", id, result.success)
301
	if result.peer == nil {
302
		return nil, routing.ErrNotFound
303
	}
Jeromy's avatar
Jeromy committed
304

305
	return result.peer, nil
306 307
}

308 309 310
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan peer.Peer, error) {

311
	peerchan := make(chan peer.Peer, asyncQueryBuffer)
312 313
	peersSeen := map[string]peer.Peer{}

314
	closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
315 316 317 318 319
	if closest == nil || len(closest) == 0 {
		return nil, kb.ErrLookupFailure
	}

	// setup the Query
320
	query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
321

322
		pmes, err := dht.findPeerSingle(ctx, p, id)
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
		if err != nil {
			return nil, err
		}

		var clpeers []peer.Peer
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
			// skip peers already seen
			if _, found := peersSeen[string(pbp.GetId())]; found {
				continue
			}

			// skip peers that fail to unmarshal
			p, err := pb.PBPeerToPeer(dht.peerstore, pbp)
			if err != nil {
				log.Warning(err)
				continue
			}

			// if peer is connected, send it to our client.
			if pb.Connectedness(*pbp.Connection) == inet.Connected {
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
				case peerchan <- p:
				}
			}

			peersSeen[string(p.ID())] = p

			// if peer is the peer we're looking for, don't bother querying it.
			if pb.Connectedness(*pbp.Connection) != inet.Connected {
				clpeers = append(clpeers, p)
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
		if _, err := query.Run(ctx, closest); err != nil {
			log.Error(err)
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}

376
// Ping a peer, log the time it took
377
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error {
378
	// Thoughts: maybe this should accept an ID and do a peer lookup?
Brian Tiger Chow's avatar
Brian Tiger Chow committed
379
	log.Debugf("ping %s start", p)
380

381
	pmes := pb.NewMessage(pb.Message_PING, "", 0)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
382
	_, err := dht.sendRequest(ctx, p, pmes)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
383
	log.Debugf("ping %s end (err = %s)", p, err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
384
	return err
385
}