routing.go 9.6 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
5
	"encoding/json"
6
	"errors"
7
	"time"
8

9 10
	proto "code.google.com/p/goprotobuf/proto"

11 12
	ma "github.com/jbenet/go-multiaddr"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	peer "github.com/jbenet/go-ipfs/peer"
14
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18 19 20 21 22 23
)

// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
24
// This is the top level "Store" operation of the DHT
25
func (dht *IpfsDHT) PutValue(key u.Key, value []byte) error {
Jeromy's avatar
Jeromy committed
26
	complete := make(chan struct{})
Jeromy's avatar
Jeromy committed
27
	count := 0
28
	for _, route := range dht.routingTables {
Jeromy's avatar
Jeromy committed
29 30 31
		peers := route.NearestPeers(kb.ConvertKey(key), KValue)
		for _, p := range peers {
			if p == nil {
32
				dht.network.Error(kb.ErrLookupFailure)
Jeromy's avatar
Jeromy committed
33 34 35 36
				continue
			}
			count++
			go func(sp *peer.Peer) {
37
				err := dht.putValueToNetwork(sp, string(key), value)
Jeromy's avatar
Jeromy committed
38
				if err != nil {
39
					dht.network.Error(err)
Jeromy's avatar
Jeromy committed
40
				}
Jeromy's avatar
Jeromy committed
41
				complete <- struct{}{}
Jeromy's avatar
Jeromy committed
42
			}(p)
Jeromy's avatar
Jeromy committed
43 44
		}
	}
Jeromy's avatar
Jeromy committed
45
	for i := 0; i < count; i++ {
Jeromy's avatar
Jeromy committed
46
		<-complete
47
	}
48
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49 50 51
}

// GetValue searches for the value corresponding to given Key.
Jeromy's avatar
Jeromy committed
52 53
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
54 55
func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
	ll := startNewRPC("GET")
Jeromy's avatar
Jeromy committed
56 57 58 59
	defer func() {
		ll.EndLog()
		ll.Print()
	}()
60

Jeromy's avatar
Jeromy committed
61 62
	// If we have it local, dont bother doing an RPC!
	// NOTE: this might not be what we want to do...
63
	val, err := dht.getLocal(key)
Jeromy's avatar
Jeromy committed
64 65
	if err == nil {
		ll.Success = true
Jeromy's avatar
Jeromy committed
66
		u.DOut("Found local, returning.\n")
Jeromy's avatar
Jeromy committed
67 68 69
		return val, nil
	}

70 71
	routeLevel := 0
	closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize)
72
	if closest == nil || len(closest) == 0 {
73 74 75
		return nil, kb.ErrLookupFailure
	}

76 77 78 79
	valChan := make(chan []byte)
	npeerChan := make(chan *peer.Peer, 30)
	procPeer := make(chan *peer.Peer, 30)
	errChan := make(chan error)
80
	after := time.After(timeout)
81
	pset := newPeerSet()
82

83
	for _, p := range closest {
84
		pset.Add(p)
85
		npeerChan <- p
86
	}
87

88 89 90 91
	c := counter{}

	count := 0
	go func() {
92
		defer close(procPeer)
93 94
		for {
			select {
95 96 97 98
			case p, ok := <-npeerChan:
				if !ok {
					return
				}
99
				count++
Jeromy's avatar
Jeromy committed
100
				if count >= KValue {
101 102
					errChan <- u.ErrNotFound
					return
Jeromy's avatar
Jeromy committed
103
				}
104
				c.Increment()
105

106
				procPeer <- p
107
			default:
108 109 110 111 112
				if c.Size() <= 0 {
					select {
					case errChan <- u.ErrNotFound:
					default:
					}
113
					return
114
				}
115 116 117
			}
		}
	}()
118

119
	process := func() {
120
		defer c.Decrement()
121 122 123 124 125 126 127 128 129 130
		for p := range procPeer {
			if p == nil {
				return
			}
			val, peers, err := dht.getValueOrPeers(p, key, timeout/4, routeLevel)
			if err != nil {
				u.DErr("%v\n", err.Error())
				continue
			}
			if val != nil {
131 132 133 134 135
				select {
				case valChan <- val:
				default:
					u.DOut("Wasnt the first to return the value!")
				}
136 137
				return
			}
138

139 140 141 142 143
			for _, np := range peers {
				// TODO: filter out peers that arent closer
				if !pset.Contains(np) && pset.Size() < KValue {
					pset.Add(np) //This is racey... make a single function to do operation
					npeerChan <- np
144
				}
145
			}
146
			c.Decrement()
147
		}
148
	}
149

Jeromy's avatar
Jeromy committed
150
	for i := 0; i < AlphaValue; i++ {
151 152 153 154
		go process()
	}

	select {
155
	case val := <-valChan:
156
		return val, nil
157
	case err := <-errChan:
158 159 160 161
		return nil, err
	case <-after:
		return nil, u.ErrTimeout
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162 163 164 165 166
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

167 168 169
// Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(key u.Key) error {
	peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize)
170
	if len(peers) == 0 {
171
		return kb.ErrLookupFailure
172 173
	}

174
	pmes := Message{
175 176
		Type: PBDHTMessage_ADD_PROVIDER,
		Key:  string(key),
177 178 179
	}
	pbmes := pmes.ToProtobuf()

180
	for _, p := range peers {
181
		mes := swarm.NewMessage(p, pbmes)
182
		dht.netChan.Outgoing <- mes
183 184
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185 186
}

187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
func (dht *IpfsDHT) FindProvidersAsync(key u.Key, count int, timeout time.Duration) chan *peer.Peer {
	peerOut := make(chan *peer.Peer, count)
	go func() {
		ps := newPeerSet()
		provs := dht.providers.GetProviders(key)
		for _, p := range provs {
			count--
			// NOTE: assuming that the list of peers is unique
			ps.Add(p)
			peerOut <- p
			if count <= 0 {
				return
			}
		}

		peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
		for _, pp := range peers {
			go func() {
				pmes, err := dht.findProvidersSingle(pp, key, 0, timeout)
				if err != nil {
					u.PErr("%v\n", err)
					return
				}
				dht.addPeerListAsync(key, pmes.GetPeers(), ps, count, peerOut)
			}()
		}

	}()
	return peerOut
}

//TODO: this function could also be done asynchronously
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*PBDHTMessage_PBPeer, ps *peerSet, count int, out chan *peer.Peer) {
	for _, pbp := range peers {
Jeromy's avatar
Jeromy committed
221 222 223
		if peer.ID(pbp.GetId()).Equal(dht.self.ID) {
			continue
		}
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
		maddr, err := ma.NewMultiaddr(pbp.GetAddr())
		if err != nil {
			u.PErr("%v\n", err)
			continue
		}
		p, err := dht.network.GetConnection(peer.ID(pbp.GetId()), maddr)
		if err != nil {
			u.PErr("%v\n", err)
			continue
		}
		dht.providers.AddProvider(k, p)
		if ps.AddIfSmallerThan(p, count) {
			out <- p
		} else if ps.Size() >= count {
			return
		}
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
243
// FindProviders searches for peers who can provide the value for given key.
244 245
func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
	ll := startNewRPC("FindProviders")
Jeromy's avatar
Jeromy committed
246 247 248 249
	defer func() {
		ll.EndLog()
		ll.Print()
	}()
250
	u.DOut("Find providers for: '%s'\n", key)
251
	p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key))
252 253 254
	if p == nil {
		return nil, kb.ErrLookupFailure
	}
255

256 257
	for level := 0; level < len(dht.routingTables); {
		pmes, err := dht.findProvidersSingle(p, key, level, timeout)
258 259 260
		if err != nil {
			return nil, err
		}
Jeromy's avatar
Jeromy committed
261
		if pmes.GetSuccess() {
Jeromy's avatar
Jeromy committed
262
			u.DOut("Got providers back from findProviders call!\n")
263
			provs := dht.addPeerList(key, pmes.GetPeers())
Jeromy's avatar
Jeromy committed
264 265
			ll.Success = true
			return provs, nil
266
		}
267

Jeromy's avatar
Jeromy committed
268 269
		u.DOut("Didnt get providers, just closer peers.\n")

270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
		closer := pmes.GetPeers()
		if len(closer) == 0 {
			level++
			continue
		}
		if peer.ID(closer[0].GetId()).Equal(dht.self.ID) {
			u.DOut("Got myself back as a closer peer.")
			return nil, u.ErrNotFound
		}
		maddr, err := ma.NewMultiaddr(closer[0].GetAddr())
		if err != nil {
			// ??? Move up route level???
			panic("not yet implemented")
		}

		np, err := dht.network.GetConnection(peer.ID(closer[0].GetId()), maddr)
		if err != nil {
287
			u.PErr("[%s] Failed to connect to: %s\n", dht.self.ID.Pretty(), closer[0].GetAddr())
288 289
			level++
			continue
Jeromy's avatar
Jeromy committed
290
		}
291
		p = np
292
	}
Jeromy's avatar
Jeromy committed
293
	return nil, u.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
294 295 296 297 298
}

// Find specific Peer

// FindPeer searches for a peer with given ID.
299
func (dht *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
300
	// Check if were already connected to them
301
	p, _ := dht.Find(id)
302 303 304 305
	if p != nil {
		return p, nil
	}

306 307
	routeLevel := 0
	p = dht.routingTables[routeLevel].NearestPeer(kb.ConvertPeerID(id))
308 309
	if p == nil {
		return nil, kb.ErrLookupFailure
310
	}
311 312 313
	if p.ID.Equal(id) {
		return p, nil
	}
314

315 316
	for routeLevel < len(dht.routingTables) {
		pmes, err := dht.findPeerSingle(p, id, timeout, routeLevel)
317 318
		plist := pmes.GetPeers()
		if len(plist) == 0 {
319
			routeLevel++
320
		}
321 322 323
		found := plist[0]

		addr, err := ma.NewMultiaddr(found.GetAddr())
Jeromy's avatar
Jeromy committed
324
		if err != nil {
325
			return nil, err
Jeromy's avatar
Jeromy committed
326 327
		}

328
		nxtPeer, err := dht.network.GetConnection(peer.ID(found.GetId()), addr)
Jeromy's avatar
Jeromy committed
329
		if err != nil {
330
			return nil, err
Jeromy's avatar
Jeromy committed
331
		}
332
		if pmes.GetSuccess() {
333 334 335
			if !id.Equal(nxtPeer.ID) {
				return nil, errors.New("got back invalid peer from 'successful' response")
			}
336
			return nxtPeer, nil
Jeromy's avatar
Jeromy committed
337
		}
338
		p = nxtPeer
339
	}
340
	return nil, u.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
341
}
342 343 344 345

// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
	// Thoughts: maybe this should accept an ID and do a peer lookup?
Jeromy's avatar
Jeromy committed
346
	u.DOut("Enter Ping.\n")
347

348
	pmes := Message{ID: swarm.GenerateMessageID(), Type: PBDHTMessage_PING}
349 350 351
	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	before := time.Now()
352
	responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
353
	dht.netChan.Outgoing <- mes
354 355 356

	tout := time.After(timeout)
	select {
357
	case <-responseChan:
358
		roundtrip := time.Since(before)
359
		p.SetLatency(roundtrip)
360
		u.DOut("Ping took %s.\n", roundtrip.String())
361 362 363
		return nil
	case <-tout:
		// Timed out, think about removing peer from network
364
		u.DOut("[%s] Ping peer [%s] timed out.", dht.self.ID.Pretty(), p.ID.Pretty())
365
		dht.listener.Unlisten(pmes.ID)
366 367 368
		return u.ErrTimeout
	}
}
369

370
func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
371 372
	u.DOut("Begin Diagnostic")
	//Send to N closest peers
373
	targets := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
374 375

	// TODO: Add timeout to this struct so nodes know when to return
376
	pmes := Message{
377
		Type: PBDHTMessage_DIAGNOSTIC,
378
		ID:   swarm.GenerateMessageID(),
379 380
	}

381
	listenChan := dht.listener.Listen(pmes.ID, len(targets), time.Minute*2)
382 383

	pbmes := pmes.ToProtobuf()
384
	for _, p := range targets {
385
		mes := swarm.NewMessage(p, pbmes)
386
		dht.netChan.Outgoing <- mes
387 388 389 390 391 392 393 394 395
	}

	var out []*diagInfo
	after := time.After(timeout)
	for count := len(targets); count > 0; {
		select {
		case <-after:
			u.DOut("Diagnostic request timed out.")
			return out, u.ErrTimeout
Jeromy's avatar
Jeromy committed
396
		case resp := <-listenChan:
397 398
			pmesOut := new(PBDHTMessage)
			err := proto.Unmarshal(resp.Data, pmesOut)
399 400 401 402 403 404
			if err != nil {
				// NOTE: here and elsewhere, need to audit error handling,
				//		some errors should be continued on from
				return out, err
			}

405
			dec := json.NewDecoder(bytes.NewBuffer(pmesOut.GetValue()))
406 407 408 409 410 411 412 413 414 415 416 417
			for {
				di := new(diagInfo)
				err := dec.Decode(di)
				if err != nil {
					break
				}

				out = append(out, di)
			}
		}
	}

418
	return nil, nil
419
}