routing.go 10.6 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
5
	"encoding/json"
6
	"errors"
7
	"time"
8

9 10
	proto "code.google.com/p/goprotobuf/proto"

11 12
	ma "github.com/jbenet/go-multiaddr"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	peer "github.com/jbenet/go-ipfs/peer"
14
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18 19 20 21 22 23
)

// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
24
// This is the top level "Store" operation of the DHT
25
func (dht *IpfsDHT) PutValue(key u.Key, value []byte) error {
Jeromy's avatar
Jeromy committed
26
	complete := make(chan struct{})
Jeromy's avatar
Jeromy committed
27
	count := 0
28
	for _, route := range dht.routingTables {
Jeromy's avatar
Jeromy committed
29 30 31
		peers := route.NearestPeers(kb.ConvertKey(key), KValue)
		for _, p := range peers {
			if p == nil {
32
				dht.network.Error(kb.ErrLookupFailure)
Jeromy's avatar
Jeromy committed
33 34 35 36
				continue
			}
			count++
			go func(sp *peer.Peer) {
37
				err := dht.putValueToNetwork(sp, string(key), value)
Jeromy's avatar
Jeromy committed
38
				if err != nil {
39
					dht.network.Error(err)
Jeromy's avatar
Jeromy committed
40
				}
Jeromy's avatar
Jeromy committed
41
				complete <- struct{}{}
Jeromy's avatar
Jeromy committed
42
			}(p)
Jeromy's avatar
Jeromy committed
43 44
		}
	}
Jeromy's avatar
Jeromy committed
45
	for i := 0; i < count; i++ {
Jeromy's avatar
Jeromy committed
46
		<-complete
47
	}
48
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49 50 51
}

// GetValue searches for the value corresponding to given Key.
Jeromy's avatar
Jeromy committed
52 53
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
54 55
func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
	ll := startNewRPC("GET")
Jeromy's avatar
Jeromy committed
56 57 58 59
	defer func() {
		ll.EndLog()
		ll.Print()
	}()
60

Jeromy's avatar
Jeromy committed
61 62
	// If we have it local, dont bother doing an RPC!
	// NOTE: this might not be what we want to do...
63
	val, err := dht.getLocal(key)
Jeromy's avatar
Jeromy committed
64 65
	if err == nil {
		ll.Success = true
Jeromy's avatar
Jeromy committed
66
		u.DOut("Found local, returning.\n")
Jeromy's avatar
Jeromy committed
67 68 69
		return val, nil
	}

70 71
	routeLevel := 0
	closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize)
72
	if closest == nil || len(closest) == 0 {
73 74 75
		return nil, kb.ErrLookupFailure
	}

76 77 78 79
	valChan := make(chan []byte)
	npeerChan := make(chan *peer.Peer, 30)
	procPeer := make(chan *peer.Peer, 30)
	errChan := make(chan error)
80
	after := time.After(timeout)
81
	pset := newPeerSet()
82

83
	for _, p := range closest {
84
		pset.Add(p)
85
		npeerChan <- p
86
	}
87

88 89 90 91
	c := counter{}

	count := 0
	go func() {
92
		defer close(procPeer)
93 94
		for {
			select {
95 96 97 98
			case p, ok := <-npeerChan:
				if !ok {
					return
				}
99
				count++
Jeromy's avatar
Jeromy committed
100
				if count >= KValue {
101 102
					errChan <- u.ErrNotFound
					return
Jeromy's avatar
Jeromy committed
103
				}
104
				c.Increment()
105

106
				procPeer <- p
107
			default:
108 109 110 111 112
				if c.Size() <= 0 {
					select {
					case errChan <- u.ErrNotFound:
					default:
					}
113
					return
114
				}
115 116 117
			}
		}
	}()
118

119
	process := func() {
120
		defer c.Decrement()
121 122 123 124 125 126 127 128 129 130
		for p := range procPeer {
			if p == nil {
				return
			}
			val, peers, err := dht.getValueOrPeers(p, key, timeout/4, routeLevel)
			if err != nil {
				u.DErr("%v\n", err.Error())
				continue
			}
			if val != nil {
131 132 133 134 135
				select {
				case valChan <- val:
				default:
					u.DOut("Wasnt the first to return the value!")
				}
136 137
				return
			}
138

139 140 141 142 143
			for _, np := range peers {
				// TODO: filter out peers that arent closer
				if !pset.Contains(np) && pset.Size() < KValue {
					pset.Add(np) //This is racey... make a single function to do operation
					npeerChan <- np
144
				}
145
			}
146
			c.Decrement()
147
		}
148
	}
149

Jeromy's avatar
Jeromy committed
150
	for i := 0; i < AlphaValue; i++ {
151 152 153 154
		go process()
	}

	select {
155
	case val := <-valChan:
156
		return val, nil
157
	case err := <-errChan:
158 159 160 161
		return nil, err
	case <-after:
		return nil, u.ErrTimeout
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162 163 164 165 166
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

167 168 169
// Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(key u.Key) error {
	peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize)
170
	if len(peers) == 0 {
171
		return kb.ErrLookupFailure
172 173
	}

174
	pmes := Message{
175 176
		Type: PBDHTMessage_ADD_PROVIDER,
		Key:  string(key),
177 178 179
	}
	pbmes := pmes.ToProtobuf()

180
	for _, p := range peers {
181
		mes := swarm.NewMessage(p, pbmes)
182
		dht.netChan.Outgoing <- mes
183 184
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185 186
}

187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
func (dht *IpfsDHT) FindProvidersAsync(key u.Key, count int, timeout time.Duration) chan *peer.Peer {
	peerOut := make(chan *peer.Peer, count)
	go func() {
		ps := newPeerSet()
		provs := dht.providers.GetProviders(key)
		for _, p := range provs {
			count--
			// NOTE: assuming that the list of peers is unique
			ps.Add(p)
			peerOut <- p
			if count <= 0 {
				return
			}
		}

		peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
		for _, pp := range peers {
			go func() {
				pmes, err := dht.findProvidersSingle(pp, key, 0, timeout)
				if err != nil {
					u.PErr("%v\n", err)
					return
				}
				dht.addPeerListAsync(key, pmes.GetPeers(), ps, count, peerOut)
			}()
		}

	}()
	return peerOut
}

//TODO: this function could also be done asynchronously
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*PBDHTMessage_PBPeer, ps *peerSet, count int, out chan *peer.Peer) {
	for _, pbp := range peers {
Jeromy's avatar
Jeromy committed
221 222 223
		if peer.ID(pbp.GetId()).Equal(dht.self.ID) {
			continue
		}
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
		maddr, err := ma.NewMultiaddr(pbp.GetAddr())
		if err != nil {
			u.PErr("%v\n", err)
			continue
		}
		p, err := dht.network.GetConnection(peer.ID(pbp.GetId()), maddr)
		if err != nil {
			u.PErr("%v\n", err)
			continue
		}
		dht.providers.AddProvider(k, p)
		if ps.AddIfSmallerThan(p, count) {
			out <- p
		} else if ps.Size() >= count {
			return
		}
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
243
// FindProviders searches for peers who can provide the value for given key.
244 245
func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
	ll := startNewRPC("FindProviders")
Jeromy's avatar
Jeromy committed
246 247 248 249
	defer func() {
		ll.EndLog()
		ll.Print()
	}()
250
	u.DOut("Find providers for: '%s'\n", key)
251
	p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key))
252 253 254
	if p == nil {
		return nil, kb.ErrLookupFailure
	}
255

256 257
	for level := 0; level < len(dht.routingTables); {
		pmes, err := dht.findProvidersSingle(p, key, level, timeout)
258 259 260
		if err != nil {
			return nil, err
		}
Jeromy's avatar
Jeromy committed
261
		if pmes.GetSuccess() {
Jeromy's avatar
Jeromy committed
262
			u.DOut("Got providers back from findProviders call!\n")
263
			provs := dht.addPeerList(key, pmes.GetPeers())
Jeromy's avatar
Jeromy committed
264 265
			ll.Success = true
			return provs, nil
266
		}
267

Jeromy's avatar
Jeromy committed
268 269
		u.DOut("Didnt get providers, just closer peers.\n")

270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
		closer := pmes.GetPeers()
		if len(closer) == 0 {
			level++
			continue
		}
		if peer.ID(closer[0].GetId()).Equal(dht.self.ID) {
			u.DOut("Got myself back as a closer peer.")
			return nil, u.ErrNotFound
		}
		maddr, err := ma.NewMultiaddr(closer[0].GetAddr())
		if err != nil {
			// ??? Move up route level???
			panic("not yet implemented")
		}

		np, err := dht.network.GetConnection(peer.ID(closer[0].GetId()), maddr)
		if err != nil {
287
			u.PErr("[%s] Failed to connect to: %s\n", dht.self.ID.Pretty(), closer[0].GetAddr())
288 289
			level++
			continue
Jeromy's avatar
Jeromy committed
290
		}
291
		p = np
292
	}
Jeromy's avatar
Jeromy committed
293
	return nil, u.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
294 295 296 297 298
}

// Find specific Peer

// FindPeer searches for a peer with given ID.
299
func (dht *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
300
	// Check if were already connected to them
301
	p, _ := dht.Find(id)
302 303 304 305
	if p != nil {
		return p, nil
	}

306 307
	routeLevel := 0
	p = dht.routingTables[routeLevel].NearestPeer(kb.ConvertPeerID(id))
308 309
	if p == nil {
		return nil, kb.ErrLookupFailure
310
	}
311 312 313
	if p.ID.Equal(id) {
		return p, nil
	}
314

315 316
	for routeLevel < len(dht.routingTables) {
		pmes, err := dht.findPeerSingle(p, id, timeout, routeLevel)
317
		plist := pmes.GetPeers()
318
		if plist == nil || len(plist) == 0 {
319
			routeLevel++
320
			continue
321
		}
322 323 324
		found := plist[0]

		addr, err := ma.NewMultiaddr(found.GetAddr())
Jeromy's avatar
Jeromy committed
325
		if err != nil {
326
			return nil, err
Jeromy's avatar
Jeromy committed
327 328
		}

329
		nxtPeer, err := dht.network.GetConnection(peer.ID(found.GetId()), addr)
Jeromy's avatar
Jeromy committed
330
		if err != nil {
331
			return nil, err
Jeromy's avatar
Jeromy committed
332
		}
333
		if pmes.GetSuccess() {
334 335 336
			if !id.Equal(nxtPeer.ID) {
				return nil, errors.New("got back invalid peer from 'successful' response")
			}
337
			return nxtPeer, nil
Jeromy's avatar
Jeromy committed
338
		}
339
		p = nxtPeer
340
	}
341
	return nil, u.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
342
}
343

344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393
func (dht *IpfsDHT) findPeerMultiple(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
	// Check if were already connected to them
	p, _ := dht.Find(id)
	if p != nil {
		return p, nil
	}

	routeLevel := 0
	peers := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
	if len(peers) == 0 {
		return nil, kb.ErrLookupFailure
	}

	found := make(chan *peer.Peer)
	after := time.After(timeout)

	for _, p := range peers {
		go func(p *peer.Peer) {
			pmes, err := dht.findPeerSingle(p, id, timeout, routeLevel)
			if err != nil {
				u.DErr("getPeer error: %v\n", err)
				return
			}
			plist := pmes.GetPeers()
			if len(plist) == 0 {
				routeLevel++
			}
			for _, fp := range plist {
				nxtp, err := dht.peerFromInfo(fp)
				if err != nil {
					u.DErr("findPeer error: %v\n", err)
					continue
				}

				if nxtp.ID.Equal(dht.self.ID) {
					found <- nxtp
					return
				}
			}
		}(p)
	}

	select {
	case p := <-found:
		return p, nil
	case <-after:
		return nil, u.ErrTimeout
	}
}

394 395 396
// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
	// Thoughts: maybe this should accept an ID and do a peer lookup?
Jeromy's avatar
Jeromy committed
397
	u.DOut("Enter Ping.\n")
398

399
	pmes := Message{ID: swarm.GenerateMessageID(), Type: PBDHTMessage_PING}
400 401 402
	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	before := time.Now()
403
	responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
404
	dht.netChan.Outgoing <- mes
405 406 407

	tout := time.After(timeout)
	select {
408
	case <-responseChan:
409
		roundtrip := time.Since(before)
410
		p.SetLatency(roundtrip)
411
		u.DOut("Ping took %s.\n", roundtrip.String())
412 413 414
		return nil
	case <-tout:
		// Timed out, think about removing peer from network
415
		u.DOut("[%s] Ping peer [%s] timed out.", dht.self.ID.Pretty(), p.ID.Pretty())
416
		dht.listener.Unlisten(pmes.ID)
417 418 419
		return u.ErrTimeout
	}
}
420

421
func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
422 423
	u.DOut("Begin Diagnostic")
	//Send to N closest peers
424
	targets := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
425 426

	// TODO: Add timeout to this struct so nodes know when to return
427
	pmes := Message{
428
		Type: PBDHTMessage_DIAGNOSTIC,
429
		ID:   swarm.GenerateMessageID(),
430 431
	}

432
	listenChan := dht.listener.Listen(pmes.ID, len(targets), time.Minute*2)
433 434

	pbmes := pmes.ToProtobuf()
435
	for _, p := range targets {
436
		mes := swarm.NewMessage(p, pbmes)
437
		dht.netChan.Outgoing <- mes
438 439 440 441 442 443 444 445 446
	}

	var out []*diagInfo
	after := time.After(timeout)
	for count := len(targets); count > 0; {
		select {
		case <-after:
			u.DOut("Diagnostic request timed out.")
			return out, u.ErrTimeout
Jeromy's avatar
Jeromy committed
447
		case resp := <-listenChan:
448 449
			pmesOut := new(PBDHTMessage)
			err := proto.Unmarshal(resp.Data, pmesOut)
450 451 452 453 454 455
			if err != nil {
				// NOTE: here and elsewhere, need to audit error handling,
				//		some errors should be continued on from
				return out, err
			}

456
			dec := json.NewDecoder(bytes.NewBuffer(pmesOut.GetValue()))
457 458 459 460 461 462 463 464 465 466 467 468
			for {
				di := new(diagInfo)
				err := dec.Decode(di)
				if err != nil {
					break
				}

				out = append(out, di)
			}
		}
	}

469
	return nil, nil
470
}