dht.go 11.8 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"sync"
5
	"time"
6
	"bytes"
7
	"encoding/json"
8

Jeromy's avatar
Jeromy committed
9 10 11
	peer	"github.com/jbenet/go-ipfs/peer"
	swarm	"github.com/jbenet/go-ipfs/swarm"
	u		"github.com/jbenet/go-ipfs/util"
12 13 14
	identify "github.com/jbenet/go-ipfs/identify"

	ma "github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
15 16 17

	ds "github.com/jbenet/datastore.go"

18
	"code.google.com/p/goprotobuf/proto"
19 20
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21 22 23 24 25
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
26 27
	// Array of routing tables for differently distanced nodes
	// NOTE: (currently, only a single table is used)
28
	routes []*RoutingTable
29

30 31
	network *swarm.Swarm

Jeromy's avatar
Jeromy committed
32 33 34 35 36
	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore
37

38 39
	// Map keys to peers that can provide their value
	// TODO: implement a TTL on each of these keys
40
	providers map[u.Key][]*providerInfo
41
	providerLock sync.RWMutex
42

43
	// map of channels waiting for reply messages
44
	listeners  map[uint64]*listenInfo
45
	listenLock sync.RWMutex
46 47 48

	// Signal to shutdown dht
	shutdown chan struct{}
49 50 51

	// When this peer started up
	birth time.Time
52 53 54 55 56 57 58 59

	//lock to make diagnostics work better
	diaglock sync.Mutex
}

type listenInfo struct {
	resp chan *swarm.Message
	count int
60
	eol time.Time
61 62
}

63 64
// Create a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
65 66 67
	if p == nil {
		panic("Tried to create new dht with nil peer")
	}
68 69 70 71 72
	network := swarm.NewSwarm(p)
	err := network.Listen()
	if err != nil {
		return nil,err
	}
73

74 75
	dht := new(IpfsDHT)
	dht.network = network
76 77
	dht.datastore = ds.NewMapDatastore()
	dht.self = p
78
	dht.listeners = make(map[uint64]*listenInfo)
79
	dht.providers = make(map[u.Key][]*providerInfo)
Jeromy's avatar
Jeromy committed
80
	dht.shutdown = make(chan struct{})
81 82
	dht.routes = make([]*RoutingTable, 1)
	dht.routes[0] = NewRoutingTable(20, convertPeerID(p.ID))
83
	dht.birth = time.Now()
84 85 86
	return dht, nil
}

87
// Start up background goroutines needed by the DHT
88 89 90 91
func (dht *IpfsDHT) Start() {
	go dht.handleMessages()
}

92
// Connect to a new peer at the given address
Jeromy's avatar
Jeromy committed
93
// TODO: move this into swarm
94
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
95 96
	maddrstr,_ := addr.String()
	u.DOut("Connect to new peer: %s", maddrstr)
97 98 99
	if addr == nil {
		panic("addr was nil!")
	}
100 101 102 103 104
	peer := new(peer.Peer)
	peer.AddAddress(addr)

	conn,err := swarm.Dial("tcp", peer)
	if err != nil {
105
		return nil, err
106 107
	}

108
	err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
109
	if err != nil {
110
		return nil, err
111 112
	}

Jeromy's avatar
Jeromy committed
113 114 115 116 117 118 119 120 121
	// Send node an address that you can be reached on
	myaddr := dht.self.NetAddress("tcp")
	mastr,err := myaddr.String()
	if err != nil {
		panic("No local address to send")
	}

	conn.Outgoing.MsgChan <- []byte(mastr)

122
	dht.network.StartConn(conn)
123

124
	removed := dht.routes[0].Update(peer)
Jeromy's avatar
Jeromy committed
125 126 127
	if removed != nil {
		panic("need to remove this peer.")
	}
Jeromy's avatar
Jeromy committed
128 129 130 131 132 133 134 135

	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
	err = dht.Ping(peer, time.Second * 2)
	if err != nil {
		panic("Failed to ping new peer.")
	}

136
	return peer, nil
Jeromy's avatar
Jeromy committed
137 138
}

139 140
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
141
func (dht *IpfsDHT) handleMessages() {
142
	u.DOut("Begin message handling routine")
143 144

	checkTimeouts := time.NewTicker(time.Minute * 5)
145 146
	for {
		select {
147 148 149 150 151
		case mes,ok := <-dht.network.Chan.Incoming:
			if !ok {
				u.DOut("handleMessages closing, bad recv on incoming")
				return
			}
152 153 154 155 156 157 158
			pmes := new(DHTMessage)
			err := proto.Unmarshal(mes.Data, pmes)
			if err != nil {
				u.PErr("Failed to decode protobuf message: %s", err)
				continue
			}

159
			// Update peers latest visit in routing table
160
			removed := dht.routes[0].Update(mes.Peer)
Jeromy's avatar
Jeromy committed
161 162 163
			if removed != nil {
				panic("Need to handle removed peer.")
			}
164

165
			// Note: not sure if this is the correct place for this
166 167
			if pmes.GetResponse() {
				dht.listenLock.RLock()
168
				list, ok := dht.listeners[pmes.GetId()]
169 170 171 172 173
				dht.listenLock.RUnlock()
				if time.Now().After(list.eol) {
					dht.Unlisten(pmes.GetId())
					ok = false
				}
174 175 176
				if list.count > 1 {
					list.count--
				}
177
				if ok {
178
					list.resp <- mes
179 180 181
					if list.count == 1 {
						dht.Unlisten(pmes.GetId())
					}
182 183 184
				} else {
					// this is expected behaviour during a timeout
					u.DOut("Received response with nobody listening...")
185 186 187
				}

				continue
188
			}
189 190
			//

191 192 193 194
			u.DOut("[peer: %s]", dht.self.ID.Pretty())
			u.DOut("Got message type: '%s' [id = %x, from = %s]",
				DHTMessage_MessageType_name[int32(pmes.GetType())],
				pmes.GetId(), mes.Peer.ID.Pretty())
195
			switch pmes.GetType() {
196 197 198
			case DHTMessage_GET_VALUE:
				dht.handleGetValue(mes.Peer, pmes)
			case DHTMessage_PUT_VALUE:
Jeromy's avatar
Jeromy committed
199
				dht.handlePutValue(mes.Peer, pmes)
200
			case DHTMessage_FIND_NODE:
Jeromy's avatar
Jeromy committed
201
				dht.handleFindPeer(mes.Peer, pmes)
202
			case DHTMessage_ADD_PROVIDER:
203
				dht.handleAddProvider(mes.Peer, pmes)
204
			case DHTMessage_GET_PROVIDERS:
205
				dht.handleGetProviders(mes.Peer, pmes)
206
			case DHTMessage_PING:
207
				dht.handlePing(mes.Peer, pmes)
208
			case DHTMessage_DIAGNOSTIC:
209
				dht.handleDiagnostic(mes.Peer, pmes)
210 211
			}

212
		case err := <-dht.network.Chan.Errors:
213
			u.DErr("dht err: %s", err)
Jeromy's avatar
Jeromy committed
214
			panic(err)
215
		case <-dht.shutdown:
216
			checkTimeouts.Stop()
217
			return
218 219 220 221 222 223 224 225 226 227 228 229
		case <-checkTimeouts.C:
			dht.providerLock.Lock()
			for k,parr := range dht.providers {
				var cleaned []*providerInfo
				for _,v := range parr {
					if time.Since(v.Creation) < time.Hour {
						cleaned = append(cleaned, v)
					}
				}
				dht.providers[k] = cleaned
			}
			dht.providerLock.Unlock()
230 231 232 233 234 235 236 237 238 239 240 241
			dht.listenLock.Lock()
			var remove []uint64
			now := time.Now()
			for k,v := range dht.listeners {
				if now.After(v.eol) {
					remove = append(remove, k)
				}
			}
			for _,k := range remove {
				delete(dht.listeners, k)
			}
			dht.listenLock.Unlock()
242
		}
243
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
244
}
245

246 247 248 249 250 251 252 253 254 255 256 257 258
// putValueToPeer queues a PUT_VALUE message carrying key and value for
// peer p on the swarm's outgoing channel. It does not wait for a reply and
// always returns nil.
func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error {
	req := pDHTMessage{
		Id:    GenerateMessageID(),
		Type:  DHTMessage_PUT_VALUE,
		Key:   key,
		Value: value,
	}

	dht.network.Chan.Outgoing <- swarm.NewMessage(p, req.ToProtobuf())
	return nil
}

259
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
260
	dskey := ds.NewKey(pmes.GetKey())
261
	var resp *pDHTMessage
Jeromy's avatar
Jeromy committed
262 263
	i_val, err := dht.datastore.Get(dskey)
	if err == nil {
264
		resp = &pDHTMessage{
265 266 267 268
			Response: true,
			Id: *pmes.Id,
			Key: *pmes.Key,
			Value: i_val.([]byte),
269
			Success: true,
270
		}
Jeromy's avatar
Jeromy committed
271
	} else if err == ds.ErrNotFound {
Jeromy's avatar
Jeromy committed
272
		// Find closest peer(s) to desired key and reply with that info
273
		closer := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
274 275 276 277 278 279 280
		resp = &pDHTMessage{
			Response: true,
			Id: *pmes.Id,
			Key: *pmes.Key,
			Value: closer.ID,
			Success: false,
		}
281
	}
282 283 284

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <- mes
285 286
}

Jeromy's avatar
Jeromy committed
287
// Store a value in this peer local storage
288
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
289 290 291 292 293 294
	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
	if err != nil {
		// For now, just panic, handle this better later maybe
		panic(err)
	}
295 296 297
}

func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
298
	resp := pDHTMessage{
299 300 301 302
		Type: pmes.GetType(),
		Response: true,
		Id: pmes.GetId(),
	}
303

304
	dht.network.Chan.Outgoing <-swarm.NewMessage(p, resp.ToProtobuf())
305 306
}

Jeromy's avatar
Jeromy committed
307
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
308
	u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty())
309
	closest := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
Jeromy's avatar
Jeromy committed
310
	if closest == nil {
Jeromy's avatar
Jeromy committed
311
		panic("could not find anything.")
Jeromy's avatar
Jeromy committed
312 313 314 315 316 317
	}

	if len(closest.Addresses) == 0 {
		panic("no addresses for connected peer...")
	}

Jeromy's avatar
Jeromy committed
318 319
	u.POut("handleFindPeer: sending back '%s'", closest.ID.Pretty())

Jeromy's avatar
Jeromy committed
320 321 322 323 324 325 326 327 328 329 330 331 332 333
	addr,err := closest.Addresses[0].String()
	if err != nil {
		panic(err)
	}

	resp := pDHTMessage{
		Type: pmes.GetType(),
		Response: true,
		Id: pmes.GetId(),
		Value: []byte(addr),
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <-mes
334 335 336
}

func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
337
	dht.providerLock.RLock()
338
	providers := dht.providers[u.Key(pmes.GetKey())]
339
	dht.providerLock.RUnlock()
340 341
	if providers == nil || len(providers) == 0 {
		// ?????
342
		u.DOut("No known providers for requested key.")
343 344
	}

345
	// This is just a quick hack, formalize method of sending addrs later
346
	addrs := make(map[u.Key]string)
347
	for _,prov := range providers {
348
		ma := prov.Value.NetAddress("tcp")
349 350 351 352 353 354
		str,err := ma.String()
		if err != nil {
			u.PErr("Error: %s", err)
			continue
		}

355
		addrs[prov.Value.Key()] = str
356 357 358 359 360 361 362 363 364 365 366 367
	}

	data,err := json.Marshal(addrs)
	if err != nil {
		panic(err)
	}

	resp := pDHTMessage{
		Type: DHTMessage_GET_PROVIDERS,
		Key: pmes.GetKey(),
		Value: data,
		Id: pmes.GetId(),
368
		Response: true,
369 370 371 372
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <-mes
373 374
}

375 376 377 378 379
// providerInfo records a peer registered as a provider for a key, together
// with the time the record was created.
type providerInfo struct {
	// Creation is when the record was added; the periodic cleanup in
	// handleMessages drops records older than one hour.
	Creation time.Time
	// Value is the providing peer.
	Value *peer.Peer
}

380
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
381 382
	//TODO: need to implement TTLs on providers
	key := u.Key(pmes.GetKey())
383
	dht.addProviderEntry(key, p)
384 385
}

386

387 388
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
389
func (dht *IpfsDHT) ListenFor(mesid uint64, count int, timeout time.Duration) <-chan *swarm.Message {
390
	lchan := make(chan *swarm.Message)
391
	dht.listenLock.Lock()
392
	dht.listeners[mesid] = &listenInfo{lchan, count, time.Now().Add(timeout)}
393 394 395
	dht.listenLock.Unlock()
	return lchan
}
396

397
// Unregister the given message id from the listener map
Jeromy's avatar
Jeromy committed
398 399
func (dht *IpfsDHT) Unlisten(mesid uint64) {
	dht.listenLock.Lock()
400
	list, ok := dht.listeners[mesid]
Jeromy's avatar
Jeromy committed
401 402 403 404
	if ok {
		delete(dht.listeners, mesid)
	}
	dht.listenLock.Unlock()
405 406 407 408 409
	close(list.resp)
}

func (dht *IpfsDHT) IsListening(mesid uint64) bool {
	dht.listenLock.RLock()
410
	li,ok := dht.listeners[mesid]
411
	dht.listenLock.RUnlock()
412 413 414 415 416 417
	if time.Now().After(li.eol) {
		dht.listenLock.Lock()
		delete(dht.listeners, mesid)
		dht.listenLock.Unlock()
		return false
	}
418
	return ok
Jeromy's avatar
Jeromy committed
419 420
}

Jeromy's avatar
Jeromy committed
421
// Stop all communications from this peer and shut down
422 423 424 425
func (dht *IpfsDHT) Halt() {
	dht.shutdown <- struct{}{}
	dht.network.Close()
}
426

427 428 429 430
func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
	u.DOut("Adding %s as provider for '%s'", p.Key().Pretty(), key)
	dht.providerLock.Lock()
	provs := dht.providers[key]
431
	dht.providers[key] = append(provs, &providerInfo{time.Now(), p})
432 433
	dht.providerLock.Unlock()
}
434 435 436 437 438 439 440 441 442 443 444

func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
	dht.diaglock.Lock()
	if dht.IsListening(pmes.GetId()) {
		//TODO: ehhh..........
		dht.diaglock.Unlock()
		return
	}
	dht.diaglock.Unlock()

	seq := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10)
445
	listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second * 30)
446 447 448 449 450 451 452 453 454

	for _,ps := range seq {
		mes := swarm.NewMessage(ps, pmes)
		dht.network.Chan.Outgoing <-mes
	}



	buf := new(bytes.Buffer)
455 456 457
	di := dht.getDiagInfo()
	buf.Write(di.Marshal())

458 459 460 461 462 463 464 465 466
	// NOTE: this shouldnt be a hardcoded value
	after := time.After(time.Second * 20)
	count := len(seq)
	for count > 0 {
		select {
		case <-after:
			//Timeout, return what we have
			goto out
		case req_resp := <-listen_chan:
467 468 469 470 471 472
			pmes_out := new(DHTMessage)
			err := proto.Unmarshal(req_resp.Data, pmes_out)
			if err != nil {
				// It broke? eh, whatever, keep going
				continue
			}
473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
			buf.Write(req_resp.Data)
			count--
		}
	}

out:
	resp := pDHTMessage{
		Type: DHTMessage_DIAGNOSTIC,
		Id: pmes.GetId(),
		Value: buf.Bytes(),
		Response: true,
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <-mes
}
489 490 491 492 493 494 495 496 497 498 499 500

// GetLocal looks up key in this peer's local datastore and returns the
// stored bytes. Any datastore error is returned unchanged. The stored value
// is asserted to be a []byte; the assertion panics if it is not.
func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) {
	dskey := ds.NewKey(string(key))
	val, err := dht.datastore.Get(dskey)
	if err != nil {
		return nil, err
	}
	data := val.([]byte)
	return data, nil
}

// PutLocal stores value under key in this peer's local datastore and
// returns the datastore's error, if any.
func (dht *IpfsDHT) PutLocal(key u.Key, value []byte) error {
	dskey := ds.NewKey(string(key))
	err := dht.datastore.Put(dskey, value)
	return err
}