routing.go 8.99 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
5
	"encoding/json"
6
	"errors"
7
	"math/rand"
8
	"sync"
9
	"time"
10

11 12
	proto "code.google.com/p/goprotobuf/proto"

13 14
	ma "github.com/jbenet/go-multiaddr"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	peer "github.com/jbenet/go-ipfs/peer"
16
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20
)

21 22 23
// PoolSize is the number of nodes used for group find/set RPC calls.
var PoolSize = 6

24 25
// GenerateMessageID returns a pseudo-random 64-bit identifier for an
// outgoing DHT message, assembled from two independent 32-bit draws.
//
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
	// The halves occupy disjoint bit ranges, so they are combined with OR;
	// AND-ing a shifted high half with an unshifted low half is always zero.
	hi := uint64(rand.Uint32()) << 32
	lo := uint64(rand.Uint32())
	return hi | lo
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30 31 32 33 34
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
35
// This is the top level "Store" operation of the DHT
Jeromy's avatar
Jeromy committed
36 37
func (s *IpfsDHT) PutValue(key u.Key, value []byte) {
	complete := make(chan struct{})
38
	for _, route := range s.routes {
Jeromy's avatar
Jeromy committed
39 40
		p := route.NearestPeer(kb.ConvertKey(key))
		if p == nil {
41
			s.network.Error(kb.ErrLookupFailure)
Jeromy's avatar
Jeromy committed
42 43 44
			go func() {
				complete <- struct{}{}
			}()
45
			continue
Jeromy's avatar
Jeromy committed
46 47 48 49
		}
		go func() {
			err := s.putValueToNetwork(p, string(key), value)
			if err != nil {
50
				s.network.Error(err)
Jeromy's avatar
Jeromy committed
51 52 53 54 55 56
			}
			complete <- struct{}{}
		}()
	}
	for _, _ = range s.routes {
		<-complete
57
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58 59
}

60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
// counter is an integer that can be incremented, decremented and read from
// several goroutines at once. Its zero value is ready to use and holds 0.
type counter struct {
	n   int
	mut sync.RWMutex
}

// Increment adds one to the counter.
func (c *counter) Increment() {
	c.mut.Lock()
	defer c.mut.Unlock()
	c.n++
}

// Decrement subtracts one from the counter.
func (c *counter) Decrement() {
	c.mut.Lock()
	defer c.mut.Unlock()
	c.n--
}

// Size reports the current value of the counter.
func (c *counter) Size() int {
	c.mut.RLock()
	current := c.n
	c.mut.RUnlock()
	return current
}

84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
// peerSet is a set of peers, keyed by the string form of their ID, guarded
// by a read/write lock so it can be shared between goroutines.
type peerSet struct {
	ps map[string]bool
	lk sync.RWMutex
}

// newPeerSet returns an empty peerSet whose map is already allocated.
func newPeerSet() *peerSet {
	return &peerSet{ps: make(map[string]bool)}
}

// Add records p's ID as a member of the set.
func (ps *peerSet) Add(p *peer.Peer) {
	ps.lk.Lock()
	defer ps.lk.Unlock()
	ps.ps[string(p.ID)] = true
}

// Contains reports whether an entry exists for p's ID.
func (ps *peerSet) Contains(p *peer.Peer) bool {
	ps.lk.RLock()
	defer ps.lk.RUnlock()
	_, present := ps.ps[string(p.ID)]
	return present
}

// Size reports how many distinct peer IDs the set holds.
func (ps *peerSet) Size() int {
	ps.lk.RLock()
	n := len(ps.ps)
	ps.lk.RUnlock()
	return n
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
114
// GetValue searches for the value corresponding to given Key.
Jeromy's avatar
Jeromy committed
115 116
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
Jeromy's avatar
Jeromy committed
118 119 120 121 122
	ll := startNewRpc("GET")
	defer func() {
		ll.EndLog()
		ll.Print()
	}()
123

Jeromy's avatar
Jeromy committed
124 125
	// If we have it local, dont bother doing an RPC!
	// NOTE: this might not be what we want to do...
Jeromy's avatar
Jeromy committed
126 127 128 129
	val, err := s.GetLocal(key)
	if err == nil {
		ll.Success = true
		u.DOut("Found local, returning.")
Jeromy's avatar
Jeromy committed
130 131 132
		return val, nil
	}

133 134 135
	route_level := 0
	closest := s.routes[route_level].NearestPeers(kb.ConvertKey(key), PoolSize)
	if closest == nil || len(closest) == 0 {
136 137 138
		return nil, kb.ErrLookupFailure
	}

139 140 141 142 143
	val_chan := make(chan []byte)
	npeer_chan := make(chan *peer.Peer, 30)
	proc_peer := make(chan *peer.Peer, 30)
	err_chan := make(chan error)
	after := time.After(timeout)
144
	pset := newPeerSet()
145

146
	for _, p := range closest {
147
		pset.Add(p)
148 149
		npeer_chan <- p
	}
150

151 152 153 154 155 156 157 158 159 160 161 162
	c := counter{}

	// This limit value is referred to as k in the kademlia paper
	limit := 20
	count := 0
	go func() {
		for {
			select {
			case p := <-npeer_chan:
				count++
				if count >= limit {
					break
Jeromy's avatar
Jeromy committed
163
				}
164
				c.Increment()
165

166 167 168 169
				proc_peer <- p
			default:
				if c.Size() == 0 {
					err_chan <- u.ErrNotFound
170
				}
171 172 173
			}
		}
	}()
174

175 176 177 178 179 180 181 182 183
	process := func() {
		for {
			select {
			case p, ok := <-proc_peer:
				if !ok || p == nil {
					c.Decrement()
					return
				}
				val, peers, err := s.getValueOrPeers(p, key, timeout/4, route_level)
184
				if err != nil {
185 186
					u.DErr(err.Error())
					c.Decrement()
Jeromy's avatar
Jeromy committed
187
					continue
188
				}
189 190 191 192 193 194 195 196
				if val != nil {
					val_chan <- val
					c.Decrement()
					return
				}

				for _, np := range peers {
					// TODO: filter out peers that arent closer
197 198 199 200
					if !pset.Contains(np) && pset.Size() < limit {
						pset.Add(np) //This is racey... make a single function to do operation
						npeer_chan <- np
					}
201 202
				}
				c.Decrement()
203
			}
204
		}
205
	}
206 207 208 209 210 211 212 213 214 215 216 217 218 219

	concurFactor := 3
	for i := 0; i < concurFactor; i++ {
		go process()
	}

	select {
	case val := <-val_chan:
		return val, nil
	case err := <-err_chan:
		return nil, err
	case <-after:
		return nil, u.ErrTimeout
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220 221 222 223 224 225
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

// Announce that this node can provide value for given key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226
func (s *IpfsDHT) Provide(key u.Key) error {
227
	peers := s.routes[0].NearestPeers(kb.ConvertKey(key), PoolSize)
228
	if len(peers) == 0 {
229
		return kb.ErrLookupFailure
230 231
	}

232 233 234
	pmes := DHTMessage{
		Type: PBDHTMessage_ADD_PROVIDER,
		Key:  string(key),
235 236 237
	}
	pbmes := pmes.ToProtobuf()

238
	for _, p := range peers {
239
		mes := swarm.NewMessage(p, pbmes)
240
		s.network.Send(mes)
241 242
	}
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
243 244 245
}

// FindProviders searches for peers who can provide the value for given key.
246
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
Jeromy's avatar
Jeromy committed
247 248 249 250 251 252
	ll := startNewRpc("FindProviders")
	defer func() {
		ll.EndLog()
		ll.Print()
	}()
	u.DOut("Find providers for: '%s'", key)
253
	p := s.routes[0].NearestPeer(kb.ConvertKey(key))
254 255 256
	if p == nil {
		return nil, kb.ErrLookupFailure
	}
257

Jeromy's avatar
Jeromy committed
258 259
	for level := 0; level < len(s.routes); {
		pmes, err := s.findProvidersSingle(p, key, level, timeout)
260 261 262
		if err != nil {
			return nil, err
		}
Jeromy's avatar
Jeromy committed
263 264 265 266 267 268 269 270
		if pmes.GetSuccess() {
			provs := s.addPeerList(key, pmes.GetPeers())
			ll.Success = true
			return provs, nil
		} else {
			closer := pmes.GetPeers()
			if len(closer) == 0 {
				level++
Jeromy's avatar
Jeromy committed
271 272
				continue
			}
Jeromy's avatar
Jeromy committed
273 274 275 276 277 278 279 280
			if peer.ID(closer[0].GetId()).Equal(s.self.ID) {
				u.DOut("Got myself back as a closer peer.")
				return nil, u.ErrNotFound
			}
			maddr, err := ma.NewMultiaddr(closer[0].GetAddr())
			if err != nil {
				// ??? Move up route level???
				panic("not yet implemented")
281 282
			}

Jeromy's avatar
Jeromy committed
283 284 285 286 287 288 289 290
			np, err := s.network.GetConnection(peer.ID(closer[0].GetId()), maddr)
			if err != nil {
				u.PErr("[%s] Failed to connect to: %s", s.self.ID.Pretty(), closer[0].GetAddr())
				level++
				continue
			}
			p = np
		}
291
	}
Jeromy's avatar
Jeromy committed
292
	return nil, u.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
293 294 295 296 297 298
}

// Find specific Peer

// FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
299 300 301 302 303 304
	// Check if were already connected to them
	p, _ := s.Find(id)
	if p != nil {
		return p, nil
	}

305
	route_level := 0
306
	p = s.routes[route_level].NearestPeer(kb.ConvertPeerID(id))
307 308
	if p == nil {
		return nil, kb.ErrLookupFailure
309
	}
310 311 312
	if p.ID.Equal(id) {
		return p, nil
	}
313

314 315 316 317 318
	for route_level < len(s.routes) {
		pmes, err := s.findPeerSingle(p, id, timeout, route_level)
		plist := pmes.GetPeers()
		if len(plist) == 0 {
			route_level++
319
		}
320 321 322
		found := plist[0]

		addr, err := ma.NewMultiaddr(found.GetAddr())
Jeromy's avatar
Jeromy committed
323
		if err != nil {
324
			return nil, err
Jeromy's avatar
Jeromy committed
325 326
		}

327
		nxtPeer, err := s.network.GetConnection(peer.ID(found.GetId()), addr)
Jeromy's avatar
Jeromy committed
328
		if err != nil {
329
			return nil, err
Jeromy's avatar
Jeromy committed
330
		}
331
		if pmes.GetSuccess() {
332 333 334
			if !id.Equal(nxtPeer.ID) {
				return nil, errors.New("got back invalid peer from 'successful' response")
			}
335 336 337
			return nxtPeer, nil
		} else {
			p = nxtPeer
Jeromy's avatar
Jeromy committed
338
		}
339
	}
340
	return nil, u.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
341
}
342 343 344 345 346 347

// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
	// Thoughts: maybe this should accept an ID and do a peer lookup?
	u.DOut("Enter Ping.")

348
	pmes := DHTMessage{Id: GenerateMessageID(), Type: PBDHTMessage_PING}
349 350 351
	mes := swarm.NewMessage(p, pmes.ToProtobuf())

	before := time.Now()
352
	response_chan := dht.listener.Listen(pmes.Id, 1, time.Minute)
353
	dht.network.Send(mes)
354 355 356 357 358

	tout := time.After(timeout)
	select {
	case <-response_chan:
		roundtrip := time.Since(before)
359
		p.SetLatency(roundtrip)
360
		u.DOut("Ping took %s.", roundtrip.String())
361 362 363 364
		return nil
	case <-tout:
		// Timed out, think about removing peer from network
		u.DOut("Ping peer timed out.")
365
		dht.listener.Unlisten(pmes.Id)
366 367 368
		return u.ErrTimeout
	}
}
369 370 371 372

func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
	u.DOut("Begin Diagnostic")
	//Send to N closest peers
373
	targets := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
374 375

	// TODO: Add timeout to this struct so nodes know when to return
376 377 378
	pmes := DHTMessage{
		Type: PBDHTMessage_DIAGNOSTIC,
		Id:   GenerateMessageID(),
379 380
	}

381
	listenChan := dht.listener.Listen(pmes.Id, len(targets), time.Minute*2)
382 383

	pbmes := pmes.ToProtobuf()
384
	for _, p := range targets {
385
		mes := swarm.NewMessage(p, pbmes)
386
		dht.network.Send(mes)
387 388 389 390 391 392 393 394 395
	}

	var out []*diagInfo
	after := time.After(timeout)
	for count := len(targets); count > 0; {
		select {
		case <-after:
			u.DOut("Diagnostic request timed out.")
			return out, u.ErrTimeout
Jeromy's avatar
Jeromy committed
396
		case resp := <-listenChan:
397
			pmes_out := new(PBDHTMessage)
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417
			err := proto.Unmarshal(resp.Data, pmes_out)
			if err != nil {
				// NOTE: here and elsewhere, need to audit error handling,
				//		some errors should be continued on from
				return out, err
			}

			dec := json.NewDecoder(bytes.NewBuffer(pmes_out.GetValue()))
			for {
				di := new(diagInfo)
				err := dec.Decode(di)
				if err != nil {
					break
				}

				out = append(out, di)
			}
		}
	}

418
	return nil, nil
419
}