routing.go 9.15 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
5
	"encoding/json"
6
	"errors"
7
	"math/rand"
8
	"sync"
9
	"time"
10

11 12
	proto "code.google.com/p/goprotobuf/proto"

13 14
	ma "github.com/jbenet/go-multiaddr"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	peer "github.com/jbenet/go-ipfs/peer"
16
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20
)

21 22 23
// Pool size is the number of nodes used for group find/set RPC calls
var PoolSize = 6

Jeromy's avatar
Jeromy committed
24 25 26 27 28 29
// We put the 'K' in kademlia!
var KValue = 10

// Its in the paper, i swear
var AlphaValue = 3

30
// GenerateMessageID creates and returns a new message ID
31 32
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
33 34
	//return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32())
	return uint64(rand.Uint32())
35 36
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37 38 39 40 41
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
42
// This is the top level "Store" operation of the DHT
43
func (dht *IpfsDHT) PutValue(key u.Key, value []byte) {
Jeromy's avatar
Jeromy committed
44
	complete := make(chan struct{})
Jeromy's avatar
Jeromy committed
45
	count := 0
46
	for _, route := range dht.routingTables {
Jeromy's avatar
Jeromy committed
47 48 49
		peers := route.NearestPeers(kb.ConvertKey(key), KValue)
		for _, p := range peers {
			if p == nil {
50
				dht.network.Error(kb.ErrLookupFailure)
Jeromy's avatar
Jeromy committed
51 52 53 54
				continue
			}
			count++
			go func(sp *peer.Peer) {
55
				err := dht.putValueToNetwork(sp, string(key), value)
Jeromy's avatar
Jeromy committed
56
				if err != nil {
57
					dht.network.Error(err)
Jeromy's avatar
Jeromy committed
58
				}
Jeromy's avatar
Jeromy committed
59
				complete <- struct{}{}
Jeromy's avatar
Jeromy committed
60
			}(p)
Jeromy's avatar
Jeromy committed
61 62
		}
	}
Jeromy's avatar
Jeromy committed
63
	for i := 0; i < count; i++ {
Jeromy's avatar
Jeromy committed
64
		<-complete
65
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66 67
}

68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
// A counter for incrementing a variable across multiple threads
type counter struct {
	n   int
	mut sync.RWMutex
}

func (c *counter) Increment() {
	c.mut.Lock()
	c.n++
	c.mut.Unlock()
}

func (c *counter) Decrement() {
	c.mut.Lock()
	c.n--
	c.mut.Unlock()
}

func (c *counter) Size() int {
	c.mut.RLock()
	defer c.mut.RUnlock()
	return c.n
}

92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
type peerSet struct {
	ps map[string]bool
	lk sync.RWMutex
}

func newPeerSet() *peerSet {
	ps := new(peerSet)
	ps.ps = make(map[string]bool)
	return ps
}

func (ps *peerSet) Add(p *peer.Peer) {
	ps.lk.Lock()
	ps.ps[string(p.ID)] = true
	ps.lk.Unlock()
}

func (ps *peerSet) Contains(p *peer.Peer) bool {
	ps.lk.RLock()
	_, ok := ps.ps[string(p.ID)]
	ps.lk.RUnlock()
	return ok
}

func (ps *peerSet) Size() int {
	ps.lk.RLock()
	defer ps.lk.RUnlock()
	return len(ps.ps)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
122
// GetValue searches for the value corresponding to given Key.
Jeromy's avatar
Jeromy committed
123 124
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
125 126
func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
	ll := startNewRPC("GET")
Jeromy's avatar
Jeromy committed
127 128 129 130
	defer func() {
		ll.EndLog()
		ll.Print()
	}()
131

Jeromy's avatar
Jeromy committed
132 133
	// If we have it local, dont bother doing an RPC!
	// NOTE: this might not be what we want to do...
134
	val, err := dht.getLocal(key)
Jeromy's avatar
Jeromy committed
135 136 137
	if err == nil {
		ll.Success = true
		u.DOut("Found local, returning.")
Jeromy's avatar
Jeromy committed
138 139 140
		return val, nil
	}

141 142
	routeLevel := 0
	closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize)
143
	if closest == nil || len(closest) == 0 {
144 145 146
		return nil, kb.ErrLookupFailure
	}

147 148 149 150
	valChan := make(chan []byte)
	npeerChan := make(chan *peer.Peer, 30)
	procPeer := make(chan *peer.Peer, 30)
	errChan := make(chan error)
151
	after := time.After(timeout)
152
	pset := newPeerSet()
153

154
	for _, p := range closest {
155
		pset.Add(p)
156
		npeerChan <- p
157
	}
158

159 160 161 162 163 164
	c := counter{}

	count := 0
	go func() {
		for {
			select {
165
			case p := <-npeerChan:
166
				count++
Jeromy's avatar
Jeromy committed
167
				if count >= KValue {
168
					break
Jeromy's avatar
Jeromy committed
169
				}
170
				c.Increment()
171

172
				procPeer <- p
173 174
			default:
				if c.Size() == 0 {
175
					errChan <- u.ErrNotFound
176
				}
177 178 179
			}
		}
	}()
180

181 182 183
	process := func() {
		for {
			select {
184
			case p, ok := <-procPeer:
185 186 187 188
				if !ok || p == nil {
					c.Decrement()
					return
				}
189
				val, peers, err := dht.getValueOrPeers(p, key, timeout/4, routeLevel)
190
				if err != nil {
191 192
					u.DErr(err.Error())
					c.Decrement()
Jeromy's avatar
Jeromy committed
193
					continue
194
				}
195
				if val != nil {
196
					valChan <- val
197 198 199 200 201 202
					c.Decrement()
					return
				}

				for _, np := range peers {
					// TODO: filter out peers that arent closer
Jeromy's avatar
Jeromy committed
203
					if !pset.Contains(np) && pset.Size() < KValue {
204
						pset.Add(np) //This is racey... make a single function to do operation
205
						npeerChan <- np
206
					}
207 208
				}
				c.Decrement()
209
			}
210
		}
211
	}
212

Jeromy's avatar
Jeromy committed
213
	for i := 0; i < AlphaValue; i++ {
214 215 216 217
		go process()
	}

	select {
218
	case val := <-valChan:
219
		return val, nil
220
	case err := <-errChan:
221 222 223 224
		return nil, err
	case <-after:
		return nil, u.ErrTimeout
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225 226 227 228 229
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

230 231 232
// Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(key u.Key) error {
	peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize)
233
	if len(peers) == 0 {
234
		return kb.ErrLookupFailure
235 236
	}

237
	pmes := Message{
238 239
		Type: PBDHTMessage_ADD_PROVIDER,
		Key:  string(key),
240 241 242
	}
	pbmes := pmes.ToProtobuf()

243
	for _, p := range peers {
244
		mes := swarm.NewMessage(p, pbmes)
245
		dht.network.Send(mes)
246 247
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
248 249 250
}

// FindProviders searches for peers who can provide the value for given key.
251 252
func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
	ll := startNewRPC("FindProviders")
Jeromy's avatar
Jeromy committed
253 254 255 256 257
	defer func() {
		ll.EndLog()
		ll.Print()
	}()
	u.DOut("Find providers for: '%s'", key)
258
	p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key))
259 260 261
	if p == nil {
		return nil, kb.ErrLookupFailure
	}
262

263 264
	for level := 0; level < len(dht.routingTables); {
		pmes, err := dht.findProvidersSingle(p, key, level, timeout)
265 266 267
		if err != nil {
			return nil, err
		}
Jeromy's avatar
Jeromy committed
268
		if pmes.GetSuccess() {
269
			provs := dht.addPeerList(key, pmes.GetPeers())
Jeromy's avatar
Jeromy committed
270 271
			ll.Success = true
			return provs, nil
272
		}
273

274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
		closer := pmes.GetPeers()
		if len(closer) == 0 {
			level++
			continue
		}
		if peer.ID(closer[0].GetId()).Equal(dht.self.ID) {
			u.DOut("Got myself back as a closer peer.")
			return nil, u.ErrNotFound
		}
		maddr, err := ma.NewMultiaddr(closer[0].GetAddr())
		if err != nil {
			// ??? Move up route level???
			panic("not yet implemented")
		}

		np, err := dht.network.GetConnection(peer.ID(closer[0].GetId()), maddr)
		if err != nil {
			u.PErr("[%s] Failed to connect to: %s", dht.self.ID.Pretty(), closer[0].GetAddr())
			level++
			continue
Jeromy's avatar
Jeromy committed
294
		}
295
		p = np
296
	}
Jeromy's avatar
Jeromy committed
297
	return nil, u.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
298 299 300 301 302
}

// Find specific Peer

// FindPeer searches for a peer with given ID.
303
func (dht *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
304
	// Check if were already connected to them
305
	p, _ := dht.Find(id)
306 307 308 309
	if p != nil {
		return p, nil
	}

310 311
	routeLevel := 0
	p = dht.routingTables[routeLevel].NearestPeer(kb.ConvertPeerID(id))
312 313
	if p == nil {
		return nil, kb.ErrLookupFailure
314
	}
315 316 317
	if p.ID.Equal(id) {
		return p, nil
	}
318

319 320
	for routeLevel < len(dht.routingTables) {
		pmes, err := dht.findPeerSingle(p, id, timeout, routeLevel)
321 322
		plist := pmes.GetPeers()
		if len(plist) == 0 {
323
			routeLevel++
324
		}
325 326 327
		found := plist[0]

		addr, err := ma.NewMultiaddr(found.GetAddr())
Jeromy's avatar
Jeromy committed
328
		if err != nil {
329
			return nil, err
Jeromy's avatar
Jeromy committed
330 331
		}

332
		nxtPeer, err := dht.network.GetConnection(peer.ID(found.GetId()), addr)
Jeromy's avatar
Jeromy committed
333
		if err != nil {
334
			return nil, err
Jeromy's avatar
Jeromy committed
335
		}
336
		if pmes.GetSuccess() {
337 338 339
			if !id.Equal(nxtPeer.ID) {
				return nil, errors.New("got back invalid peer from 'successful' response")
			}
340
			return nxtPeer, nil
Jeromy's avatar
Jeromy committed
341
		}
342
		p = nxtPeer
343
	}
344
	return nil, u.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
345
}
346 347 348 349 350 351

// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
	// Thoughts: maybe this should accept an ID and do a peer lookup?
	u.DOut("Enter Ping.")

352
	pmes := Message{ID: GenerateMessageID(), Type: PBDHTMessage_PING}
353 354 355
	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	before := time.Now()
356
	responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
357
	dht.network.Send(mes)
358 359 360

	tout := time.After(timeout)
	select {
361
	case <-responseChan:
362
		roundtrip := time.Since(before)
363
		p.SetLatency(roundtrip)
364
		u.DOut("Ping took %s.", roundtrip.String())
365 366 367 368
		return nil
	case <-tout:
		// Timed out, think about removing peer from network
		u.DOut("Ping peer timed out.")
369
		dht.listener.Unlisten(pmes.ID)
370 371 372
		return u.ErrTimeout
	}
}
373

374
func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
375 376
	u.DOut("Begin Diagnostic")
	//Send to N closest peers
377
	targets := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
378 379

	// TODO: Add timeout to this struct so nodes know when to return
380
	pmes := Message{
381
		Type: PBDHTMessage_DIAGNOSTIC,
382
		ID:   GenerateMessageID(),
383 384
	}

385
	listenChan := dht.listener.Listen(pmes.ID, len(targets), time.Minute*2)
386 387

	pbmes := pmes.ToProtobuf()
388
	for _, p := range targets {
389
		mes := swarm.NewMessage(p, pbmes)
390
		dht.network.Send(mes)
391 392 393 394 395 396 397 398 399
	}

	var out []*diagInfo
	after := time.After(timeout)
	for count := len(targets); count > 0; {
		select {
		case <-after:
			u.DOut("Diagnostic request timed out.")
			return out, u.ErrTimeout
Jeromy's avatar
Jeromy committed
400
		case resp := <-listenChan:
401 402
			pmesOut := new(PBDHTMessage)
			err := proto.Unmarshal(resp.Data, pmesOut)
403 404 405 406 407 408
			if err != nil {
				// NOTE: here and elsewhere, need to audit error handling,
				//		some errors should be continued on from
				return out, err
			}

409
			dec := json.NewDecoder(bytes.NewBuffer(pmesOut.GetValue()))
410 411 412 413 414 415 416 417 418 419 420 421
			for {
				di := new(diagInfo)
				err := dec.Decode(di)
				if err != nil {
					break
				}

				out = append(out, di)
			}
		}
	}

422
	return nil, nil
423
}