dht.go 7.61 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

import (
	"encoding/json"
	"errors"
	"sync"
	"time"

	"code.google.com/p/goprotobuf/proto"

	ds "github.com/jbenet/datastore.go"
	identify "github.com/jbenet/go-ipfs/identify"
	peer "github.com/jbenet/go-ipfs/peer"
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"
	ma "github.com/jbenet/go-multiaddr"
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21 22 23 24
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
	// Kademlia routing table of known peers
	routes *RoutingTable

	// Underlying swarm used to send and receive messages
	network *swarm.Swarm

	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore

	// Map keys to peers that can provide their value
	// TODO: implement a TTL on each of these keys
	providers    map[u.Key][]*peer.Peer
	providerLock sync.RWMutex

	// map of channels waiting for reply messages, keyed by message id
	listeners  map[uint64]chan *swarm.Message
	listenLock sync.RWMutex

	// Signal to shutdown dht
	shutdown chan struct{}
}

48 49
// Create a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
50 51 52
	if p == nil {
		panic("Tried to create new dht with nil peer")
	}
53 54 55 56 57
	network := swarm.NewSwarm(p)
	err := network.Listen()
	if err != nil {
		return nil,err
	}
58

59 60
	dht := new(IpfsDHT)
	dht.network = network
61 62
	dht.datastore = ds.NewMapDatastore()
	dht.self = p
Jeromy's avatar
Jeromy committed
63
	dht.listeners = make(map[uint64]chan *swarm.Message)
64
	dht.providers = make(map[u.Key][]*peer.Peer)
Jeromy's avatar
Jeromy committed
65
	dht.shutdown = make(chan struct{})
66
	dht.routes = NewRoutingTable(20, convertPeerID(p.ID))
67 68 69
	return dht, nil
}

70
// Start spins up the background goroutines needed by the DHT.
// Currently this is just the main message-handling loop.
func (dht *IpfsDHT) Start() {
	go dht.handleMessages()
}

75
// Connect to a new peer at the given address
76 77 78 79
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
	if addr == nil {
		panic("addr was nil!")
	}
80 81 82 83 84
	peer := new(peer.Peer)
	peer.AddAddress(addr)

	conn,err := swarm.Dial("tcp", peer)
	if err != nil {
85
		return nil, err
86 87
	}

88
	err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
89
	if err != nil {
90
		return nil, err
91 92
	}

93
	dht.network.StartConn(conn)
94

95
	dht.routes.Update(peer)
96
	return peer, nil
Jeromy's avatar
Jeromy committed
97 98
}

99 100
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
//
// Runs until the incoming channel closes or a shutdown signal arrives.
// Responses are routed to registered listeners; requests are dispatched to
// the per-message-type handlers below.
func (dht *IpfsDHT) handleMessages() {
	u.DOut("Begin message handling routine")
	for {
		select {
		case mes,ok := <-dht.network.Chan.Incoming:
			if !ok {
				u.DOut("handleMessages closing, bad recv on incoming")
				return
			}
			pmes := new(DHTMessage)
			err := proto.Unmarshal(mes.Data, pmes)
			if err != nil {
				u.PErr("Failed to decode protobuf message: %s", err)
				continue
			}

			// Update peers latest visit in routing table
			dht.routes.Update(mes.Peer)

			// Note: not sure if this is the correct place for this
			if pmes.GetResponse() {
				dht.listenLock.RLock()
				ch, ok := dht.listeners[pmes.GetId()]
				dht.listenLock.RUnlock()
				if ok {
					// NOTE(review): unbuffered send — if the registered
					// listener stopped receiving (e.g. timed out without
					// unregistering), this blocks the entire message loop.
					// Consider a buffered channel or select-with-default.
					ch <- mes
				} else {
					// this is expected behaviour during a timeout
					u.DOut("Received response with nobody listening...")
				}

				continue
			}
			//

			u.DOut("Got message type: '%s' [id = %x]", mesNames[pmes.GetType()], pmes.GetId())
			// Dispatch on message type; handlers run synchronously on this
			// goroutine, so a blocking handler stalls all message handling.
			switch pmes.GetType() {
			case DHTMessage_GET_VALUE:
				dht.handleGetValue(mes.Peer, pmes)
			case DHTMessage_PUT_VALUE:
				dht.handlePutValue(mes.Peer, pmes)
			case DHTMessage_FIND_NODE:
				dht.handleFindNode(mes.Peer, pmes)
			case DHTMessage_ADD_PROVIDER:
				dht.handleAddProvider(mes.Peer, pmes)
			case DHTMessage_GET_PROVIDERS:
				dht.handleGetProviders(mes.Peer, pmes)
			case DHTMessage_PING:
				dht.handlePing(mes.Peer, pmes)
			}

		case err := <-dht.network.Chan.Errors:
			u.DErr("dht err: %s", err)
		case <-dht.shutdown:
			return
		}
	}
}
159

160
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
161 162 163
	dskey := ds.NewKey(pmes.GetKey())
	i_val, err := dht.datastore.Get(dskey)
	if err == nil {
164 165 166 167 168 169 170 171 172
		resp := &pDHTMessage{
			Response: true,
			Id: *pmes.Id,
			Key: *pmes.Key,
			Value: i_val.([]byte),
		}

		mes := swarm.NewMessage(p, resp.ToProtobuf())
		dht.network.Chan.Outgoing <- mes
Jeromy's avatar
Jeromy committed
173
	} else if err == ds.ErrNotFound {
174 175 176 177 178 179 180
		// Find closest node(s) to desired key and reply with that info
		// TODO: this will need some other metadata in the protobuf message
		//			to signal to the querying node that the data its receiving
		//			is actually a list of other nodes
	}
}

Jeromy's avatar
Jeromy committed
181
// Store a value in this nodes local storage
182
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
183 184 185 186 187 188
	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
	if err != nil {
		// For now, just panic, handle this better later maybe
		panic(err)
	}
189 190 191
}

func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
192 193 194 195 196
	resp := &pDHTMessage{
		Type: pmes.GetType(),
		Response: true,
		Id: pmes.GetId(),
	}
197

198
	dht.network.Chan.Outgoing <-swarm.NewMessage(p, resp.ToProtobuf())
199 200
}

201 202 203 204 205
// handleFindNode is the FIND_NODE handler; not yet implemented.
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
	panic("Not implemented.")
}

func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
206
	dht.providerLock.RLock()
207
	providers := dht.providers[u.Key(pmes.GetKey())]
208
	dht.providerLock.RUnlock()
209 210
	if providers == nil || len(providers) == 0 {
		// ?????
211
		u.DOut("No known providers for requested key.")
212 213
	}

214
	// This is just a quick hack, formalize method of sending addrs later
215
	addrs := make(map[u.Key]string)
216 217 218 219 220 221 222 223
	for _,prov := range providers {
		ma := prov.NetAddress("tcp")
		str,err := ma.String()
		if err != nil {
			u.PErr("Error: %s", err)
			continue
		}

224
		addrs[prov.Key()] = str
225 226 227 228 229 230 231 232 233 234 235 236
	}

	data,err := json.Marshal(addrs)
	if err != nil {
		panic(err)
	}

	resp := pDHTMessage{
		Type: DHTMessage_GET_PROVIDERS,
		Key: pmes.GetKey(),
		Value: data,
		Id: pmes.GetId(),
237
		Response: true,
238 239 240 241
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <-mes
242 243 244
}

// handleAddProvider records the sending peer as a provider for the key
// named in the message.
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
	//TODO: need to implement TTLs on providers
	key := u.Key(pmes.GetKey())
	dht.addProviderEntry(key, p)
}

250

251 252
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
253 254
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
	lchan := make(chan *swarm.Message)
255 256 257 258 259
	dht.listenLock.Lock()
	dht.listeners[mesid] = lchan
	dht.listenLock.Unlock()
	return lchan
}
260

261
// Unregister the given message id from the listener map
Jeromy's avatar
Jeromy committed
262 263 264 265 266 267 268 269 270 271
func (dht *IpfsDHT) Unlisten(mesid uint64) {
	dht.listenLock.Lock()
	ch, ok := dht.listeners[mesid]
	if ok {
		delete(dht.listeners, mesid)
	}
	dht.listenLock.Unlock()
	close(ch)
}

272 273 274 275 276
// Stop all communications from this node and shut down
func (dht *IpfsDHT) Halt() {
	// NOTE(review): this send blocks until handleMessages receives it; if
	// Start was never called, Halt will hang. close(dht.shutdown) would
	// also make Halt safe to call more than once — confirm intent.
	dht.shutdown <- struct{}{}
	dht.network.Close()
}
277 278

// Ping a node, log the time it took
279
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
280
	// Thoughts: maybe this should accept an ID and do a peer lookup?
281 282
	u.DOut("Enter Ping.")

283 284
	pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING}
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
285 286

	before := time.Now()
287
	response_chan := dht.ListenFor(pmes.Id)
288 289 290 291 292 293
	dht.network.Chan.Outgoing <- mes

	tout := time.After(timeout)
	select {
	case <-response_chan:
		roundtrip := time.Since(before)
294 295
		u.POut("Ping took %s.", roundtrip.String())
		return nil
296 297 298
	case <-tout:
		// Timed out, think about removing node from network
		u.DOut("Ping node timed out.")
299
		return u.ErrTimeout
300 301
	}
}
302 303 304 305 306 307 308 309

// addProviderEntry records p as a provider for the given key.
func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
	u.DOut("Adding %s as provider for '%s'", p.Key().Pretty(), key)
	dht.providerLock.Lock()
	defer dht.providerLock.Unlock()
	dht.providers[key] = append(dht.providers[key], p)
}