dht.go 7.03 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"sync"
5
	"time"
6
	"encoding/json"
7

Jeromy's avatar
Jeromy committed
8 9 10
	peer	"github.com/jbenet/go-ipfs/peer"
	swarm	"github.com/jbenet/go-ipfs/swarm"
	u		"github.com/jbenet/go-ipfs/util"
11 12 13
	identify "github.com/jbenet/go-ipfs/identify"

	ma "github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
14 15 16

	ds "github.com/jbenet/datastore.go"

17
	"code.google.com/p/goprotobuf/proto"
18 19
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21 22 23 24
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
25
	routes *RoutingTable
26

27 28
	network *swarm.Swarm

Jeromy's avatar
Jeromy committed
29 30 31 32 33
	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore
34

35 36 37
	// Map keys to peers that can provide their value
	// TODO: implement a TTL on each of these keys
	providers map[u.Key][]*peer.Peer
38
	providerLock sync.RWMutex
39

40 41
	// map of channels waiting for reply messages
	listeners  map[uint64]chan *swarm.Message
42
	listenLock sync.RWMutex
43 44 45

	// Signal to shutdown dht
	shutdown chan struct{}
46 47
}

48 49
// Create a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
50 51 52
	if p == nil {
		panic("Tried to create new dht with nil peer")
	}
53 54 55 56 57
	network := swarm.NewSwarm(p)
	err := network.Listen()
	if err != nil {
		return nil,err
	}
58

59 60
	dht := new(IpfsDHT)
	dht.network = network
61 62
	dht.datastore = ds.NewMapDatastore()
	dht.self = p
Jeromy's avatar
Jeromy committed
63 64
	dht.listeners = make(map[uint64]chan *swarm.Message)
	dht.shutdown = make(chan struct{})
65
	dht.routes = NewRoutingTable(20, convertPeerID(p.ID))
66 67 68
	return dht, nil
}

69
// Start up background goroutines needed by the DHT
70 71 72 73
func (dht *IpfsDHT) Start() {
	go dht.handleMessages()
}

74
// Connect to a new peer at the given address
75 76 77 78
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
	if addr == nil {
		panic("addr was nil!")
	}
79 80 81 82 83
	peer := new(peer.Peer)
	peer.AddAddress(addr)

	conn,err := swarm.Dial("tcp", peer)
	if err != nil {
84
		return nil, err
85 86
	}

87
	err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
88
	if err != nil {
89
		return nil, err
90 91
	}

92
	dht.network.StartConn(conn)
93

94
	dht.routes.Update(peer)
95
	return peer, nil
Jeromy's avatar
Jeromy committed
96 97
}

98 99
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
100
func (dht *IpfsDHT) handleMessages() {
101
	u.DOut("Begin message handling routine")
102 103 104
	for {
		select {
		case mes := <-dht.network.Chan.Incoming:
105 106
			u.DOut("recieved message from swarm.")

107 108 109 110 111 112 113
			pmes := new(DHTMessage)
			err := proto.Unmarshal(mes.Data, pmes)
			if err != nil {
				u.PErr("Failed to decode protobuf message: %s", err)
				continue
			}

114 115 116
			// Update peers latest visit in routing table
			dht.routes.Update(mes.Peer)

117
			// Note: not sure if this is the correct place for this
118 119 120 121 122 123 124 125 126 127 128
			if pmes.GetResponse() {
				dht.listenLock.RLock()
				ch, ok := dht.listeners[pmes.GetId()]
				dht.listenLock.RUnlock()
				if ok {
					ch <- mes
				}

				// this is expected behaviour during a timeout
				u.DOut("Received response with nobody listening...")
				continue
129
			}
130 131
			//

132
			u.DOut("Got message type: %d", pmes.GetType())
133
			switch pmes.GetType() {
134 135 136
			case DHTMessage_GET_VALUE:
				dht.handleGetValue(mes.Peer, pmes)
			case DHTMessage_PUT_VALUE:
Jeromy's avatar
Jeromy committed
137
				dht.handlePutValue(mes.Peer, pmes)
138
			case DHTMessage_FIND_NODE:
Jeromy's avatar
Jeromy committed
139
				dht.handleFindNode(mes.Peer, pmes)
140
			case DHTMessage_ADD_PROVIDER:
141 142
			case DHTMessage_GET_PROVIDERS:
			case DHTMessage_PING:
143
				dht.handlePing(mes.Peer, pmes)
144 145
			}

146 147
		case err := <-dht.network.Chan.Errors:
			panic(err)
148 149
		case <-dht.shutdown:
			return
150
		}
151
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152
}
153

154
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
155 156 157
	dskey := ds.NewKey(pmes.GetKey())
	i_val, err := dht.datastore.Get(dskey)
	if err == nil {
158 159 160 161 162 163 164 165 166
		resp := &pDHTMessage{
			Response: true,
			Id: *pmes.Id,
			Key: *pmes.Key,
			Value: i_val.([]byte),
		}

		mes := swarm.NewMessage(p, resp.ToProtobuf())
		dht.network.Chan.Outgoing <- mes
Jeromy's avatar
Jeromy committed
167
	} else if err == ds.ErrNotFound {
168 169 170 171 172 173 174
		// Find closest node(s) to desired key and reply with that info
		// TODO: this will need some other metadata in the protobuf message
		//			to signal to the querying node that the data its receiving
		//			is actually a list of other nodes
	}
}

Jeromy's avatar
Jeromy committed
175
// Store a value in this nodes local storage
176
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
177 178 179 180 181 182
	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
	if err != nil {
		// For now, just panic, handle this better later maybe
		panic(err)
	}
183 184 185
}

func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
186 187 188 189 190
	resp := &pDHTMessage{
		Type: pmes.GetType(),
		Response: true,
		Id: pmes.GetId(),
	}
191

192
	dht.network.Chan.Outgoing <-swarm.NewMessage(p, resp.ToProtobuf())
193 194
}

195 196 197 198 199
// handleFindNode is the handler for FIND_NODE requests. It is not
// implemented yet: the request is logged and dropped without a reply.
//
// This handler is invoked from the message loop for messages received from
// the network, so it must not panic; otherwise any peer could take the node
// down by sending a FIND_NODE message.
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
	u.PErr("FIND_NODE handling is not implemented, dropping message id %d", pmes.GetId())
}

func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
200 201 202 203 204
	providers := dht.providers[u.Key(pmes.GetKey())]
	if providers == nil || len(providers) == 0 {
		// ?????
	}

205
	// This is just a quick hack, formalize method of sending addrs later
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
	var addrs []string
	for _,prov := range providers {
		ma := prov.NetAddress("tcp")
		str,err := ma.String()
		if err != nil {
			u.PErr("Error: %s", err)
			continue
		}

		addrs = append(addrs, str)
	}

	data,err := json.Marshal(addrs)
	if err != nil {
		panic(err)
	}

	resp := pDHTMessage{
		Type: DHTMessage_GET_PROVIDERS,
		Key: pmes.GetKey(),
		Value: data,
		Id: pmes.GetId(),
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <-mes
232 233 234
}

func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
235 236 237 238
	//TODO: need to implement TTLs on providers
	key := u.Key(pmes.GetKey())
	parr := dht.providers[key]
	dht.providers[key] = append(parr, p)
239 240
}

241

242 243
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
244 245
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
	lchan := make(chan *swarm.Message)
246 247 248 249 250
	dht.listenLock.Lock()
	dht.listeners[mesid] = lchan
	dht.listenLock.Unlock()
	return lchan
}
251

252
// Unregister the given message id from the listener map
Jeromy's avatar
Jeromy committed
253 254 255 256 257 258 259 260 261 262
func (dht *IpfsDHT) Unlisten(mesid uint64) {
	dht.listenLock.Lock()
	ch, ok := dht.listeners[mesid]
	if ok {
		delete(dht.listeners, mesid)
	}
	dht.listenLock.Unlock()
	close(ch)
}

263 264 265 266 267
// Halt stops all communications from this node and shuts it down: it first
// hands a shutdown signal to the message handling loop, then closes the
// network. The signal is sent on an unbuffered channel, so Halt waits until
// the loop has picked it up.
func (dht *IpfsDHT) Halt() {
	var stop struct{}
	dht.shutdown <- stop
	dht.network.Close()
}
268 269

// Ping a node, log the time it took
270
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
271
	// Thoughts: maybe this should accept an ID and do a peer lookup?
272 273
	u.DOut("Enter Ping.")

274 275
	pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING}
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
276 277

	before := time.Now()
278
	response_chan := dht.ListenFor(pmes.Id)
279 280 281 282 283 284
	dht.network.Chan.Outgoing <- mes

	tout := time.After(timeout)
	select {
	case <-response_chan:
		roundtrip := time.Since(before)
285 286
		u.POut("Ping took %s.", roundtrip.String())
		return nil
287 288 289
	case <-tout:
		// Timed out, think about removing node from network
		u.DOut("Ping node timed out.")
290
		return u.ErrTimeout
291 292
	}
}