routing.go 10.8 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
5
	"encoding/json"
6
	"errors"
7
	"time"
8

9
	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
10

11
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
12

13
	swarm "github.com/jbenet/go-ipfs/net/swarm"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	peer "github.com/jbenet/go-ipfs/peer"
15
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18 19 20 21 22 23
)

// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
24
// This is the top level "Store" operation of the DHT
25
func (dht *IpfsDHT) PutValue(key u.Key, value []byte) error {
Jeromy's avatar
Jeromy committed
26
	complete := make(chan struct{})
Jeromy's avatar
Jeromy committed
27
	count := 0
28
	for _, route := range dht.routingTables {
Jeromy's avatar
Jeromy committed
29 30 31
		peers := route.NearestPeers(kb.ConvertKey(key), KValue)
		for _, p := range peers {
			if p == nil {
32
				dht.network.Error(kb.ErrLookupFailure)
Jeromy's avatar
Jeromy committed
33 34 35 36
				continue
			}
			count++
			go func(sp *peer.Peer) {
37
				err := dht.putValueToNetwork(sp, string(key), value)
Jeromy's avatar
Jeromy committed
38
				if err != nil {
39
					dht.network.Error(err)
Jeromy's avatar
Jeromy committed
40
				}
Jeromy's avatar
Jeromy committed
41
				complete <- struct{}{}
Jeromy's avatar
Jeromy committed
42
			}(p)
Jeromy's avatar
Jeromy committed
43 44
		}
	}
Jeromy's avatar
Jeromy committed
45
	for i := 0; i < count; i++ {
Jeromy's avatar
Jeromy committed
46
		<-complete
47
	}
48
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49 50 51
}

// GetValue searches for the value corresponding to given Key.
Jeromy's avatar
Jeromy committed
52 53
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
54 55
func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
	ll := startNewRPC("GET")
Jeromy's avatar
Jeromy committed
56 57 58 59
	defer func() {
		ll.EndLog()
		ll.Print()
	}()
60

Jeromy's avatar
Jeromy committed
61 62
	// If we have it local, dont bother doing an RPC!
	// NOTE: this might not be what we want to do...
63
	val, err := dht.getLocal(key)
Jeromy's avatar
Jeromy committed
64 65
	if err == nil {
		ll.Success = true
Jeromy's avatar
Jeromy committed
66
		u.DOut("Found local, returning.\n")
Jeromy's avatar
Jeromy committed
67 68 69
		return val, nil
	}

70 71
	routeLevel := 0
	closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize)
72
	if closest == nil || len(closest) == 0 {
73 74 75
		return nil, kb.ErrLookupFailure
	}

76 77 78 79
	valChan := make(chan []byte)
	npeerChan := make(chan *peer.Peer, 30)
	procPeer := make(chan *peer.Peer, 30)
	errChan := make(chan error)
80
	after := time.After(timeout)
81
	pset := newPeerSet()
82

83
	for _, p := range closest {
84
		pset.Add(p)
85
		npeerChan <- p
86
	}
87

88 89 90 91
	c := counter{}

	count := 0
	go func() {
92
		defer close(procPeer)
93 94
		for {
			select {
95 96 97 98
			case p, ok := <-npeerChan:
				if !ok {
					return
				}
99
				count++
Jeromy's avatar
Jeromy committed
100
				if count >= KValue {
101 102
					errChan <- u.ErrNotFound
					return
Jeromy's avatar
Jeromy committed
103
				}
104
				c.Increment()
105

106
				procPeer <- p
107
			default:
108 109 110 111 112
				if c.Size() <= 0 {
					select {
					case errChan <- u.ErrNotFound:
					default:
					}
113
					return
114
				}
115 116 117
			}
		}
	}()
118

119
	process := func() {
120
		defer c.Decrement()
121 122 123 124 125 126 127 128 129 130
		for p := range procPeer {
			if p == nil {
				return
			}
			val, peers, err := dht.getValueOrPeers(p, key, timeout/4, routeLevel)
			if err != nil {
				u.DErr("%v\n", err.Error())
				continue
			}
			if val != nil {
131 132 133 134 135
				select {
				case valChan <- val:
				default:
					u.DOut("Wasnt the first to return the value!")
				}
136 137
				return
			}
138

139 140 141 142 143
			for _, np := range peers {
				// TODO: filter out peers that arent closer
				if !pset.Contains(np) && pset.Size() < KValue {
					pset.Add(np) //This is racey... make a single function to do operation
					npeerChan <- np
144
				}
145
			}
146
			c.Decrement()
147
		}
148
	}
149

Jeromy's avatar
Jeromy committed
150
	for i := 0; i < AlphaValue; i++ {
151 152 153 154
		go process()
	}

	select {
155
	case val := <-valChan:
156
		return val, nil
157
	case err := <-errChan:
158 159 160 161
		return nil, err
	case <-after:
		return nil, u.ErrTimeout
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162 163 164 165 166
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

167 168
// Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(key u.Key) error {
169
	dht.providers.AddProvider(key, dht.self)
170
	peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize)
171
	if len(peers) == 0 {
172
		return kb.ErrLookupFailure
173 174
	}

175
	pmes := Message{
176 177
		Type: PBDHTMessage_ADD_PROVIDER,
		Key:  string(key),
178 179 180
	}
	pbmes := pmes.ToProtobuf()

181
	for _, p := range peers {
182
		mes := swarm.NewMessage(p, pbmes)
183
		dht.netChan.Outgoing <- mes
184 185
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186 187
}

188 189 190 191 192 193 194
func (dht *IpfsDHT) FindProvidersAsync(key u.Key, count int, timeout time.Duration) chan *peer.Peer {
	peerOut := make(chan *peer.Peer, count)
	go func() {
		ps := newPeerSet()
		provs := dht.providers.GetProviders(key)
		for _, p := range provs {
			count--
195
			// NOTE: assuming that this list of peers is unique
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
			ps.Add(p)
			peerOut <- p
			if count <= 0 {
				return
			}
		}

		peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
		for _, pp := range peers {
			go func() {
				pmes, err := dht.findProvidersSingle(pp, key, 0, timeout)
				if err != nil {
					u.PErr("%v\n", err)
					return
				}
				dht.addPeerListAsync(key, pmes.GetPeers(), ps, count, peerOut)
			}()
		}

	}()
	return peerOut
}

//TODO: this function could also be done asynchronously
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*PBDHTMessage_PBPeer, ps *peerSet, count int, out chan *peer.Peer) {
	for _, pbp := range peers {
Jeromy's avatar
Jeromy committed
222 223 224
		if peer.ID(pbp.GetId()).Equal(dht.self.ID) {
			continue
		}
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
		maddr, err := ma.NewMultiaddr(pbp.GetAddr())
		if err != nil {
			u.PErr("%v\n", err)
			continue
		}
		p, err := dht.network.GetConnection(peer.ID(pbp.GetId()), maddr)
		if err != nil {
			u.PErr("%v\n", err)
			continue
		}
		dht.providers.AddProvider(k, p)
		if ps.AddIfSmallerThan(p, count) {
			out <- p
		} else if ps.Size() >= count {
			return
		}
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
244
// FindProviders searches for peers who can provide the value for given key.
245 246
func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
	ll := startNewRPC("FindProviders")
Jeromy's avatar
Jeromy committed
247 248 249 250
	defer func() {
		ll.EndLog()
		ll.Print()
	}()
251
	u.DOut("Find providers for: '%s'\n", key)
252
	p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key))
253 254 255
	if p == nil {
		return nil, kb.ErrLookupFailure
	}
256

257 258
	for level := 0; level < len(dht.routingTables); {
		pmes, err := dht.findProvidersSingle(p, key, level, timeout)
259 260 261
		if err != nil {
			return nil, err
		}
Jeromy's avatar
Jeromy committed
262
		if pmes.GetSuccess() {
Jeromy's avatar
Jeromy committed
263
			u.DOut("Got providers back from findProviders call!\n")
264
			provs := dht.addPeerList(key, pmes.GetPeers())
Jeromy's avatar
Jeromy committed
265 266
			ll.Success = true
			return provs, nil
267
		}
268

Jeromy's avatar
Jeromy committed
269 270
		u.DOut("Didnt get providers, just closer peers.\n")

271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
		closer := pmes.GetPeers()
		if len(closer) == 0 {
			level++
			continue
		}
		if peer.ID(closer[0].GetId()).Equal(dht.self.ID) {
			u.DOut("Got myself back as a closer peer.")
			return nil, u.ErrNotFound
		}
		maddr, err := ma.NewMultiaddr(closer[0].GetAddr())
		if err != nil {
			// ??? Move up route level???
			panic("not yet implemented")
		}

		np, err := dht.network.GetConnection(peer.ID(closer[0].GetId()), maddr)
		if err != nil {
288
			u.PErr("[%s] Failed to connect to: %s\n", dht.self.ID.Pretty(), closer[0].GetAddr())
289 290
			level++
			continue
Jeromy's avatar
Jeromy committed
291
		}
292
		p = np
293
	}
Jeromy's avatar
Jeromy committed
294
	return nil, u.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
295 296 297 298 299
}

// Find specific Peer

// FindPeer searches for a peer with given ID.
300
func (dht *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
301
	// Check if were already connected to them
302
	p, _ := dht.Find(id)
303 304 305 306
	if p != nil {
		return p, nil
	}

307 308
	routeLevel := 0
	p = dht.routingTables[routeLevel].NearestPeer(kb.ConvertPeerID(id))
309 310
	if p == nil {
		return nil, kb.ErrLookupFailure
311
	}
312 313 314
	if p.ID.Equal(id) {
		return p, nil
	}
315

316 317
	for routeLevel < len(dht.routingTables) {
		pmes, err := dht.findPeerSingle(p, id, timeout, routeLevel)
318
		plist := pmes.GetPeers()
319
		if plist == nil || len(plist) == 0 {
320
			routeLevel++
321
			continue
322
		}
323 324 325
		found := plist[0]

		addr, err := ma.NewMultiaddr(found.GetAddr())
Jeromy's avatar
Jeromy committed
326
		if err != nil {
327
			return nil, err
Jeromy's avatar
Jeromy committed
328 329
		}

330
		nxtPeer, err := dht.network.GetConnection(peer.ID(found.GetId()), addr)
Jeromy's avatar
Jeromy committed
331
		if err != nil {
332
			return nil, err
Jeromy's avatar
Jeromy committed
333
		}
334
		if pmes.GetSuccess() {
335 336 337
			if !id.Equal(nxtPeer.ID) {
				return nil, errors.New("got back invalid peer from 'successful' response")
			}
338
			return nxtPeer, nil
Jeromy's avatar
Jeromy committed
339
		}
340
		p = nxtPeer
341
	}
342
	return nil, u.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
343
}
344

345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
// findPeerMultiple searches for the peer with the given ID by querying
// several peers from the first routing table concurrently (NearestPeers
// requested with AlphaValue).
//
// A peer already known through dht.Find is returned immediately. Otherwise
// every queried peer's response is scanned and the first entry whose ID
// equals id is returned. Query and conversion errors are logged and skipped.
//
// Returns kb.ErrLookupFailure if the routing table has no peers near id and
// u.ErrTimeout if no match arrives within timeout.
func (dht *IpfsDHT) findPeerMultiple(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
	// Check if were already connected to them. The lookup error is ignored on
	// purpose: a miss simply means we fall through to the network search.
	p, _ := dht.Find(id)
	if p != nil {
		return p, nil
	}

	// Only read after this point, so the query goroutines can share it safely.
	routeLevel := 0
	peers := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
	if len(peers) == 0 {
		return nil, kb.ErrLookupFailure
	}

	// One slot per query goroutine: each sends at most once, so a goroutine
	// that finds the peer after we have returned can still deliver and exit.
	found := make(chan *peer.Peer, len(peers))
	after := time.After(timeout)

	for _, p := range peers {
		go func(p *peer.Peer) {
			pmes, err := dht.findPeerSingle(p, id, timeout, routeLevel)
			if err != nil {
				u.DErr("getPeer error: %v\n", err)
				return
			}
			for _, fp := range pmes.GetPeers() {
				nxtp, err := dht.peerFromInfo(fp)
				if err != nil {
					u.DErr("findPeer error: %v\n", err)
					continue
				}

				// Match against the peer being searched for, not ourselves.
				if nxtp.ID.Equal(id) {
					found <- nxtp
					return
				}
			}
		}(p)
	}

	select {
	case p := <-found:
		return p, nil
	case <-after:
		return nil, u.ErrTimeout
	}
}

395 396 397
// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
	// Thoughts: maybe this should accept an ID and do a peer lookup?
Jeromy's avatar
Jeromy committed
398
	u.DOut("Enter Ping.\n")
399

400
	pmes := Message{ID: swarm.GenerateMessageID(), Type: PBDHTMessage_PING}
401 402 403
	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	before := time.Now()
404
	responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
405
	dht.netChan.Outgoing <- mes
406 407 408

	tout := time.After(timeout)
	select {
409
	case <-responseChan:
410
		roundtrip := time.Since(before)
411
		p.SetLatency(roundtrip)
412
		u.DOut("Ping took %s.\n", roundtrip.String())
413 414 415
		return nil
	case <-tout:
		// Timed out, think about removing peer from network
416
		u.DOut("[%s] Ping peer [%s] timed out.", dht.self.ID.Pretty(), p.ID.Pretty())
417
		dht.listener.Unlisten(pmes.ID)
418 419 420
		return u.ErrTimeout
	}
}
421

422
func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
423 424
	u.DOut("Begin Diagnostic")
	//Send to N closest peers
425
	targets := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
426 427

	// TODO: Add timeout to this struct so nodes know when to return
428
	pmes := Message{
429
		Type: PBDHTMessage_DIAGNOSTIC,
430
		ID:   swarm.GenerateMessageID(),
431 432
	}

433
	listenChan := dht.listener.Listen(pmes.ID, len(targets), time.Minute*2)
434 435

	pbmes := pmes.ToProtobuf()
436
	for _, p := range targets {
437
		mes := swarm.NewMessage(p, pbmes)
438
		dht.netChan.Outgoing <- mes
439 440 441 442 443 444 445 446 447
	}

	var out []*diagInfo
	after := time.After(timeout)
	for count := len(targets); count > 0; {
		select {
		case <-after:
			u.DOut("Diagnostic request timed out.")
			return out, u.ErrTimeout
Jeromy's avatar
Jeromy committed
448
		case resp := <-listenChan:
449 450
			pmesOut := new(PBDHTMessage)
			err := proto.Unmarshal(resp.Data, pmesOut)
451 452 453 454 455 456
			if err != nil {
				// NOTE: here and elsewhere, need to audit error handling,
				//		some errors should be continued on from
				return out, err
			}

457
			dec := json.NewDecoder(bytes.NewBuffer(pmesOut.GetValue()))
458 459 460 461 462 463 464 465 466 467 468 469
			for {
				di := new(diagInfo)
				err := dec.Decode(di)
				if err != nil {
					break
				}

				out = append(out, di)
			}
		}
	}

470
	return nil, nil
471
}