routing.go 8.75 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"sync"
5

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
7

8
	inet "github.com/jbenet/go-ipfs/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	peer "github.com/jbenet/go-ipfs/peer"
10
	"github.com/jbenet/go-ipfs/routing"
11
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
12
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14 15 16 17 18 19 20
)

// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
21
// This is the top level "Store" operation of the DHT
22
func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
24 25 26 27
	err := dht.putLocal(key, value)
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28

29 30 31 32 33 34
	rec, err := dht.makePutRecord(key, value)
	if err != nil {
		log.Error("Creation of record failed!")
		return err
	}

35
	var peers []peer.Peer
36
	for _, route := range dht.routingTables {
37 38
		npeers := route.NearestPeers(kb.ConvertKey(key), KValue)
		peers = append(peers, npeers...)
Jeromy's avatar
Jeromy committed
39
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
41
	query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42
		log.Debugf("%s PutValue qry part %v", dht.self, p)
43
		err := dht.putValueToNetwork(ctx, p, string(key), rec)
44 45 46 47 48
		if err != nil {
			return nil, err
		}
		return &dhtQueryResult{success: true}, nil
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49

50
	_, err = query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
51
	return err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52 53 54
}

// GetValue searches for the value corresponding to given Key.
Jeromy's avatar
Jeromy committed
55 56
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57
func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58
	log.Debugf("Get Value [%s]", key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59

Jeromy's avatar
Jeromy committed
60 61
	// If we have it local, dont bother doing an RPC!
	// NOTE: this might not be what we want to do...
62
	val, err := dht.getLocal(key)
Jeromy's avatar
Jeromy committed
63
	if err == nil {
64
		log.Debug("Got value locally!")
Jeromy's avatar
Jeromy committed
65 66 67
		return val, nil
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68
	// get closest peers in the routing tables
69 70
	routeLevel := 0
	closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize)
71
	if closest == nil || len(closest) == 0 {
72
		log.Warning("Got no peers back from routing table!")
Jeromy's avatar
Jeromy committed
73
		return nil, kb.ErrLookupFailure
74 75
	}

76
	// setup the Query
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
	query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
78

79 80 81 82
		val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel)
		if err != nil {
			return nil, err
		}
83

84 85 86 87 88 89 90
		res := &dhtQueryResult{value: val, closerPeers: peers}
		if val != nil {
			res.success = true
		}

		return res, nil
	})
91

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
92
	// run it!
93
	result, err := query.Run(ctx, closest)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
94 95
	if err != nil {
		return nil, err
96 97
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98
	log.Debugf("GetValue %v %v", key, result.value)
99
	if result.value == nil {
100
		return nil, routing.ErrNotFound
101
	}
102 103

	return result.value, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105 106 107 108
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

109
// Provide makes this node announce that it can provide a value for the given key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110
func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
111

112
	dht.providers.AddProvider(key, dht.self)
113
	peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize)
114
	if len(peers) == 0 {
115
		return nil
116 117
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118 119
	//TODO FIX: this doesn't work! it needs to be sent to the actual nearest peers.
	// `peers` are the closest peers we have, not the ones that should get the value.
120
	for _, p := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121 122 123 124
		err := dht.putProvider(ctx, p, string(key))
		if err != nil {
			return err
		}
125 126
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127 128
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129 130 131
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
132
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer {
133
	log.Event(ctx, "findProviders", &key)
134
	peerOut := make(chan peer.Peer, count)
135
	go func() {
136 137
		defer close(peerOut)

138
		ps := newPeerSet()
139
		// TODO may want to make this function async to hide latency
140
		provs := dht.providers.GetProviders(ctx, key)
141 142
		for _, p := range provs {
			count--
143
			// NOTE: assuming that this list of peers is unique
144
			ps.Add(p)
145 146 147 148 149
			select {
			case peerOut <- p:
			case <-ctx.Done():
				return
			}
150 151 152 153 154
			if count <= 0 {
				return
			}
		}

155
		var wg sync.WaitGroup
Brian Tiger Chow's avatar
Brian Tiger Chow committed
156 157
		peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
		for _, pp := range peers {
Jeromy's avatar
Jeromy committed
158
			wg.Add(1)
159
			go func(p peer.Peer) {
Jeromy's avatar
Jeromy committed
160
				defer wg.Done()
161
				pmes, err := dht.findProvidersSingle(ctx, p, key, 0)
162
				if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163
					log.Error(err)
164 165
					return
				}
166
				dht.addPeerListAsync(ctx, key, pmes.GetProviderPeers(), ps, count, peerOut)
Jeromy's avatar
Jeromy committed
167
			}(pp)
168
		}
Jeromy's avatar
Jeromy committed
169
		wg.Wait()
170 171 172 173
	}()
	return peerOut
}

174
func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
175
	var wg sync.WaitGroup
176
	for _, pbp := range peers {
177
		wg.Add(1)
178
		go func(mp *pb.Message_Peer) {
179
			defer wg.Done()
Jeromy's avatar
Jeromy committed
180
			// construct new peer
181
			p, err := dht.ensureConnectedToPeer(ctx, mp)
Jeromy's avatar
Jeromy committed
182
			if err != nil {
Jeromy's avatar
Jeromy committed
183
				log.Errorf("%s", err)
Jeromy's avatar
Jeromy committed
184 185 186 187 188 189
				return
			}
			if p == nil {
				log.Error("Got nil peer from ensureConnectedToPeer")
				return
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
190

Jeromy's avatar
Jeromy committed
191 192
			dht.providers.AddProvider(k, p)
			if ps.AddIfSmallerThan(p, count) {
193 194 195 196 197
				select {
				case out <- p:
				case <-ctx.Done():
					return
				}
Jeromy's avatar
Jeromy committed
198 199 200 201 202
			} else if ps.Size() >= count {
				return
			}
		}(pbp)
	}
203
	wg.Wait()
204 205
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
206
// FindPeer searches for a peer with given ID.
207
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208

209
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
210
	p, _ := dht.FindLocal(id)
211 212 213 214
	if p != nil {
		return p, nil
	}

215
	routeLevel := 0
Jeromy's avatar
Jeromy committed
216 217 218
	closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
	if closest == nil || len(closest) == 0 {
		return nil, kb.ErrLookupFailure
219
	}
220

Jeromy's avatar
Jeromy committed
221 222 223 224 225
	// Sanity...
	for _, p := range closest {
		if p.ID().Equal(id) {
			log.Error("Found target peer in list of closest peers...")
			return p, nil
226
		}
227
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
228

Jeromy's avatar
Jeromy committed
229
	// setup the Query
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
230
	query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
231

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
232 233
		pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
		if err != nil {
234
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
235
		}
236

Jeromy's avatar
Jeromy committed
237
		closer := pmes.GetCloserPeers()
238 239
		clpeers, errs := pb.PBPeersToPeers(dht.peerstore, closer)
		for _, err := range errs {
240
			if err != nil {
241
				log.Warning(err)
242
			}
243
		}
244

245 246 247
		// see it we got the peer here
		for _, np := range clpeers {
			if string(np.ID()) == string(id) {
Jeromy's avatar
Jeromy committed
248 249 250 251 252
				return &dhtQueryResult{
					peer:    np,
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
253 254
		}

Jeromy's avatar
Jeromy committed
255
		return &dhtQueryResult{closerPeers: clpeers}, nil
256
	})
257

Jeromy's avatar
Jeromy committed
258 259
	// run it!
	result, err := query.Run(ctx, closest)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
260 261 262 263
	if err != nil {
		return nil, err
	}

264
	log.Debugf("FindPeer %v %v", id, result.success)
265
	if result.peer == nil {
266
		return nil, routing.ErrNotFound
267
	}
Jeromy's avatar
Jeromy committed
268

269
	return result.peer, nil
270 271
}

272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
//
// The AlphaValue nearest peers from routing table 0 seed a query that runs in
// a background goroutine. Every peer reported with a Connected status is sent
// on the returned channel (buffer size 10) at most once; peers reported with
// any other status, or with no status at all, are handed back to the query as
// further peers to ask. The channel is closed when the query run returns; an
// error from the run is logged, not returned.
//
// kb.ErrLookupFailure is returned, with a nil channel, when the routing table
// yields no peers to start from.
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan peer.Peer, error) {

	peerchan := make(chan peer.Peer, 10)

	// peersSeen is shared by every invocation of the query callback below.
	// NOTE(review): the query presumably runs the callback on several
	// goroutines at once — confirm against query.Run. The mutex is harmless
	// if it does not.
	var seenMu sync.Mutex
	peersSeen := map[string]peer.Peer{}

	// alreadySeen reports whether a peer with the given raw ID was recorded.
	alreadySeen := func(rawID string) bool {
		seenMu.Lock()
		defer seenMu.Unlock()
		_, found := peersSeen[rawID]
		return found
	}

	// markSeen records np and reports whether it was newly added.
	markSeen := func(np peer.Peer) bool {
		seenMu.Lock()
		defer seenMu.Unlock()
		k := string(np.ID())
		if _, found := peersSeen[k]; found {
			return false
		}
		peersSeen[k] = np
		return true
	}

	routeLevel := 0
	closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
	if len(closest) == 0 {
		return nil, kb.ErrLookupFailure
	}

	// setup the Query
	query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {

		pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
		if err != nil {
			return nil, err
		}

		var clpeers []peer.Peer
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
			// skip peers already seen (cheap check before unmarshalling)
			if alreadySeen(string(pbp.GetId())) {
				continue
			}

			// skip peers that fail to unmarshal
			np, err := pb.PBPeerToPeer(dht.peerstore, pbp)
			if err != nil {
				log.Warning(err)
				continue
			}

			// Re-check and record in one step so that two concurrent
			// callbacks cannot both emit the same peer.
			if !markSeen(np) {
				continue
			}

			// The connection status comes from the remote message and may be
			// absent; a missing status is treated as "not connected" instead
			// of dereferencing a nil pointer.
			connected := pbp.Connection != nil &&
				pb.Connectedness(*pbp.Connection) == inet.Connected

			if connected {
				// if peer is connected, send it to our client.
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
				case peerchan <- np:
				}
				continue
			}

			// Peers not reported as connected are returned to the query as
			// further peers to ask.
			clpeers = append(clpeers, np)
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
		if _, err := query.Run(ctx, closest); err != nil {
			log.Error(err)
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}

341
// Ping a peer, log the time it took
342
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error {
343
	// Thoughts: maybe this should accept an ID and do a peer lookup?
Brian Tiger Chow's avatar
Brian Tiger Chow committed
344
	log.Debugf("ping %s start", p)
345

346
	pmes := pb.NewMessage(pb.Message_PING, "", 0)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
347
	_, err := dht.sendRequest(ctx, p, pmes)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
348
	log.Debugf("ping %s end (err = %s)", p, err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
349
	return err
350
}