dht.go 5.88 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"sync"
5
	"time"
6

Jeromy's avatar
Jeromy committed
7 8 9
	peer	"github.com/jbenet/go-ipfs/peer"
	swarm	"github.com/jbenet/go-ipfs/swarm"
	u		"github.com/jbenet/go-ipfs/util"
10 11 12
	identify "github.com/jbenet/go-ipfs/identify"

	ma "github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
13 14 15

	ds "github.com/jbenet/datastore.go"

16
	"code.google.com/p/goprotobuf/proto"
17 18
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20 21 22 23
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
24
	routes RoutingTable
25

26 27
	network *swarm.Swarm

Jeromy's avatar
Jeromy committed
28 29 30 31 32
	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore
33

34 35
	// map of channels waiting for reply messages
	listeners  map[uint64]chan *swarm.Message
36
	listenLock sync.RWMutex
37 38 39

	// Signal to shutdown dht
	shutdown chan struct{}
40 41
}

42 43
// Create a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
44 45 46 47 48
	network := swarm.NewSwarm(p)
	err := network.Listen()
	if err != nil {
		return nil,err
	}
49

50 51
	dht := new(IpfsDHT)
	dht.network = network
52 53
	dht.datastore = ds.NewMapDatastore()
	dht.self = p
Jeromy's avatar
Jeromy committed
54 55
	dht.listeners = make(map[uint64]chan *swarm.Message)
	dht.shutdown = make(chan struct{})
56 57 58
	return dht, nil
}

59
// Start up background goroutines needed by the DHT
60 61 62 63
func (dht *IpfsDHT) Start() {
	go dht.handleMessages()
}

64 65 66 67 68 69 70 71 72 73
// Connect to a new peer at the given address
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) error {
	peer := new(peer.Peer)
	peer.AddAddress(addr)

	conn,err := swarm.Dial("tcp", peer)
	if err != nil {
		return err
	}

74
	err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
75 76 77 78
	if err != nil {
		return err
	}

79
	dht.network.StartConn(conn)
80 81 82

	// TODO: Add this peer to our routing table
	return nil
Jeromy's avatar
Jeromy committed
83 84
}

85 86
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
87
func (dht *IpfsDHT) handleMessages() {
88
	u.DOut("Being message handling routine")
89 90 91
	for {
		select {
		case mes := <-dht.network.Chan.Incoming:
92 93
			u.DOut("recieved message from swarm.")

94 95 96 97 98 99 100 101
			pmes := new(DHTMessage)
			err := proto.Unmarshal(mes.Data, pmes)
			if err != nil {
				u.PErr("Failed to decode protobuf message: %s", err)
				continue
			}

			// Note: not sure if this is the correct place for this
102 103 104 105 106 107 108 109 110 111 112
			if pmes.GetResponse() {
				dht.listenLock.RLock()
				ch, ok := dht.listeners[pmes.GetId()]
				dht.listenLock.RUnlock()
				if ok {
					ch <- mes
				}

				// this is expected behaviour during a timeout
				u.DOut("Received response with nobody listening...")
				continue
113
			}
114 115
			//

116
			u.DOut("Got message type: %d", pmes.GetType())
117
			switch pmes.GetType() {
118 119 120
			case DHTMessage_GET_VALUE:
				dht.handleGetValue(mes.Peer, pmes)
			case DHTMessage_PUT_VALUE:
Jeromy's avatar
Jeromy committed
121
				dht.handlePutValue(mes.Peer, pmes)
122
			case DHTMessage_FIND_NODE:
Jeromy's avatar
Jeromy committed
123
				dht.handleFindNode(mes.Peer, pmes)
124
			case DHTMessage_ADD_PROVIDER:
125 126
			case DHTMessage_GET_PROVIDERS:
			case DHTMessage_PING:
127
				dht.handlePing(mes.Peer, pmes)
128 129
			}

130 131
		case err := <-dht.network.Chan.Errors:
			panic(err)
132 133
		case <-dht.shutdown:
			return
134
		}
135
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136
}
137

138
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
139 140 141
	dskey := ds.NewKey(pmes.GetKey())
	i_val, err := dht.datastore.Get(dskey)
	if err == nil {
142 143 144 145 146 147 148 149 150
		resp := &pDHTMessage{
			Response: true,
			Id: *pmes.Id,
			Key: *pmes.Key,
			Value: i_val.([]byte),
		}

		mes := swarm.NewMessage(p, resp.ToProtobuf())
		dht.network.Chan.Outgoing <- mes
Jeromy's avatar
Jeromy committed
151
	} else if err == ds.ErrNotFound {
152 153 154 155 156 157 158
		// Find closest node(s) to desired key and reply with that info
		// TODO: this will need some other metadata in the protobuf message
		//			to signal to the querying node that the data its receiving
		//			is actually a list of other nodes
	}
}

Jeromy's avatar
Jeromy committed
159
// Store a value in this nodes local storage
160
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
161 162 163 164 165 166
	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
	if err != nil {
		// For now, just panic, handle this better later maybe
		panic(err)
	}
167 168 169
}

func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
170 171 172 173 174
	resp := &pDHTMessage{
		Type: pmes.GetType(),
		Response: true,
		Id: pmes.GetId(),
	}
175

176
	dht.network.Chan.Outgoing <-swarm.NewMessage(p, resp.ToProtobuf())
177 178
}

179 180 181 182 183 184 185 186 187 188 189 190
// handleFindNode is the handler handleMessages calls for FIND_NODE messages.
// It is not implemented yet and always panics.
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
	panic("Not implemented.")
}

// handleGetProviders is a placeholder for handling GET_PROVIDERS messages.
// It is not implemented yet and always panics; the handleMessages switch in
// this file does not call it.
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
	panic("Not implemented.")
}

// handleAddProvider is a placeholder for handling ADD_PROVIDER messages.
// It is not implemented yet and always panics; the handleMessages switch in
// this file does not call it.
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
	panic("Not implemented.")
}

191

192 193
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
194 195
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
	lchan := make(chan *swarm.Message)
196 197 198 199 200
	dht.listenLock.Lock()
	dht.listeners[mesid] = lchan
	dht.listenLock.Unlock()
	return lchan
}
201

202
// Unregister the given message id from the listener map
Jeromy's avatar
Jeromy committed
203 204 205 206 207 208 209 210 211 212
func (dht *IpfsDHT) Unlisten(mesid uint64) {
	dht.listenLock.Lock()
	ch, ok := dht.listeners[mesid]
	if ok {
		delete(dht.listeners, mesid)
	}
	dht.listenLock.Unlock()
	close(ch)
}

213 214 215 216 217
// Halt stops all communications from this node and shuts it down: it sends
// the shutdown signal that makes handleMessages return, then closes the
// network.
// NOTE(review): the shutdown channel is created unbuffered in NewDHT, so the
// send below blocks until handleMessages receives it — presumably forever if
// Start was never called or the loop has already exited; confirm callers
// never do that.
func (dht *IpfsDHT) Halt() {
	dht.shutdown <- struct{}{}
	dht.network.Close()
}
218 219

// Ping a node, log the time it took
220
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
221
	// Thoughts: maybe this should accept an ID and do a peer lookup?
222 223
	u.DOut("Enter Ping.")

224 225
	pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING}
	mes := swarm.NewMessage(p, pmes.ToProtobuf())
226 227

	before := time.Now()
228
	response_chan := dht.ListenFor(pmes.Id)
229 230 231 232 233 234
	dht.network.Chan.Outgoing <- mes

	tout := time.After(timeout)
	select {
	case <-response_chan:
		roundtrip := time.Since(before)
235 236
		u.POut("Ping took %s.", roundtrip.String())
		return nil
237 238 239
	case <-tout:
		// Timed out, think about removing node from network
		u.DOut("Ping node timed out.")
240
		return u.ErrTimeout
241 242
	}
}