dht.go 10.3 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
	"bytes"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	peer "github.com/jbenet/go-ipfs/peer"
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"

	identify "github.com/jbenet/go-ipfs/identify"

	ma "github.com/jbenet/go-multiaddr"

	ds "github.com/jbenet/datastore.go"

	"code.google.com/p/goprotobuf/proto"
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21 22 23 24 25
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
26
	routes []*RoutingTable
27

28 29
	network *swarm.Swarm

Jeromy's avatar
Jeromy committed
30 31 32 33 34
	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore
35

36 37
	// Map keys to peers that can provide their value
	// TODO: implement a TTL on each of these keys
38
	providers map[u.Key][]*providerInfo
39
	providerLock sync.RWMutex
40

41
	// map of channels waiting for reply messages
42
	listeners  map[uint64]*listenInfo
43
	listenLock sync.RWMutex
44 45 46

	// Signal to shutdown dht
	shutdown chan struct{}
47 48 49

	// When this peer started up
	birth time.Time
50 51 52 53 54 55 56 57

	//lock to make diagnostics work better
	diaglock sync.Mutex
}

type listenInfo struct {
	resp chan *swarm.Message
	count int
58 59
}

60 61
// Create a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
62 63 64
	if p == nil {
		panic("Tried to create new dht with nil peer")
	}
65 66 67 68 69
	network := swarm.NewSwarm(p)
	err := network.Listen()
	if err != nil {
		return nil,err
	}
70

71 72
	dht := new(IpfsDHT)
	dht.network = network
73 74
	dht.datastore = ds.NewMapDatastore()
	dht.self = p
75
	dht.listeners = make(map[uint64]*listenInfo)
76
	dht.providers = make(map[u.Key][]*providerInfo)
Jeromy's avatar
Jeromy committed
77
	dht.shutdown = make(chan struct{})
78 79
	dht.routes = make([]*RoutingTable, 1)
	dht.routes[0] = NewRoutingTable(20, convertPeerID(p.ID))
80
	dht.birth = time.Now()
81 82 83
	return dht, nil
}

84
// Start up background goroutines needed by the DHT
85 86 87 88
func (dht *IpfsDHT) Start() {
	go dht.handleMessages()
}

89
// Connect to a new peer at the given address
Jeromy's avatar
Jeromy committed
90
// TODO: move this into swarm
91 92 93 94
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
	if addr == nil {
		panic("addr was nil!")
	}
95 96 97 98 99
	peer := new(peer.Peer)
	peer.AddAddress(addr)

	conn,err := swarm.Dial("tcp", peer)
	if err != nil {
100
		return nil, err
101 102
	}

103
	err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
104
	if err != nil {
105
		return nil, err
106 107
	}

Jeromy's avatar
Jeromy committed
108 109 110 111 112 113 114 115 116
	// Send node an address that you can be reached on
	myaddr := dht.self.NetAddress("tcp")
	mastr,err := myaddr.String()
	if err != nil {
		panic("No local address to send")
	}

	conn.Outgoing.MsgChan <- []byte(mastr)

117
	dht.network.StartConn(conn)
118

119
	removed := dht.routes[0].Update(peer)
Jeromy's avatar
Jeromy committed
120 121 122
	if removed != nil {
		panic("need to remove this peer.")
	}
Jeromy's avatar
Jeromy committed
123 124 125 126 127 128 129 130

	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
	err = dht.Ping(peer, time.Second * 2)
	if err != nil {
		panic("Failed to ping new peer.")
	}

131
	return peer, nil
Jeromy's avatar
Jeromy committed
132 133
}

134 135
// handleMessages is the DHT's main loop, run in a goroutine by Start.
// It does the following:
//   - reads messages from the swarm and routes responses to the listeners
//     registered with ListenFor;
//   - dispatches requests to the handler for their type;
//   - logs swarm errors;
//   - every five minutes, drops provider records older than an hour.
//
// It returns when the swarm's incoming or error channel is closed, or when
// Halt signals shutdown.
// NOTE: this function is just a quick sketch
func (dht *IpfsDHT) handleMessages() {
	u.DOut("Begin message handling routine")

	checkTimeouts := time.NewTicker(time.Minute * 5)
	// Stop the ticker on every exit path, not only on shutdown.
	defer checkTimeouts.Stop()

	for {
		select {
		case mes, ok := <-dht.network.Chan.Incoming:
			if !ok {
				u.DOut("handleMessages closing, bad recv on incoming")
				return
			}

			pmes := new(DHTMessage)
			err := proto.Unmarshal(mes.Data, pmes)
			if err != nil {
				u.PErr("Failed to decode protobuf message: %s", err)
				continue
			}

			// Update peers latest visit in routing table
			if removed := dht.routes[0].Update(mes.Peer); removed != nil {
				// NOTE(review): proper handling of evicted peers is not
				// implemented; log instead of crashing the node.
				u.DOut("routing table evicted a peer to make room for %s", mes.Peer.ID.Pretty())
			}

			// Note: not sure if this is the correct place for this
			if pmes.GetResponse() {
				dht.dispatchResponse(mes, pmes)
				continue
			}

			u.DOut("[peer: %s]", dht.self.ID.Pretty())
			u.DOut("Got message type: '%s' [id = %x, from = %s]",
				DHTMessage_MessageType_name[int32(pmes.GetType())],
				pmes.GetId(), mes.Peer.ID.Pretty())

			switch pmes.GetType() {
			case DHTMessage_GET_VALUE:
				dht.handleGetValue(mes.Peer, pmes)
			case DHTMessage_PUT_VALUE:
				dht.handlePutValue(mes.Peer, pmes)
			case DHTMessage_FIND_NODE:
				dht.handleFindPeer(mes.Peer, pmes)
			case DHTMessage_ADD_PROVIDER:
				dht.handleAddProvider(mes.Peer, pmes)
			case DHTMessage_GET_PROVIDERS:
				dht.handleGetProviders(mes.Peer, pmes)
			case DHTMessage_PING:
				dht.handlePing(mes.Peer, pmes)
			case DHTMessage_DIAGNOSTIC:
				dht.handleDiagnostic(mes.Peer, pmes)
			}

		case err, ok := <-dht.network.Chan.Errors:
			if !ok {
				u.DOut("handleMessages closing, errors channel closed")
				return
			}
			// One bad connection or message should not take down the whole
			// node; report it and keep serving.
			u.DErr("dht err: %s", err)

		case <-dht.shutdown:
			return

		case <-checkTimeouts.C:
			dht.expireProviders(time.Hour)
		}
	}
}

// dispatchResponse delivers a response message to the listener registered
// for its message ID, if any. A listener registered with count n > 0 is
// removed from the map when its n-th response is delivered.
func (dht *IpfsDHT) dispatchResponse(mes *swarm.Message, pmes *DHTMessage) {
	id := pmes.GetId()

	// A write lock is required: the entry's count is mutated and the entry
	// may be deleted.
	dht.listenLock.Lock()
	list, ok := dht.listeners[id]
	if ok {
		if list.count > 1 {
			list.count--
		} else if list.count == 1 {
			delete(dht.listeners, id)
		}
	}
	dht.listenLock.Unlock()

	if !ok {
		// this is expected behaviour during a timeout
		u.DOut("Received response with nobody listening...")
		return
	}

	// NOTE(review): this send is on an unbuffered channel and happens outside
	// the lock. It blocks the message loop until the listener reads, and it
	// can race with Unlisten closing the channel. Fixing that requires a
	// change to the listener protocol.
	list.resp <- mes
}

// expireProviders drops provider records older than maxAge and removes keys
// that are left with no providers.
func (dht *IpfsDHT) expireProviders(maxAge time.Duration) {
	dht.providerLock.Lock()
	defer dht.providerLock.Unlock()

	for k, parr := range dht.providers {
		var cleaned []*providerInfo
		for _, v := range parr {
			if time.Since(v.Creation) < maxAge {
				cleaned = append(cleaned, v)
			}
		}
		if len(cleaned) == 0 {
			// Deleting during range is safe in Go.
			delete(dht.providers, k)
			continue
		}
		dht.providers[k] = cleaned
	}
}
223

224
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
225
	dskey := ds.NewKey(pmes.GetKey())
226
	var resp *pDHTMessage
Jeromy's avatar
Jeromy committed
227 228
	i_val, err := dht.datastore.Get(dskey)
	if err == nil {
229
		resp = &pDHTMessage{
230 231 232 233
			Response: true,
			Id: *pmes.Id,
			Key: *pmes.Key,
			Value: i_val.([]byte),
234
			Success: true,
235
		}
Jeromy's avatar
Jeromy committed
236
	} else if err == ds.ErrNotFound {
Jeromy's avatar
Jeromy committed
237
		// Find closest peer(s) to desired key and reply with that info
238
		closer := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
239 240 241 242 243 244 245
		resp = &pDHTMessage{
			Response: true,
			Id: *pmes.Id,
			Key: *pmes.Key,
			Value: closer.ID,
			Success: false,
		}
246
	}
247 248 249

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <- mes
250 251
}

Jeromy's avatar
Jeromy committed
252
// Store a value in this peer local storage
253
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
254 255 256 257 258 259
	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
	if err != nil {
		// For now, just panic, handle this better later maybe
		panic(err)
	}
260 261 262
}

func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
263
	resp := pDHTMessage{
264 265 266 267
		Type: pmes.GetType(),
		Response: true,
		Id: pmes.GetId(),
	}
268

269
	dht.network.Chan.Outgoing <-swarm.NewMessage(p, resp.ToProtobuf())
270 271
}

Jeromy's avatar
Jeromy committed
272
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
273
	u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty())
274
	closest := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
Jeromy's avatar
Jeromy committed
275
	if closest == nil {
Jeromy's avatar
Jeromy committed
276
		panic("could not find anything.")
Jeromy's avatar
Jeromy committed
277 278 279 280 281 282
	}

	if len(closest.Addresses) == 0 {
		panic("no addresses for connected peer...")
	}

Jeromy's avatar
Jeromy committed
283 284
	u.POut("handleFindPeer: sending back '%s'", closest.ID.Pretty())

Jeromy's avatar
Jeromy committed
285 286 287 288 289 290 291 292 293 294 295 296 297 298
	addr,err := closest.Addresses[0].String()
	if err != nil {
		panic(err)
	}

	resp := pDHTMessage{
		Type: pmes.GetType(),
		Response: true,
		Id: pmes.GetId(),
		Value: []byte(addr),
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <-mes
299 300 301
}

// handleGetProviders answers a GET_PROVIDERS request from p. The reply's
// Value is a JSON object mapping each known provider's key to its tcp
// multiaddr string; the object is empty when no providers are known.
// Providers without a usable tcp address are skipped.
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
	dht.providerLock.RLock()
	providers := dht.providers[u.Key(pmes.GetKey())]
	dht.providerLock.RUnlock()

	if len(providers) == 0 {
		u.DOut("No known providers for requested key.")
	}

	// This is just a quick hack, formalize method of sending addrs later
	addrs := make(map[u.Key]string, len(providers))
	for _, prov := range providers {
		// Named addr rather than ma so the multiaddr import is not shadowed.
		addr := prov.Value.NetAddress("tcp")
		if addr == nil {
			// NetAddress returns nil when the peer has no tcp address;
			// calling String on it would panic.
			u.PErr("Error: provider %s has no tcp address", prov.Value.Key().Pretty())
			continue
		}
		str, err := addr.String()
		if err != nil {
			u.PErr("Error: %s", err)
			continue
		}

		addrs[prov.Value.Key()] = str
	}

	data, err := json.Marshal(addrs)
	if err != nil {
		u.PErr("handleGetProviders: failed to encode provider addresses: %s", err)
		return
	}

	resp := pDHTMessage{
		Type:     DHTMessage_GET_PROVIDERS,
		Key:      pmes.GetKey(),
		Value:    data,
		Id:       pmes.GetId(),
		Response: true,
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <- mes
}

340 341 342 343 344
// providerInfo records that a peer can provide some key, and since when.
type providerInfo struct {
	// When the record was added; used to expire old records.
	Creation time.Time
	// The providing peer.
	Value *peer.Peer
}

345
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
346 347
	//TODO: need to implement TTLs on providers
	key := u.Key(pmes.GetKey())
348
	dht.addProviderEntry(key, p)
349 350
}

351

352 353
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
354
func (dht *IpfsDHT) ListenFor(mesid uint64, count int) <-chan *swarm.Message {
355
	lchan := make(chan *swarm.Message)
356
	dht.listenLock.Lock()
357
	dht.listeners[mesid] = &listenInfo{lchan, count}
358 359 360
	dht.listenLock.Unlock()
	return lchan
}
361

362
// Unregister the given message id from the listener map
Jeromy's avatar
Jeromy committed
363 364
func (dht *IpfsDHT) Unlisten(mesid uint64) {
	dht.listenLock.Lock()
365
	list, ok := dht.listeners[mesid]
Jeromy's avatar
Jeromy committed
366 367 368 369
	if ok {
		delete(dht.listeners, mesid)
	}
	dht.listenLock.Unlock()
370 371 372 373 374 375 376 377
	close(list.resp)
}

func (dht *IpfsDHT) IsListening(mesid uint64) bool {
	dht.listenLock.RLock()
	_,ok := dht.listeners[mesid]
	dht.listenLock.RUnlock()
	return ok
Jeromy's avatar
Jeromy committed
378 379
}

Jeromy's avatar
Jeromy committed
380
// Stop all communications from this peer and shut down
381 382 383 384
func (dht *IpfsDHT) Halt() {
	dht.shutdown <- struct{}{}
	dht.network.Close()
}
385

386 387 388 389
// addProviderEntry appends p, stamped with the current time, to the list of
// providers known for key.
func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
	u.DOut("Adding %s as provider for '%s'", p.Key().Pretty(), key)

	dht.providerLock.Lock()
	defer dht.providerLock.Unlock()
	existing := dht.providers[key]
	entry := &providerInfo{Creation: time.Now(), Value: p}
	dht.providers[key] = append(existing, entry)
}
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440

func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
	dht.diaglock.Lock()
	if dht.IsListening(pmes.GetId()) {
		//TODO: ehhh..........
		dht.diaglock.Unlock()
		return
	}
	dht.diaglock.Unlock()

	seq := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10)
	listen_chan := dht.ListenFor(pmes.GetId(), len(seq))

	for _,ps := range seq {
		mes := swarm.NewMessage(ps, pmes)
		dht.network.Chan.Outgoing <-mes
	}



	buf := new(bytes.Buffer)
	// NOTE: this shouldnt be a hardcoded value
	after := time.After(time.Second * 20)
	count := len(seq)
	for count > 0 {
		select {
		case <-after:
			//Timeout, return what we have
			goto out
		case req_resp := <-listen_chan:
			buf.Write(req_resp.Data)
			count--
		}
	}

out:
	di := dht.getDiagInfo()
	buf.Write(di.Marshal())
	resp := pDHTMessage{
		Type: DHTMessage_DIAGNOSTIC,
		Id: pmes.GetId(),
		Value: buf.Bytes(),
		Response: true,
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <-mes
}