dht.go 8.88 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"sync"
5
	"time"
6
	"encoding/json"
7

Jeromy's avatar
Jeromy committed
8 9 10
	peer	"github.com/jbenet/go-ipfs/peer"
	swarm	"github.com/jbenet/go-ipfs/swarm"
	u		"github.com/jbenet/go-ipfs/util"
11 12 13
	identify "github.com/jbenet/go-ipfs/identify"

	ma "github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
14 15 16

	ds "github.com/jbenet/datastore.go"

17
	"code.google.com/p/goprotobuf/proto"
18 19
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21 22 23 24
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
25
	routes *RoutingTable
26

27 28
	network *swarm.Swarm

Jeromy's avatar
Jeromy committed
29 30 31 32 33
	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore
34

35 36
	// Map keys to peers that can provide their value
	// TODO: implement a TTL on each of these keys
37
	providers map[u.Key][]*providerInfo
38
	providerLock sync.RWMutex
39

40 41
	// map of channels waiting for reply messages
	listeners  map[uint64]chan *swarm.Message
42
	listenLock sync.RWMutex
43 44 45

	// Signal to shutdown dht
	shutdown chan struct{}
46 47 48

	// When this peer started up
	birth time.Time
49 50
}

51 52
// Create a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
53 54 55
	if p == nil {
		panic("Tried to create new dht with nil peer")
	}
56 57 58 59 60
	network := swarm.NewSwarm(p)
	err := network.Listen()
	if err != nil {
		return nil,err
	}
61

62 63
	dht := new(IpfsDHT)
	dht.network = network
64 65
	dht.datastore = ds.NewMapDatastore()
	dht.self = p
Jeromy's avatar
Jeromy committed
66
	dht.listeners = make(map[uint64]chan *swarm.Message)
67
	dht.providers = make(map[u.Key][]*providerInfo)
Jeromy's avatar
Jeromy committed
68
	dht.shutdown = make(chan struct{})
69
	dht.routes = NewRoutingTable(20, convertPeerID(p.ID))
70
	dht.birth = time.Now()
71 72 73
	return dht, nil
}

74
// Start up background goroutines needed by the DHT
75 76 77 78
func (dht *IpfsDHT) Start() {
	go dht.handleMessages()
}

79
// Connect to a new peer at the given address
Jeromy's avatar
Jeromy committed
80
// TODO: move this into swarm
81 82 83 84
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
	if addr == nil {
		panic("addr was nil!")
	}
85 86 87 88 89
	peer := new(peer.Peer)
	peer.AddAddress(addr)

	conn,err := swarm.Dial("tcp", peer)
	if err != nil {
90
		return nil, err
91 92
	}

93
	err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
94
	if err != nil {
95
		return nil, err
96 97
	}

Jeromy's avatar
Jeromy committed
98 99 100 101 102 103 104 105 106
	// Send node an address that you can be reached on
	myaddr := dht.self.NetAddress("tcp")
	mastr,err := myaddr.String()
	if err != nil {
		panic("No local address to send")
	}

	conn.Outgoing.MsgChan <- []byte(mastr)

107
	dht.network.StartConn(conn)
108

Jeromy's avatar
Jeromy committed
109 110 111 112
	removed := dht.routes.Update(peer)
	if removed != nil {
		panic("need to remove this peer.")
	}
Jeromy's avatar
Jeromy committed
113 114 115 116 117 118 119 120

	// Ping new peer to register in their routing table
	// NOTE: this should be done better...
	err = dht.Ping(peer, time.Second * 2)
	if err != nil {
		panic("Failed to ping new peer.")
	}

121
	return peer, nil
Jeromy's avatar
Jeromy committed
122 123
}

124 125
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
126
func (dht *IpfsDHT) handleMessages() {
127
	u.DOut("Begin message handling routine")
128 129

	checkTimeouts := time.NewTicker(time.Minute * 5)
130 131
	for {
		select {
132 133 134 135 136
		case mes,ok := <-dht.network.Chan.Incoming:
			if !ok {
				u.DOut("handleMessages closing, bad recv on incoming")
				return
			}
137 138 139 140 141 142 143
			pmes := new(DHTMessage)
			err := proto.Unmarshal(mes.Data, pmes)
			if err != nil {
				u.PErr("Failed to decode protobuf message: %s", err)
				continue
			}

144
			// Update peers latest visit in routing table
Jeromy's avatar
Jeromy committed
145 146 147 148
			removed := dht.routes.Update(mes.Peer)
			if removed != nil {
				panic("Need to handle removed peer.")
			}
149

150
			// Note: not sure if this is the correct place for this
151 152 153 154 155 156
			if pmes.GetResponse() {
				dht.listenLock.RLock()
				ch, ok := dht.listeners[pmes.GetId()]
				dht.listenLock.RUnlock()
				if ok {
					ch <- mes
157 158 159
				} else {
					// this is expected behaviour during a timeout
					u.DOut("Received response with nobody listening...")
160 161 162
				}

				continue
163
			}
164 165
			//

166 167 168 169
			u.DOut("[peer: %s]", dht.self.ID.Pretty())
			u.DOut("Got message type: '%s' [id = %x, from = %s]",
				DHTMessage_MessageType_name[int32(pmes.GetType())],
				pmes.GetId(), mes.Peer.ID.Pretty())
170
			switch pmes.GetType() {
171 172 173
			case DHTMessage_GET_VALUE:
				dht.handleGetValue(mes.Peer, pmes)
			case DHTMessage_PUT_VALUE:
Jeromy's avatar
Jeromy committed
174
				dht.handlePutValue(mes.Peer, pmes)
175
			case DHTMessage_FIND_NODE:
Jeromy's avatar
Jeromy committed
176
				dht.handleFindPeer(mes.Peer, pmes)
177
			case DHTMessage_ADD_PROVIDER:
178
				dht.handleAddProvider(mes.Peer, pmes)
179
			case DHTMessage_GET_PROVIDERS:
180
				dht.handleGetProviders(mes.Peer, pmes)
181
			case DHTMessage_PING:
182
				dht.handlePing(mes.Peer, pmes)
183 184
			case DHTMessage_DIAGNOSTIC:
				// TODO: network diagnostic messages
185 186
			}

187
		case err := <-dht.network.Chan.Errors:
188
			u.DErr("dht err: %s", err)
189
		case <-dht.shutdown:
190
			checkTimeouts.Stop()
191
			return
192 193 194 195 196 197 198 199 200 201 202 203
		case <-checkTimeouts.C:
			dht.providerLock.Lock()
			for k,parr := range dht.providers {
				var cleaned []*providerInfo
				for _,v := range parr {
					if time.Since(v.Creation) < time.Hour {
						cleaned = append(cleaned, v)
					}
				}
				dht.providers[k] = cleaned
			}
			dht.providerLock.Unlock()
204
		}
205
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
206
}
207

208
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
209
	dskey := ds.NewKey(pmes.GetKey())
210
	var resp *pDHTMessage
Jeromy's avatar
Jeromy committed
211 212
	i_val, err := dht.datastore.Get(dskey)
	if err == nil {
213
		resp = &pDHTMessage{
214 215 216 217
			Response: true,
			Id: *pmes.Id,
			Key: *pmes.Key,
			Value: i_val.([]byte),
218
			Success: true,
219
		}
Jeromy's avatar
Jeromy committed
220
	} else if err == ds.ErrNotFound {
Jeromy's avatar
Jeromy committed
221
		// Find closest peer(s) to desired key and reply with that info
222 223 224 225 226 227 228 229
		closer := dht.routes.NearestPeer(convertKey(u.Key(pmes.GetKey())))
		resp = &pDHTMessage{
			Response: true,
			Id: *pmes.Id,
			Key: *pmes.Key,
			Value: closer.ID,
			Success: false,
		}
230
	}
231 232 233

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <- mes
234 235
}

Jeromy's avatar
Jeromy committed
236
// Store a value in this peer local storage
237
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
238 239 240 241 242 243
	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
	if err != nil {
		// For now, just panic, handle this better later maybe
		panic(err)
	}
244 245 246
}

func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
247
	resp := pDHTMessage{
248 249 250 251
		Type: pmes.GetType(),
		Response: true,
		Id: pmes.GetId(),
	}
252

253
	dht.network.Chan.Outgoing <-swarm.NewMessage(p, resp.ToProtobuf())
254 255
}

Jeromy's avatar
Jeromy committed
256
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
257
	u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty())
Jeromy's avatar
Jeromy committed
258 259
	closest := dht.routes.NearestPeer(convertKey(u.Key(pmes.GetKey())))
	if closest == nil {
Jeromy's avatar
Jeromy committed
260
		panic("could not find anything.")
Jeromy's avatar
Jeromy committed
261 262 263 264 265 266
	}

	if len(closest.Addresses) == 0 {
		panic("no addresses for connected peer...")
	}

Jeromy's avatar
Jeromy committed
267 268
	u.POut("handleFindPeer: sending back '%s'", closest.ID.Pretty())

Jeromy's avatar
Jeromy committed
269 270 271 272 273 274 275 276 277 278 279 280 281 282
	addr,err := closest.Addresses[0].String()
	if err != nil {
		panic(err)
	}

	resp := pDHTMessage{
		Type: pmes.GetType(),
		Response: true,
		Id: pmes.GetId(),
		Value: []byte(addr),
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <-mes
283 284 285
}

func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
286
	dht.providerLock.RLock()
287
	providers := dht.providers[u.Key(pmes.GetKey())]
288
	dht.providerLock.RUnlock()
289 290
	if providers == nil || len(providers) == 0 {
		// ?????
291
		u.DOut("No known providers for requested key.")
292 293
	}

294
	// This is just a quick hack, formalize method of sending addrs later
295
	addrs := make(map[u.Key]string)
296
	for _,prov := range providers {
297
		ma := prov.Value.NetAddress("tcp")
298 299 300 301 302 303
		str,err := ma.String()
		if err != nil {
			u.PErr("Error: %s", err)
			continue
		}

304
		addrs[prov.Value.Key()] = str
305 306 307 308 309 310 311 312 313 314 315 316
	}

	data,err := json.Marshal(addrs)
	if err != nil {
		panic(err)
	}

	resp := pDHTMessage{
		Type: DHTMessage_GET_PROVIDERS,
		Key: pmes.GetKey(),
		Value: data,
		Id: pmes.GetId(),
317
		Response: true,
318 319 320 321
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <-mes
322 323
}

324 325 326 327 328
type providerInfo struct {
	Creation time.Time
	Value *peer.Peer
}

329
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
330 331
	//TODO: need to implement TTLs on providers
	key := u.Key(pmes.GetKey())
332
	dht.addProviderEntry(key, p)
333 334
}

335

336 337
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
338 339
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
	lchan := make(chan *swarm.Message)
340 341 342 343 344
	dht.listenLock.Lock()
	dht.listeners[mesid] = lchan
	dht.listenLock.Unlock()
	return lchan
}
345

346
// Unregister the given message id from the listener map
Jeromy's avatar
Jeromy committed
347 348 349 350 351 352 353 354 355 356
func (dht *IpfsDHT) Unlisten(mesid uint64) {
	dht.listenLock.Lock()
	ch, ok := dht.listeners[mesid]
	if ok {
		delete(dht.listeners, mesid)
	}
	dht.listenLock.Unlock()
	close(ch)
}

Jeromy's avatar
Jeromy committed
357
// Stop all communications from this peer and shut down
358 359 360 361
func (dht *IpfsDHT) Halt() {
	dht.shutdown <- struct{}{}
	dht.network.Close()
}
362

363 364 365 366
func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
	u.DOut("Adding %s as provider for '%s'", p.Key().Pretty(), key)
	dht.providerLock.Lock()
	provs := dht.providers[key]
367
	dht.providers[key] = append(provs, &providerInfo{time.Now(), p})
368 369
	dht.providerLock.Unlock()
}