dht.go 6.79 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"sync"
5
	"time"
6
	"encoding/json"
7

Jeromy's avatar
Jeromy committed
8 9 10
	peer	"github.com/jbenet/go-ipfs/peer"
	swarm	"github.com/jbenet/go-ipfs/swarm"
	u		"github.com/jbenet/go-ipfs/util"
11 12 13
	identify "github.com/jbenet/go-ipfs/identify"

	ma "github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
14 15 16

	ds "github.com/jbenet/datastore.go"

17
	"code.google.com/p/goprotobuf/proto"
18 19
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21 22 23 24
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
25
	routes *RoutingTable
26

27 28
	network *swarm.Swarm

Jeromy's avatar
Jeromy committed
29 30 31 32 33
	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore
34

35 36 37 38
	// Map keys to peers that can provide their value
	// TODO: implement a TTL on each of these keys
	providers map[u.Key][]*peer.Peer

39 40
	// map of channels waiting for reply messages
	listeners  map[uint64]chan *swarm.Message
41
	listenLock sync.RWMutex
42 43 44

	// Signal to shutdown dht
	shutdown chan struct{}
45 46
}

47 48
// Create a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
49 50 51 52 53
	network := swarm.NewSwarm(p)
	err := network.Listen()
	if err != nil {
		return nil,err
	}
54

55 56
	dht := new(IpfsDHT)
	dht.network = network
57 58
	dht.datastore = ds.NewMapDatastore()
	dht.self = p
Jeromy's avatar
Jeromy committed
59 60
	dht.listeners = make(map[uint64]chan *swarm.Message)
	dht.shutdown = make(chan struct{})
61
	dht.routes = NewRoutingTable(20, convertPeerID(p.ID))
62 63 64
	return dht, nil
}

65
// Start up background goroutines needed by the DHT
66 67 68 69
func (dht *IpfsDHT) Start() {
	go dht.handleMessages()
}

70 71 72 73 74 75 76 77 78 79
// Connect to a new peer at the given address
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) error {
	peer := new(peer.Peer)
	peer.AddAddress(addr)

	conn,err := swarm.Dial("tcp", peer)
	if err != nil {
		return err
	}

80
	err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
81 82 83 84
	if err != nil {
		return err
	}

85
	dht.network.StartConn(conn)
86

87
	dht.routes.Update(peer)
88
	return nil
Jeromy's avatar
Jeromy committed
89 90
}

91 92
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
93
func (dht *IpfsDHT) handleMessages() {
94
	u.DOut("Begin message handling routine")
95 96 97
	for {
		select {
		case mes := <-dht.network.Chan.Incoming:
98 99
			u.DOut("recieved message from swarm.")

100 101 102 103 104 105 106
			pmes := new(DHTMessage)
			err := proto.Unmarshal(mes.Data, pmes)
			if err != nil {
				u.PErr("Failed to decode protobuf message: %s", err)
				continue
			}

107 108 109
			// Update peers latest visit in routing table
			dht.routes.Update(mes.Peer)

110
			// Note: not sure if this is the correct place for this
111 112 113 114 115 116 117 118 119 120 121
			if pmes.GetResponse() {
				dht.listenLock.RLock()
				ch, ok := dht.listeners[pmes.GetId()]
				dht.listenLock.RUnlock()
				if ok {
					ch <- mes
				}

				// this is expected behaviour during a timeout
				u.DOut("Received response with nobody listening...")
				continue
122
			}
123 124
			//

125
			u.DOut("Got message type: %d", pmes.GetType())
126
			switch pmes.GetType() {
127 128 129
			case DHTMessage_GET_VALUE:
				dht.handleGetValue(mes.Peer, pmes)
			case DHTMessage_PUT_VALUE:
Jeromy's avatar
Jeromy committed
130
				dht.handlePutValue(mes.Peer, pmes)
131
			case DHTMessage_FIND_NODE:
Jeromy's avatar
Jeromy committed
132
				dht.handleFindNode(mes.Peer, pmes)
133
			case DHTMessage_ADD_PROVIDER:
134 135
			case DHTMessage_GET_PROVIDERS:
			case DHTMessage_PING:
136
				dht.handlePing(mes.Peer, pmes)
137 138
			}

139 140
		case err := <-dht.network.Chan.Errors:
			panic(err)
141 142
		case <-dht.shutdown:
			return
143
		}
144
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
145
}
146

147
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
148 149 150
	dskey := ds.NewKey(pmes.GetKey())
	i_val, err := dht.datastore.Get(dskey)
	if err == nil {
151 152 153 154 155 156 157 158 159
		resp := &pDHTMessage{
			Response: true,
			Id: *pmes.Id,
			Key: *pmes.Key,
			Value: i_val.([]byte),
		}

		mes := swarm.NewMessage(p, resp.ToProtobuf())
		dht.network.Chan.Outgoing <- mes
Jeromy's avatar
Jeromy committed
160
	} else if err == ds.ErrNotFound {
161 162 163 164 165 166 167
		// Find closest node(s) to desired key and reply with that info
		// TODO: this will need some other metadata in the protobuf message
		//			to signal to the querying node that the data its receiving
		//			is actually a list of other nodes
	}
}

Jeromy's avatar
Jeromy committed
168
// Store a value in this nodes local storage
169
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
170 171 172 173 174 175
	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
	if err != nil {
		// For now, just panic, handle this better later maybe
		panic(err)
	}
176 177 178
}

func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
179 180 181 182 183
	resp := &pDHTMessage{
		Type: pmes.GetType(),
		Response: true,
		Id: pmes.GetId(),
	}
184

185
	dht.network.Chan.Outgoing <-swarm.NewMessage(p, resp.ToProtobuf())
186 187
}

188 189 190 191 192
// handleFindNode is the handler for FIND_NODE requests. It is a stub: it
// always panics with "Not implemented.".
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
	panic("Not implemented.")
}

func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
	providers := dht.providers[u.Key(pmes.GetKey())]
	if providers == nil || len(providers) == 0 {
		// ?????
	}

	var addrs []string
	for _,prov := range providers {
		ma := prov.NetAddress("tcp")
		str,err := ma.String()
		if err != nil {
			u.PErr("Error: %s", err)
			continue
		}

		addrs = append(addrs, str)
	}

	data,err := json.Marshal(addrs)
	if err != nil {
		panic(err)
	}

	resp := pDHTMessage{
		Type: DHTMessage_GET_PROVIDERS,
		Key: pmes.GetKey(),
		Value: data,
		Id: pmes.GetId(),
	}

	mes := swarm.NewMessage(p, resp.ToProtobuf())
	dht.network.Chan.Outgoing <-mes
224 225 226
}

func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
227 228 229 230
	//TODO: need to implement TTLs on providers
	key := u.Key(pmes.GetKey())
	parr := dht.providers[key]
	dht.providers[key] = append(parr, p)
231 232
}

233

234 235
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
236 237
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
	lchan := make(chan *swarm.Message)
238 239 240 241 242
	dht.listenLock.Lock()
	dht.listeners[mesid] = lchan
	dht.listenLock.Unlock()
	return lchan
}
243

244
// Unregister the given message id from the listener map
Jeromy's avatar
Jeromy committed
245 246 247 248 249 250 251 252 253 254
func (dht *IpfsDHT) Unlisten(mesid uint64) {
	dht.listenLock.Lock()
	ch, ok := dht.listeners[mesid]
	if ok {
		delete(dht.listeners, mesid)
	}
	dht.listenLock.Unlock()
	close(ch)
}

255 256 257 258 259
// Halt stops all communications from this node and shuts it down. It first
// sends the shutdown signal to the message handling loop, then closes the
// network.
func (dht *IpfsDHT) Halt() {
	var stop struct{}
	dht.shutdown <- stop
	dht.network.Close()
}
260 261

// Ping a node, log the time it took
262
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
263
	// Thoughts: maybe this should accept an ID and do a peer lookup?
264 265
	u.DOut("Enter Ping.")

266 267
	pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING}
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
268 269

	before := time.Now()
270
	response_chan := dht.ListenFor(pmes.Id)
271 272 273 274 275 276
	dht.network.Chan.Outgoing <- mes

	tout := time.After(timeout)
	select {
	case <-response_chan:
		roundtrip := time.Since(before)
277 278
		u.POut("Ping took %s.", roundtrip.String())
		return nil
279 280 281
	case <-tout:
		// Timed out, think about removing node from network
		u.DOut("Ping node timed out.")
282
		return u.ErrTimeout
283 284
	}
}