dht.go 5.82 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package dht

3
import (
4
	"sync"
5
	"time"
6

Jeromy's avatar
Jeromy committed
7 8 9
	peer	"github.com/jbenet/go-ipfs/peer"
	swarm	"github.com/jbenet/go-ipfs/swarm"
	u		"github.com/jbenet/go-ipfs/util"
10 11 12
	identify "github.com/jbenet/go-ipfs/identify"

	ma "github.com/jbenet/go-multiaddr"
Jeromy's avatar
Jeromy committed
13 14 15

	ds "github.com/jbenet/datastore.go"

16
	"code.google.com/p/goprotobuf/proto"
17 18
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20 21 22 23
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
24
	routes RoutingTable
25

26 27
	network *swarm.Swarm

Jeromy's avatar
Jeromy committed
28 29 30 31 32
	// Local peer (yourself)
	self *peer.Peer

	// Local data
	datastore ds.Datastore
33

34 35
	// map of channels waiting for reply messages
	listeners  map[uint64]chan *swarm.Message
36
	listenLock sync.RWMutex
37 38 39

	// Signal to shutdown dht
	shutdown chan struct{}
40 41
}

42 43
// Create a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
Jeromy's avatar
Jeromy committed
44
	dht := new(IpfsDHT)
45

Jeromy's avatar
Jeromy committed
46
	dht.network = swarm.NewSwarm(p)
47 48 49 50 51 52
	//TODO: should Listen return an error?
	dht.network.Listen()

	dht.datastore = ds.NewMapDatastore()

	dht.self = p
Jeromy's avatar
Jeromy committed
53 54
	dht.listeners = make(map[uint64]chan *swarm.Message)
	dht.shutdown = make(chan struct{})
55 56 57
	return dht, nil
}

58 59 60 61
// Start launches the message handling loop (handleMessages) in its own
// goroutine and returns immediately.
func (dht *IpfsDHT) Start() {
	go dht.handleMessages()
}

62 63 64 65 66 67 68 69 70 71
// Connect to a new peer at the given address
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) error {
	peer := new(peer.Peer)
	peer.AddAddress(addr)

	conn,err := swarm.Dial("tcp", peer)
	if err != nil {
		return err
	}

72
	err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
73 74 75 76
	if err != nil {
		return err
	}

77
	dht.network.StartConn(conn)
78 79 80

	// TODO: Add this peer to our routing table
	return nil
Jeromy's avatar
Jeromy committed
81 82
}

83 84
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
85
func (dht *IpfsDHT) handleMessages() {
86
	u.DOut("Being message handling routine")
87 88 89
	for {
		select {
		case mes := <-dht.network.Chan.Incoming:
90 91
			u.DOut("recieved message from swarm.")

92 93 94 95 96 97 98 99
			pmes := new(DHTMessage)
			err := proto.Unmarshal(mes.Data, pmes)
			if err != nil {
				u.PErr("Failed to decode protobuf message: %s", err)
				continue
			}

			// Note: not sure if this is the correct place for this
100 101 102 103 104 105 106 107 108 109 110
			if pmes.GetResponse() {
				dht.listenLock.RLock()
				ch, ok := dht.listeners[pmes.GetId()]
				dht.listenLock.RUnlock()
				if ok {
					ch <- mes
				}

				// this is expected behaviour during a timeout
				u.DOut("Received response with nobody listening...")
				continue
111
			}
112 113 114
			//

			switch pmes.GetType() {
115 116 117
			case DHTMessage_GET_VALUE:
				dht.handleGetValue(mes.Peer, pmes)
			case DHTMessage_PUT_VALUE:
Jeromy's avatar
Jeromy committed
118
				dht.handlePutValue(mes.Peer, pmes)
119
			case DHTMessage_FIND_NODE:
Jeromy's avatar
Jeromy committed
120
				dht.handleFindNode(mes.Peer, pmes)
121
			case DHTMessage_ADD_PROVIDER:
122 123
			case DHTMessage_GET_PROVIDERS:
			case DHTMessage_PING:
Jeromy's avatar
Jeromy committed
124
				dht.handleFindNode(mes.Peer, pmes)
125 126
			}

127 128
		case err := <-dht.network.Chan.Errors:
			panic(err)
129 130
		case <-dht.shutdown:
			return
131
		}
132
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
133
}
134

135
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
136 137 138
	dskey := ds.NewKey(pmes.GetKey())
	i_val, err := dht.datastore.Get(dskey)
	if err == nil {
139 140 141 142 143
		isResponse := true
		resp := new(DHTMessage)
		resp.Response = &isResponse
		resp.Id = pmes.Id
		resp.Key = pmes.Key
Jeromy's avatar
Jeromy committed
144 145

		val := i_val.([]byte)
146
		resp.Value = val
Jeromy's avatar
Jeromy committed
147 148 149 150 151

		mes := new(swarm.Message)
		mes.Peer = p
		mes.Data = []byte(resp.String())
	} else if err == ds.ErrNotFound {
152 153 154 155 156 157 158
		// Find closest node(s) to desired key and reply with that info
		// TODO: this will need some other metadata in the protobuf message
		//			to signal to the querying node that the data its receiving
		//			is actually a list of other nodes
	}
}

Jeromy's avatar
Jeromy committed
159
// Store a value in this nodes local storage
160
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
Jeromy's avatar
Jeromy committed
161 162 163 164 165 166
	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
	if err != nil {
		// For now, just panic, handle this better later maybe
		panic(err)
	}
167 168 169 170 171 172 173
}

func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
	isResponse := true
	resp := new(DHTMessage)
	resp.Id = pmes.Id
	resp.Response = &isResponse
174
	resp.Type = pmes.Type
175

176
	dht.network.Chan.Outgoing <-swarm.NewMessage(p, []byte(resp.String()))
177 178
}

179 180 181 182 183 184 185 186 187 188 189 190
// handleFindNode is the handler the message loop invokes for FIND_NODE
// requests. It is not implemented yet and always panics.
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
	panic("Not implemented.")
}

// handleGetProviders is a placeholder, presumably for GET_PROVIDERS requests
// (no caller is visible here). It is not implemented yet and always panics.
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
	panic("Not implemented.")
}

// handleAddProvider is a placeholder, presumably for ADD_PROVIDER requests
// (no caller is visible here). It is not implemented yet and always panics.
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
	panic("Not implemented.")
}

191

192 193
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
194 195
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
	lchan := make(chan *swarm.Message)
196 197 198 199 200
	dht.listenLock.Lock()
	dht.listeners[mesid] = lchan
	dht.listenLock.Unlock()
	return lchan
}
201

Jeromy's avatar
Jeromy committed
202 203 204 205 206 207 208 209 210 211
// Unlisten removes the listener registered for mesid and closes its channel.
// It is a no-op when no listener is registered for that ID.
func (dht *IpfsDHT) Unlisten(mesid uint64) {
	dht.listenLock.Lock()
	ch, ok := dht.listeners[mesid]
	if ok {
		delete(dht.listeners, mesid)
	}
	dht.listenLock.Unlock()

	// Closing a nil channel panics, so only close a channel that was
	// actually registered.
	if ok {
		close(ch)
	}
}

212 213 214 215 216
// Halt stops all communications from this node and shuts it down.
//
// It first sends on the shutdown channel and then closes the network. The
// send blocks until the message-handling goroutine receives the signal, so
// Halt does not return if that goroutine is not running (for example when
// Start was never called, or Halt was already called once).
func (dht *IpfsDHT) Halt() {
	dht.shutdown <- struct{}{}
	dht.network.Close()
}
217 218 219 220

// Ping a node, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) {
	// Thoughts: maybe this should accept an ID and do a peer lookup?
221 222
	u.DOut("Enter Ping.")

223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
	id := GenerateMessageID()
	mes_type := DHTMessage_PING
	pmes := new(DHTMessage)
	pmes.Id = &id
	pmes.Type = &mes_type

	mes := new(swarm.Message)
	mes.Peer = p
	mes.Data = []byte(pmes.String())

	before := time.Now()
	response_chan := dht.ListenFor(id)
	dht.network.Chan.Outgoing <- mes

	tout := time.After(timeout)
	select {
	case <-response_chan:
		roundtrip := time.Since(before)
		u.DOut("Ping took %s.", roundtrip.String())
	case <-tout:
		// Timed out, think about removing node from network
		u.DOut("Ping node timed out.")
	}
}