Commit c653a70b authored by Jeromy Johnson's avatar Jeromy Johnson Committed by Juan Batiz-Benet

update messages and add some new code around handling/creating messages

parent fb086a9e
......@@ -2,6 +2,7 @@ package dht
import (
swarm "github.com/jbenet/go-ipfs/swarm"
"sync"
)
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js
......@@ -10,13 +11,42 @@ import (
// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
routes RoutingTable
routes RoutingTable
network *swarm.Swarm
network *swarm.Swarm
listeners map[uint64]chan swarm.Message
listenLock sync.RWMutex
}
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
func (dht *IpfsDHT) handleMessages() {
for mes := range dht.network.Chan.Incoming {
for {
select {
case mes := <-dht.network.Chan.Incoming:
// Unmarshal message
dht.listenLock.RLock()
ch, ok := dht.listeners[id]
dht.listenLock.RUnlock()
if ok {
// Send message to waiting goroutine
ch <- mes
}
//case closeChan: or something
}
}
}
}
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan swarm.Message {
lchan := make(chan swarm.Message)
dht.listenLock.Lock()
dht.listeners[mesid] = lchan
dht.listenLock.Unlock()
return lchan
}
......@@ -6,11 +6,14 @@ message DHTMessage {
enum MessageType {
PUT_VALUE = 0;
GET_VALUE = 1;
PING = 2;
FIND_NODE = 3;
ADD_PROVIDER = 2;
GET_PROVIDERS = 3;
FIND_NODE = 4;
PING = 5;
}
required MessageType type = 1;
optional string key = 2;
optional bytes value = 3;
required int64 id = 4;
}
......@@ -43,14 +43,20 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
mes.Data = []byte(pmes.String())
mes.Peer = p
response_chan := s.network.ListenFor(pmes.Id)
response_chan := s.ListenFor(pmes.Id)
// Wait for either the response or a timeout
timeup := time.After(timeout)
select {
case <-timeup:
// TODO: unregister listener
return nil, timeoutError
case resp := <-response_chan:
return resp.Data, nil
}
// Should never be hit
return nil, nil
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment