dht.go 12 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"bytes"
5
	"encoding/json"
6 7 8
	"errors"
	"sync"
	"time"
9

10 11 12
	peer "github.com/jbenet/go-ipfs/peer"
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
13 14

	ma "github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
15 16 17

	ds "github.com/jbenet/datastore.go"

18
	"code.google.com/p/goprotobuf/proto"
19 20
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21 22 23 24 25
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
	routes []*RoutingTable

	// network is the swarm used to send and receive messages to/from peers.
	network *swarm.Swarm

	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore

	// Map keys to peers that can provide their value
	// TODO: implement a TTL on each of these keys
	// providerLock guards all access to providers.
	providers    map[u.Key][]*providerInfo
	providerLock sync.RWMutex

	// map of channels waiting for reply messages
	// listenLock guards all access to listeners.
	listeners  map[uint64]*listenInfo
	listenLock sync.RWMutex

	// Signal to shutdown dht
	// (unbuffered: Halt blocks until handleMessages receives the signal)
	shutdown chan struct{}

	// When this peer started up
	birth time.Time

	//lock to make diagnostics work better
	// (serializes the IsListening check in handleDiagnostic)
	diaglock sync.Mutex
}

57
// The listen info struct holds information about a message that is being waited for
type listenInfo struct {
	// Responses matching the listen ID will be sent through resp
	resp chan *swarm.Message

	// count is the number of responses to listen for
	count int

	// eol is the time at which this listener will expire
	eol time.Time
}

69 70
// Create a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
71
	if p == nil {
72
		return nil, errors.New("nil peer passed to NewDHT()")
73
	}
74 75 76
	network := swarm.NewSwarm(p)
	err := network.Listen()
	if err != nil {
77
		return nil, err
78
	}
79

80 81
	dht := new(IpfsDHT)
	dht.network = network
82 83
	dht.datastore = ds.NewMapDatastore()
	dht.self = p
84
	dht.listeners = make(map[uint64]*listenInfo)
85
	dht.providers = make(map[u.Key][]*providerInfo)
Jeromy's avatar
Jeromy committed
86
	dht.shutdown = make(chan struct{})
87 88
	dht.routes = make([]*RoutingTable, 1)
	dht.routes[0] = NewRoutingTable(20, convertPeerID(p.ID))
89
	dht.birth = time.Now()
90 91 92
	return dht, nil
}

93
// Start up background goroutines needed by the DHT
func (dht *IpfsDHT) Start() {
	// handleMessages runs until Halt() signals dht.shutdown.
	go dht.handleMessages()
}

98
// Connect to a new peer at the given address
99
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
100
	maddrstr, _ := addr.String()
101
	u.DOut("Connect to new peer: %s", maddrstr)
102
	npeer, err := dht.network.Connect(addr)
103
	if err != nil {
104
		return nil, err
105 106
	}

107
	dht.Update(npeer)
Jeromy's avatar
Jeromy committed
108 109 110

	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
111
	err = dht.Ping(npeer, time.Second*2)
Jeromy's avatar
Jeromy committed
112
	if err != nil {
113
		return nil, errors.New("Failed to ping newly connected peer.")
Jeromy's avatar
Jeromy committed
114 115
	}

116
	return npeer, nil
Jeromy's avatar
Jeromy committed
117 118
}

119 120
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
121
func (dht *IpfsDHT) handleMessages() {
122
	u.DOut("Begin message handling routine")
123 124

	checkTimeouts := time.NewTicker(time.Minute * 5)
125 126
	for {
		select {
127
		case mes, ok := <-dht.network.Chan.Incoming:
128 129 130 131
			if !ok {
				u.DOut("handleMessages closing, bad recv on incoming")
				return
			}
132
			pmes := new(PBDHTMessage)
133 134 135 136 137 138
			err := proto.Unmarshal(mes.Data, pmes)
			if err != nil {
				u.PErr("Failed to decode protobuf message: %s", err)
				continue
			}

139
			dht.Update(mes.Peer)
140

141
			// Note: not sure if this is the correct place for this
142 143
			if pmes.GetResponse() {
				dht.listenLock.RLock()
144
				list, ok := dht.listeners[pmes.GetId()]
145 146 147 148 149
				dht.listenLock.RUnlock()
				if time.Now().After(list.eol) {
					dht.Unlisten(pmes.GetId())
					ok = false
				}
150 151 152
				if list.count > 1 {
					list.count--
				}
153
				if ok {
154
					list.resp <- mes
155 156 157
					if list.count == 1 {
						dht.Unlisten(pmes.GetId())
					}
158 159
				} else {
					u.DOut("Received response with nobody listening...")
160 161 162
				}

				continue
163
			}
164 165
			//

166 167
			u.DOut("[peer: %s]", dht.self.ID.Pretty())
			u.DOut("Got message type: '%s' [id = %x, from = %s]",
168
				PBDHTMessage_MessageType_name[int32(pmes.GetType())],
169
				pmes.GetId(), mes.Peer.ID.Pretty())
170
			switch pmes.GetType() {
171
			case PBDHTMessage_GET_VALUE:
172
				dht.handleGetValue(mes.Peer, pmes)
173
			case PBDHTMessage_PUT_VALUE:
Jeromy's avatar
Jeromy committed
174
				dht.handlePutValue(mes.Peer, pmes)
175
			case PBDHTMessage_FIND_NODE:
Jeromy's avatar
Jeromy committed
176
				dht.handleFindPeer(mes.Peer, pmes)
177
			case PBDHTMessage_ADD_PROVIDER:
178
				dht.handleAddProvider(mes.Peer, pmes)
179
			case PBDHTMessage_GET_PROVIDERS:
180
				dht.handleGetProviders(mes.Peer, pmes)
181
			case PBDHTMessage_PING:
182
				dht.handlePing(mes.Peer, pmes)
183
			case PBDHTMessage_DIAGNOSTIC:
184
				dht.handleDiagnostic(mes.Peer, pmes)
185 186
			}

187
		case err := <-dht.network.Chan.Errors:
188
			u.DErr("dht err: %s", err)
189
		case <-dht.shutdown:
190
			checkTimeouts.Stop()
191
			return
192
		case <-checkTimeouts.C:
193 194 195 196 197 198 199 200 201 202 203 204 205 206
			// Time to collect some garbage!
			dht.cleanExpiredProviders()
			dht.cleanExpiredListeners()
		}
	}
}

func (dht *IpfsDHT) cleanExpiredProviders() {
	dht.providerLock.Lock()
	for k, parr := range dht.providers {
		var cleaned []*providerInfo
		for _, v := range parr {
			if time.Since(v.Creation) < time.Hour {
				cleaned = append(cleaned, v)
207
			}
208
		}
209
		dht.providers[k] = cleaned
210
	}
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
	dht.providerLock.Unlock()
}

func (dht *IpfsDHT) cleanExpiredListeners() {
	dht.listenLock.Lock()
	var remove []uint64
	now := time.Now()
	for k, v := range dht.listeners {
		if now.After(v.eol) {
			remove = append(remove, k)
		}
	}
	for _, k := range remove {
		delete(dht.listeners, k)
	}
	dht.listenLock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227
}
228

229
func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error {
230 231 232
	pmes := DHTMessage{
		Type:  PBDHTMessage_PUT_VALUE,
		Key:   key,
233
		Value: value,
234
		Id:    GenerateMessageID(),
235 236 237 238 239 240 241
	}

	mes := swarm.NewMessage(p, pmes.ToProtobuf())
	dht.network.Chan.Outgoing <- mes
	return nil
}

242
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
Jeromy's avatar
Jeromy committed
243
	dskey := ds.NewKey(pmes.GetKey())
244
	var resp *DHTMessage
Jeromy's avatar
Jeromy committed
245 246
	i_val, err := dht.datastore.Get(dskey)
	if err == nil {
247
		resp = &DHTMessage{
248
			Response: true,
249 250 251 252
			Id:       *pmes.Id,
			Key:      *pmes.Key,
			Value:    i_val.([]byte),
			Success:  true,
253
		}
Jeromy's avatar
Jeromy committed
254
	} else if err == ds.ErrNotFound {
Jeromy's avatar
Jeromy committed
255
		// Find closest peer(s) to desired key and reply with that info
256
		closer := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
257
		resp = &DHTMessage{
258
			Response: true,
259 260 261 262
			Id:       *pmes.Id,
			Key:      *pmes.Key,
			Value:    closer.ID,
			Success:  false,
263
		}
264
	}
265 266 267

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <- mes
268 269
}

Jeromy's avatar
Jeromy committed
270
// Store a value in this peer local storage
271
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *PBDHTMessage) {
Jeromy's avatar
Jeromy committed
272 273 274 275 276 277
	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
	if err != nil {
		// For now, just panic, handle this better later maybe
		panic(err)
	}
278 279
}

280 281 282
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) {
	resp := DHTMessage{
		Type:     pmes.GetType(),
283
		Response: true,
284
		Id:       pmes.GetId(),
285
	}
286

287
	dht.network.Chan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf())
288 289
}

290 291
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
	success := true
Jeromy's avatar
Jeromy committed
292
	u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty())
293
	closest := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
Jeromy's avatar
Jeromy committed
294
	if closest == nil {
295 296
		u.PErr("handleFindPeer: could not find anything.")
		success = false
Jeromy's avatar
Jeromy committed
297 298 299
	}

	if len(closest.Addresses) == 0 {
300 301
		u.PErr("handleFindPeer: no addresses for connected peer...")
		success = false
Jeromy's avatar
Jeromy committed
302 303
	}

Jeromy's avatar
Jeromy committed
304 305
	u.POut("handleFindPeer: sending back '%s'", closest.ID.Pretty())

306
	addr, err := closest.Addresses[0].String()
Jeromy's avatar
Jeromy committed
307
	if err != nil {
308 309
		u.PErr(err.Error())
		success = false
Jeromy's avatar
Jeromy committed
310 311
	}

312 313
	resp := DHTMessage{
		Type:     pmes.GetType(),
Jeromy's avatar
Jeromy committed
314
		Response: true,
315 316 317
		Id:       pmes.GetId(),
		Value:    []byte(addr),
		Success:  success,
Jeromy's avatar
Jeromy committed
318 319 320
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
321
	dht.network.Chan.Outgoing <- mes
322 323
}

324
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
325
	dht.providerLock.RLock()
326
	providers := dht.providers[u.Key(pmes.GetKey())]
327
	dht.providerLock.RUnlock()
328 329
	if providers == nil || len(providers) == 0 {
		// ?????
330
		u.DOut("No known providers for requested key.")
331 332
	}

333
	// This is just a quick hack, formalize method of sending addrs later
334
	addrs := make(map[u.Key]string)
335
	for _, prov := range providers {
336
		ma := prov.Value.NetAddress("tcp")
337
		str, err := ma.String()
338 339 340 341 342
		if err != nil {
			u.PErr("Error: %s", err)
			continue
		}

343
		addrs[prov.Value.Key()] = str
344 345
	}

346 347
	success := true
	data, err := json.Marshal(addrs)
348
	if err != nil {
349 350 351
		u.POut("handleGetProviders: error marshalling struct to JSON: %s", err)
		data = nil
		success = false
352 353
	}

354 355 356 357 358
	resp := DHTMessage{
		Type:     PBDHTMessage_GET_PROVIDERS,
		Key:      pmes.GetKey(),
		Value:    data,
		Id:       pmes.GetId(),
359
		Response: true,
360
		Success:  success,
361 362 363
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
364
	dht.network.Chan.Outgoing <- mes
365 366
}

367 368
// providerInfo records a peer that can provide a key, along with when the
// record was created (records older than an hour are dropped by
// cleanExpiredProviders).
type providerInfo struct {
	Creation time.Time
	Value    *peer.Peer
}

372
// handleAddProvider records the sending peer as a provider for the key
// carried in the ADD_PROVIDER message.
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) {
	//TODO: need to implement TTLs on providers
	key := u.Key(pmes.GetKey())
	dht.addProviderEntry(key, p)
}

378 379
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
380
func (dht *IpfsDHT) ListenFor(mesid uint64, count int, timeout time.Duration) <-chan *swarm.Message {
381
	lchan := make(chan *swarm.Message)
382
	dht.listenLock.Lock()
383
	dht.listeners[mesid] = &listenInfo{lchan, count, time.Now().Add(timeout)}
384 385 386
	dht.listenLock.Unlock()
	return lchan
}
387

388
// Unregister the given message id from the listener map
Jeromy's avatar
Jeromy committed
389 390
func (dht *IpfsDHT) Unlisten(mesid uint64) {
	dht.listenLock.Lock()
391
	list, ok := dht.listeners[mesid]
Jeromy's avatar
Jeromy committed
392 393 394 395
	if ok {
		delete(dht.listeners, mesid)
	}
	dht.listenLock.Unlock()
396 397 398 399 400
	close(list.resp)
}

func (dht *IpfsDHT) IsListening(mesid uint64) bool {
	dht.listenLock.RLock()
401
	li, ok := dht.listeners[mesid]
402
	dht.listenLock.RUnlock()
403 404 405 406 407 408
	if time.Now().After(li.eol) {
		dht.listenLock.Lock()
		delete(dht.listeners, mesid)
		dht.listenLock.Unlock()
		return false
	}
409
	return ok
Jeromy's avatar
Jeromy committed
410 411
}

Jeromy's avatar
Jeromy committed
412
// Stop all communications from this peer and shut down
// Note: dht.shutdown is unbuffered, so the send blocks until
// handleMessages receives it.
func (dht *IpfsDHT) Halt() {
	dht.shutdown <- struct{}{}
	dht.network.Close()
}
417

418 419 420 421
func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
	u.DOut("Adding %s as provider for '%s'", p.Key().Pretty(), key)
	dht.providerLock.Lock()
	provs := dht.providers[key]
422
	dht.providers[key] = append(provs, &providerInfo{time.Now(), p})
423 424
	dht.providerLock.Unlock()
}
425

426
// handleDiagnostic services a DIAGNOSTIC request: it fans the request out
// to up to 10 of our nearest peers, aggregates our own diagnostic info plus
// whatever responses arrive within 20 seconds, and sends the combined bytes
// back to the requester.
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
	// If we are already listening for this id we have seen this diagnostic
	// request before (it loops through the network) — ignore the duplicate.
	dht.diaglock.Lock()
	if dht.IsListening(pmes.GetId()) {
		//TODO: ehhh..........
		dht.diaglock.Unlock()
		return
	}
	dht.diaglock.Unlock()

	seq := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10)
	// Expect one response per forwarded peer, for up to 30 seconds.
	listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second*30)

	// Forward the original request (same id) to each nearby peer.
	for _, ps := range seq {
		mes := swarm.NewMessage(ps, pmes)
		dht.network.Chan.Outgoing <- mes
	}

	// Start the aggregate with our own diagnostic info.
	buf := new(bytes.Buffer)
	di := dht.getDiagInfo()
	buf.Write(di.Marshal())

	// NOTE: this shouldnt be a hardcoded value
	after := time.After(time.Second * 20)
	count := len(seq)
	for count > 0 {
		select {
		case <-after:
			//Timeout, return what we have
			goto out
		case req_resp := <-listen_chan:
			// Decode purely as a sanity check; the raw bytes are what get
			// appended to the aggregate below.
			pmes_out := new(PBDHTMessage)
			err := proto.Unmarshal(req_resp.Data, pmes_out)
			if err != nil {
				// It broke? eh, whatever, keep going
				continue
			}
			buf.Write(req_resp.Data)
			count--
		}
	}

out:
	// Reply to the requester with everything collected so far.
	resp := DHTMessage{
		Type:     PBDHTMessage_DIAGNOSTIC,
		Id:       pmes.GetId(),
		Value:    buf.Bytes(),
		Response: true,
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <- mes
}
478 479

func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) {
480
	v, err := dht.datastore.Get(ds.NewKey(string(key)))
481 482 483 484 485 486 487 488 489
	if err != nil {
		return nil, err
	}
	return v.([]byte), nil
}

// PutLocal stores value under key in the local datastore.
func (dht *IpfsDHT) PutLocal(key u.Key, value []byte) error {
	return dht.datastore.Put(ds.NewKey(string(key)), value)
}
490 491 492 493 494 495 496

// Update records a freshly seen peer in the routing table. If the table
// evicts another peer to make room, our network connection to the evicted
// peer is dropped.
func (dht *IpfsDHT) Update(p *peer.Peer) {
	if evicted := dht.routes[0].Update(p); evicted != nil {
		dht.network.Drop(evicted)
	}
}