Commit ae556952 authored by Jeromy's avatar Jeromy

address issues from code review (issue #25)

parent 5bd8258e
package dht
// A helper struct to make working with protbuf types easier
type pDHTMessage struct {
Type DHTMessage_MessageType
Key string
Value []byte
type DHTMessage struct {
Type PBDHTMessage_MessageType
Key string
Value []byte
Response bool
Id uint64
Success bool
Id uint64
Success bool
}
func (m *pDHTMessage) ToProtobuf() *DHTMessage {
pmes := new(DHTMessage)
func (m *DHTMessage) ToProtobuf() *PBDHTMessage {
pmes := new(PBDHTMessage)
if m.Value != nil {
pmes.Value = m.Value
}
......
......@@ -49,7 +49,7 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket {
e := bucket_list.Front()
for e != nil {
peer_id := convertPeerID(e.Value.(*peer.Peer).ID)
peer_cpl := xor(peer_id, target).commonPrefixLen()
peer_cpl := prefLen(peer_id, target)
if peer_cpl > cpl {
cur := e
out.PushBack(e.Value)
......
package dht
import (
"sync"
"time"
"bytes"
"encoding/json"
"errors"
"sync"
"time"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
identify "github.com/jbenet/go-ipfs/identify"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
......@@ -37,7 +37,7 @@ type IpfsDHT struct {
// Map keys to peers that can provide their value
// TODO: implement a TTL on each of these keys
providers map[u.Key][]*providerInfo
providers map[u.Key][]*providerInfo
providerLock sync.RWMutex
// map of channels waiting for reply messages
......@@ -54,21 +54,27 @@ type IpfsDHT struct {
diaglock sync.Mutex
}
// The listen info struct holds information about a message that is being waited for
type listenInfo struct {
// Responses matching the listen ID will be sent through resp
resp chan *swarm.Message
// count is the number of responses to listen for
count int
// eol is the time at which this listener will expire
eol time.Time
}
// Create a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
if p == nil {
panic("Tried to create new dht with nil peer")
return nil, errors.New("nil peer passed to NewDHT()")
}
network := swarm.NewSwarm(p)
err := network.Listen()
if err != nil {
return nil,err
return nil, err
}
dht := new(IpfsDHT)
......@@ -90,50 +96,24 @@ func (dht *IpfsDHT) Start() {
}
// Connect to a new peer at the given address
// TODO: move this into swarm
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
maddrstr,_ := addr.String()
maddrstr, _ := addr.String()
u.DOut("Connect to new peer: %s", maddrstr)
if addr == nil {
panic("addr was nil!")
}
peer := new(peer.Peer)
peer.AddAddress(addr)
conn,err := swarm.Dial("tcp", peer)
if err != nil {
return nil, err
}
err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
npeer, err := dht.network.Connect(addr)
if err != nil {
return nil, err
}
// Send node an address that you can be reached on
myaddr := dht.self.NetAddress("tcp")
mastr,err := myaddr.String()
if err != nil {
panic("No local address to send")
}
conn.Outgoing.MsgChan <- []byte(mastr)
dht.network.StartConn(conn)
removed := dht.routes[0].Update(peer)
if removed != nil {
panic("need to remove this peer.")
}
dht.Update(npeer)
// Ping new peer to register in their routing table
// NOTE: this should be done better...
err = dht.Ping(peer, time.Second * 2)
err = dht.Ping(npeer, time.Second*2)
if err != nil {
panic("Failed to ping new peer.")
return nil, errors.New("Failed to ping newly connected peer.")
}
return peer, nil
return npeer, nil
}
// Read in all messages from swarm and handle them appropriately
......@@ -144,23 +124,19 @@ func (dht *IpfsDHT) handleMessages() {
checkTimeouts := time.NewTicker(time.Minute * 5)
for {
select {
case mes,ok := <-dht.network.Chan.Incoming:
case mes, ok := <-dht.network.Chan.Incoming:
if !ok {
u.DOut("handleMessages closing, bad recv on incoming")
return
}
pmes := new(DHTMessage)
pmes := new(PBDHTMessage)
err := proto.Unmarshal(mes.Data, pmes)
if err != nil {
u.PErr("Failed to decode protobuf message: %s", err)
continue
}
// Update peers latest visit in routing table
removed := dht.routes[0].Update(mes.Peer)
if removed != nil {
panic("Need to handle removed peer.")
}
dht.Update(mes.Peer)
// Note: not sure if this is the correct place for this
if pmes.GetResponse() {
......@@ -180,7 +156,6 @@ func (dht *IpfsDHT) handleMessages() {
dht.Unlisten(pmes.GetId())
}
} else {
// this is expected behaviour during a timeout
u.DOut("Received response with nobody listening...")
}
......@@ -190,65 +165,73 @@ func (dht *IpfsDHT) handleMessages() {
u.DOut("[peer: %s]", dht.self.ID.Pretty())
u.DOut("Got message type: '%s' [id = %x, from = %s]",
DHTMessage_MessageType_name[int32(pmes.GetType())],
PBDHTMessage_MessageType_name[int32(pmes.GetType())],
pmes.GetId(), mes.Peer.ID.Pretty())
switch pmes.GetType() {
case DHTMessage_GET_VALUE:
case PBDHTMessage_GET_VALUE:
dht.handleGetValue(mes.Peer, pmes)
case DHTMessage_PUT_VALUE:
case PBDHTMessage_PUT_VALUE:
dht.handlePutValue(mes.Peer, pmes)
case DHTMessage_FIND_NODE:
case PBDHTMessage_FIND_NODE:
dht.handleFindPeer(mes.Peer, pmes)
case DHTMessage_ADD_PROVIDER:
case PBDHTMessage_ADD_PROVIDER:
dht.handleAddProvider(mes.Peer, pmes)
case DHTMessage_GET_PROVIDERS:
case PBDHTMessage_GET_PROVIDERS:
dht.handleGetProviders(mes.Peer, pmes)
case DHTMessage_PING:
case PBDHTMessage_PING:
dht.handlePing(mes.Peer, pmes)
case DHTMessage_DIAGNOSTIC:
case PBDHTMessage_DIAGNOSTIC:
dht.handleDiagnostic(mes.Peer, pmes)
}
case err := <-dht.network.Chan.Errors:
u.DErr("dht err: %s", err)
panic(err)
case <-dht.shutdown:
checkTimeouts.Stop()
return
case <-checkTimeouts.C:
dht.providerLock.Lock()
for k,parr := range dht.providers {
var cleaned []*providerInfo
for _,v := range parr {
if time.Since(v.Creation) < time.Hour {
cleaned = append(cleaned, v)
}
}
dht.providers[k] = cleaned
}
dht.providerLock.Unlock()
dht.listenLock.Lock()
var remove []uint64
now := time.Now()
for k,v := range dht.listeners {
if now.After(v.eol) {
remove = append(remove, k)
}
}
for _,k := range remove {
delete(dht.listeners, k)
// Time to collect some garbage!
dht.cleanExpiredProviders()
dht.cleanExpiredListeners()
}
}
}
func (dht *IpfsDHT) cleanExpiredProviders() {
dht.providerLock.Lock()
for k, parr := range dht.providers {
var cleaned []*providerInfo
for _, v := range parr {
if time.Since(v.Creation) < time.Hour {
cleaned = append(cleaned, v)
}
dht.listenLock.Unlock()
}
dht.providers[k] = cleaned
}
dht.providerLock.Unlock()
}
func (dht *IpfsDHT) cleanExpiredListeners() {
dht.listenLock.Lock()
var remove []uint64
now := time.Now()
for k, v := range dht.listeners {
if now.After(v.eol) {
remove = append(remove, k)
}
}
for _, k := range remove {
delete(dht.listeners, k)
}
dht.listenLock.Unlock()
}
func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error {
pmes := pDHTMessage{
Type: DHTMessage_PUT_VALUE,
Key: key,
pmes := DHTMessage{
Type: PBDHTMessage_PUT_VALUE,
Key: key,
Value: value,
Id: GenerateMessageID(),
Id: GenerateMessageID(),
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
......@@ -256,27 +239,27 @@ func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error
return nil
}
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
var resp *pDHTMessage
var resp *DHTMessage
i_val, err := dht.datastore.Get(dskey)
if err == nil {
resp = &pDHTMessage{
resp = &DHTMessage{
Response: true,
Id: *pmes.Id,
Key: *pmes.Key,
Value: i_val.([]byte),
Success: true,
Id: *pmes.Id,
Key: *pmes.Key,
Value: i_val.([]byte),
Success: true,
}
} else if err == ds.ErrNotFound {
// Find closest peer(s) to desired key and reply with that info
closer := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
resp = &pDHTMessage{
resp = &DHTMessage{
Response: true,
Id: *pmes.Id,
Key: *pmes.Key,
Value: closer.ID,
Success: false,
Id: *pmes.Id,
Key: *pmes.Key,
Value: closer.ID,
Success: false,
}
}
......@@ -285,7 +268,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
}
// Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *PBDHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
err := dht.datastore.Put(dskey, pmes.GetValue())
if err != nil {
......@@ -294,46 +277,51 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
}
}
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
resp := pDHTMessage{
Type: pmes.GetType(),
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) {
resp := DHTMessage{
Type: pmes.GetType(),
Response: true,
Id: pmes.GetId(),
Id: pmes.GetId(),
}
dht.network.Chan.Outgoing <-swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf())
}
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
success := true
u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty())
closest := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
if closest == nil {
panic("could not find anything.")
u.PErr("handleFindPeer: could not find anything.")
success = false
}
if len(closest.Addresses) == 0 {
panic("no addresses for connected peer...")
u.PErr("handleFindPeer: no addresses for connected peer...")
success = false
}
u.POut("handleFindPeer: sending back '%s'", closest.ID.Pretty())
addr,err := closest.Addresses[0].String()
addr, err := closest.Addresses[0].String()
if err != nil {
panic(err)
u.PErr(err.Error())
success = false
}
resp := pDHTMessage{
Type: pmes.GetType(),
resp := DHTMessage{
Type: pmes.GetType(),
Response: true,
Id: pmes.GetId(),
Value: []byte(addr),
Id: pmes.GetId(),
Value: []byte(addr),
Success: success,
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <-mes
dht.network.Chan.Outgoing <- mes
}
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
dht.providerLock.RLock()
providers := dht.providers[u.Key(pmes.GetKey())]
dht.providerLock.RUnlock()
......@@ -344,9 +332,9 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
// This is just a quick hack, formalize method of sending addrs later
addrs := make(map[u.Key]string)
for _,prov := range providers {
for _, prov := range providers {
ma := prov.Value.NetAddress("tcp")
str,err := ma.String()
str, err := ma.String()
if err != nil {
u.PErr("Error: %s", err)
continue
......@@ -355,35 +343,38 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
addrs[prov.Value.Key()] = str
}
data,err := json.Marshal(addrs)
success := true
data, err := json.Marshal(addrs)
if err != nil {
panic(err)
u.POut("handleGetProviders: error marshalling struct to JSON: %s", err)
data = nil
success = false
}
resp := pDHTMessage{
Type: DHTMessage_GET_PROVIDERS,
Key: pmes.GetKey(),
Value: data,
Id: pmes.GetId(),
resp := DHTMessage{
Type: PBDHTMessage_GET_PROVIDERS,
Key: pmes.GetKey(),
Value: data,
Id: pmes.GetId(),
Response: true,
Success: success,
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <-mes
dht.network.Chan.Outgoing <- mes
}
type providerInfo struct {
Creation time.Time
Value *peer.Peer
Value *peer.Peer
}
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) {
//TODO: need to implement TTLs on providers
key := u.Key(pmes.GetKey())
dht.addProviderEntry(key, p)
}
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
func (dht *IpfsDHT) ListenFor(mesid uint64, count int, timeout time.Duration) <-chan *swarm.Message {
......@@ -407,7 +398,7 @@ func (dht *IpfsDHT) Unlisten(mesid uint64) {
func (dht *IpfsDHT) IsListening(mesid uint64) bool {
dht.listenLock.RLock()
li,ok := dht.listeners[mesid]
li, ok := dht.listeners[mesid]
dht.listenLock.RUnlock()
if time.Now().After(li.eol) {
dht.listenLock.Lock()
......@@ -432,7 +423,7 @@ func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
dht.providerLock.Unlock()
}
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
dht.diaglock.Lock()
if dht.IsListening(pmes.GetId()) {
//TODO: ehhh..........
......@@ -442,15 +433,13 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
dht.diaglock.Unlock()
seq := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10)
listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second * 30)
listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second*30)
for _,ps := range seq {
for _, ps := range seq {
mes := swarm.NewMessage(ps, pmes)
dht.network.Chan.Outgoing <-mes
dht.network.Chan.Outgoing <- mes
}
buf := new(bytes.Buffer)
di := dht.getDiagInfo()
buf.Write(di.Marshal())
......@@ -464,7 +453,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
//Timeout, return what we have
goto out
case req_resp := <-listen_chan:
pmes_out := new(DHTMessage)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(req_resp.Data, pmes_out)
if err != nil {
// It broke? eh, whatever, keep going
......@@ -476,19 +465,19 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
}
out:
resp := pDHTMessage{
Type: DHTMessage_DIAGNOSTIC,
Id: pmes.GetId(),
Value: buf.Bytes(),
resp := DHTMessage{
Type: PBDHTMessage_DIAGNOSTIC,
Id: pmes.GetId(),
Value: buf.Bytes(),
Response: true,
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <-mes
dht.network.Chan.Outgoing <- mes
}
func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) {
v,err := dht.datastore.Get(ds.NewKey(string(key)))
v, err := dht.datastore.Get(ds.NewKey(string(key)))
if err != nil {
return nil, err
}
......@@ -498,3 +487,10 @@ func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) {
func (dht *IpfsDHT) PutLocal(key u.Key, value []byte) error {
return dht.datastore.Put(ds.NewKey(string(key)), value)
}
func (dht *IpfsDHT) Update(p *peer.Peer) {
removed := dht.routes[0].Update(p)
if removed != nil {
dht.network.Drop(removed)
}
}
......@@ -2,21 +2,22 @@ package dht
import (
"testing"
peer "github.com/jbenet/go-ipfs/peer"
ma "github.com/jbenet/go-multiaddr"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
"time"
"fmt"
"time"
)
func TestPing(t *testing.T) {
u.Debug = false
addr_a,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234")
addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234")
if err != nil {
t.Fatal(err)
}
addr_b,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678")
addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678")
if err != nil {
t.Fatal(err)
}
......@@ -29,12 +30,12 @@ func TestPing(t *testing.T) {
peer_b.AddAddress(addr_b)
peer_b.ID = peer.ID([]byte("peer_b"))
dht_a,err := NewDHT(peer_a)
dht_a, err := NewDHT(peer_a)
if err != nil {
t.Fatal(err)
}
dht_b,err := NewDHT(peer_b)
dht_b, err := NewDHT(peer_b)
if err != nil {
t.Fatal(err)
}
......@@ -42,13 +43,13 @@ func TestPing(t *testing.T) {
dht_a.Start()
dht_b.Start()
_,err = dht_a.Connect(addr_b)
_, err = dht_a.Connect(addr_b)
if err != nil {
t.Fatal(err)
}
//Test that we can ping the node
err = dht_a.Ping(peer_b, time.Second * 2)
err = dht_a.Ping(peer_b, time.Second*2)
if err != nil {
t.Fatal(err)
}
......@@ -59,11 +60,11 @@ func TestPing(t *testing.T) {
func TestValueGetSet(t *testing.T) {
u.Debug = false
addr_a,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235")
addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235")
if err != nil {
t.Fatal(err)
}
addr_b,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679")
addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679")
if err != nil {
t.Fatal(err)
}
......@@ -76,12 +77,12 @@ func TestValueGetSet(t *testing.T) {
peer_b.AddAddress(addr_b)
peer_b.ID = peer.ID([]byte("peer_b"))
dht_a,err := NewDHT(peer_a)
dht_a, err := NewDHT(peer_a)
if err != nil {
t.Fatal(err)
}
dht_b,err := NewDHT(peer_b)
dht_b, err := NewDHT(peer_b)
if err != nil {
t.Fatal(err)
}
......@@ -89,7 +90,7 @@ func TestValueGetSet(t *testing.T) {
dht_a.Start()
dht_b.Start()
_,err = dht_a.Connect(addr_b)
_, err = dht_a.Connect(addr_b)
if err != nil {
t.Fatal(err)
}
......@@ -99,7 +100,7 @@ func TestValueGetSet(t *testing.T) {
t.Fatal(err)
}
val, err := dht_a.GetValue("hello", time.Second * 2)
val, err := dht_a.GetValue("hello", time.Second*2)
if err != nil {
t.Fatal(err)
}
......@@ -113,14 +114,13 @@ func TestProvides(t *testing.T) {
u.Debug = false
var addrs []*ma.Multiaddr
for i := 0; i < 4; i++ {
a,err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000 + i))
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i))
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, a)
}
var peers []*peer.Peer
for i := 0; i < 4; i++ {
p := new(peer.Peer)
......@@ -131,7 +131,7 @@ func TestProvides(t *testing.T) {
var dhts []*IpfsDHT
for i := 0; i < 4; i++ {
d,err := NewDHT(peers[i])
d, err := NewDHT(peers[i])
if err != nil {
t.Fatal(err)
}
......@@ -166,7 +166,7 @@ func TestProvides(t *testing.T) {
time.Sleep(time.Millisecond * 60)
provs,err := dhts[0].FindProviders(u.Key("hello"), time.Second)
provs, err := dhts[0].FindProviders(u.Key("hello"), time.Second)
if err != nil {
t.Fatal(err)
}
......@@ -174,6 +174,8 @@ func TestProvides(t *testing.T) {
if len(provs) != 1 {
t.Fatal("Didnt get back providers")
}
}
for i := 0; i < 4; i++ {
dhts[i].Halt()
}
}
......@@ -38,7 +38,7 @@ func (dht *IpfsDHT) getDiagInfo() *diagInfo {
di.Keys = nil // Currently no way to query datastore
for _,p := range dht.routes[0].listpeers() {
di.Connections = append(di.Connections, connDiagInfo{p.GetDistance(), p.ID})
di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID})
}
return di
}
......@@ -9,7 +9,7 @@ It is generated from these files:
messages.proto
It has these top-level messages:
DHTMessage
PBDHTMessage
*/
package dht
......@@ -20,19 +20,19 @@ import math "math"
var _ = proto.Marshal
var _ = math.Inf
type DHTMessage_MessageType int32
type PBDHTMessage_MessageType int32
const (
DHTMessage_PUT_VALUE DHTMessage_MessageType = 0
DHTMessage_GET_VALUE DHTMessage_MessageType = 1
DHTMessage_ADD_PROVIDER DHTMessage_MessageType = 2
DHTMessage_GET_PROVIDERS DHTMessage_MessageType = 3
DHTMessage_FIND_NODE DHTMessage_MessageType = 4
DHTMessage_PING DHTMessage_MessageType = 5
DHTMessage_DIAGNOSTIC DHTMessage_MessageType = 6
PBDHTMessage_PUT_VALUE PBDHTMessage_MessageType = 0
PBDHTMessage_GET_VALUE PBDHTMessage_MessageType = 1
PBDHTMessage_ADD_PROVIDER PBDHTMessage_MessageType = 2
PBDHTMessage_GET_PROVIDERS PBDHTMessage_MessageType = 3
PBDHTMessage_FIND_NODE PBDHTMessage_MessageType = 4
PBDHTMessage_PING PBDHTMessage_MessageType = 5
PBDHTMessage_DIAGNOSTIC PBDHTMessage_MessageType = 6
)
var DHTMessage_MessageType_name = map[int32]string{
var PBDHTMessage_MessageType_name = map[int32]string{
0: "PUT_VALUE",
1: "GET_VALUE",
2: "ADD_PROVIDER",
......@@ -41,7 +41,7 @@ var DHTMessage_MessageType_name = map[int32]string{
5: "PING",
6: "DIAGNOSTIC",
}
var DHTMessage_MessageType_value = map[string]int32{
var PBDHTMessage_MessageType_value = map[string]int32{
"PUT_VALUE": 0,
"GET_VALUE": 1,
"ADD_PROVIDER": 2,
......@@ -51,79 +51,111 @@ var DHTMessage_MessageType_value = map[string]int32{
"DIAGNOSTIC": 6,
}
func (x DHTMessage_MessageType) Enum() *DHTMessage_MessageType {
p := new(DHTMessage_MessageType)
func (x PBDHTMessage_MessageType) Enum() *PBDHTMessage_MessageType {
p := new(PBDHTMessage_MessageType)
*p = x
return p
}
func (x DHTMessage_MessageType) String() string {
return proto.EnumName(DHTMessage_MessageType_name, int32(x))
func (x PBDHTMessage_MessageType) String() string {
return proto.EnumName(PBDHTMessage_MessageType_name, int32(x))
}
func (x *DHTMessage_MessageType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(DHTMessage_MessageType_value, data, "DHTMessage_MessageType")
func (x *PBDHTMessage_MessageType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(PBDHTMessage_MessageType_value, data, "PBDHTMessage_MessageType")
if err != nil {
return err
}
*x = DHTMessage_MessageType(value)
*x = PBDHTMessage_MessageType(value)
return nil
}
type DHTMessage struct {
Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"`
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"`
XXX_unrecognized []byte `json:"-"`
type PBDHTMessage struct {
Type *PBDHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.PBDHTMessage_MessageType" json:"type,omitempty"`
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"`
Peers []*PBDHTMessage_PBPeer `protobuf:"bytes,7,rep,name=peers" json:"peers,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *DHTMessage) Reset() { *m = DHTMessage{} }
func (m *DHTMessage) String() string { return proto.CompactTextString(m) }
func (*DHTMessage) ProtoMessage() {}
func (m *PBDHTMessage) Reset() { *m = PBDHTMessage{} }
func (m *PBDHTMessage) String() string { return proto.CompactTextString(m) }
func (*PBDHTMessage) ProtoMessage() {}
func (m *DHTMessage) GetType() DHTMessage_MessageType {
func (m *PBDHTMessage) GetType() PBDHTMessage_MessageType {
if m != nil && m.Type != nil {
return *m.Type
}
return DHTMessage_PUT_VALUE
return PBDHTMessage_PUT_VALUE
}
func (m *DHTMessage) GetKey() string {
func (m *PBDHTMessage) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
}
return ""
}
func (m *DHTMessage) GetValue() []byte {
func (m *PBDHTMessage) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *DHTMessage) GetId() uint64 {
func (m *PBDHTMessage) GetId() uint64 {
if m != nil && m.Id != nil {
return *m.Id
}
return 0
}
func (m *DHTMessage) GetResponse() bool {
func (m *PBDHTMessage) GetResponse() bool {
if m != nil && m.Response != nil {
return *m.Response
}
return false
}
func (m *DHTMessage) GetSuccess() bool {
func (m *PBDHTMessage) GetSuccess() bool {
if m != nil && m.Success != nil {
return *m.Success
}
return false
}
func (m *PBDHTMessage) GetPeers() []*PBDHTMessage_PBPeer {
if m != nil {
return m.Peers
}
return nil
}
type PBDHTMessage_PBPeer struct {
Id *string `protobuf:"bytes,1,req,name=id" json:"id,omitempty"`
Addr *string `protobuf:"bytes,2,req,name=addr" json:"addr,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *PBDHTMessage_PBPeer) Reset() { *m = PBDHTMessage_PBPeer{} }
func (m *PBDHTMessage_PBPeer) String() string { return proto.CompactTextString(m) }
func (*PBDHTMessage_PBPeer) ProtoMessage() {}
func (m *PBDHTMessage_PBPeer) GetId() string {
if m != nil && m.Id != nil {
return *m.Id
}
return ""
}
func (m *PBDHTMessage_PBPeer) GetAddr() string {
if m != nil && m.Addr != nil {
return *m.Addr
}
return ""
}
func init() {
proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value)
proto.RegisterEnum("dht.PBDHTMessage_MessageType", PBDHTMessage_MessageType_name, PBDHTMessage_MessageType_value)
}
......@@ -2,7 +2,7 @@ package dht;
//run `protoc --go_out=. *.proto` to generate
message DHTMessage {
message PBDHTMessage {
enum MessageType {
PUT_VALUE = 0;
GET_VALUE = 1;
......@@ -13,6 +13,11 @@ message DHTMessage {
DIAGNOSTIC = 6;
}
message PBPeer {
required string id = 1;
required string addr = 2;
}
required MessageType type = 1;
optional string key = 2;
optional bytes value = 3;
......@@ -23,4 +28,7 @@ message DHTMessage {
// Signals whether or not this message is a response to another message
optional bool response = 5;
optional bool success = 6;
// Used for returning peers from queries (normally, peers closer to X)
repeated PBPeer peers = 7;
}
package dht
import (
"math/rand"
"time"
"bytes"
"encoding/json"
"errors"
"math/rand"
"time"
proto "code.google.com/p/goprotobuf/proto"
......@@ -34,7 +35,7 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
var p *peer.Peer
p = s.routes[0].NearestPeer(convertKey(key))
if p == nil {
panic("Table returned nil peer!")
return errors.New("Table returned nil peer!")
}
return s.putValueToPeer(p, string(key), value)
......@@ -47,13 +48,13 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
var p *peer.Peer
p = s.routes[0].NearestPeer(convertKey(key))
if p == nil {
panic("Table returned nil peer!")
return nil, errors.New("Table returned nil peer!")
}
pmes := pDHTMessage{
Type: DHTMessage_GET_VALUE,
Key: string(key),
Id: GenerateMessageID(),
pmes := DHTMessage{
Type: PBDHTMessage_GET_VALUE,
Key: string(key),
Id: GenerateMessageID(),
}
response_chan := s.ListenFor(pmes.Id, 1, time.Minute)
......@@ -68,15 +69,13 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
return nil, u.ErrTimeout
case resp, ok := <-response_chan:
if !ok {
panic("Channel was closed...")
u.PErr("response channel closed before timeout, please investigate.")
return nil, u.ErrTimeout
}
if resp == nil {
panic("Why the hell is this response nil?")
}
pmes_out := new(DHTMessage)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
return nil,err
return nil, err
}
if pmes_out.GetSuccess() {
return pmes_out.GetValue(), nil
......@@ -96,15 +95,15 @@ func (s *IpfsDHT) Provide(key u.Key) error {
//return an error
}
pmes := pDHTMessage{
Type: DHTMessage_ADD_PROVIDER,
Key: string(key),
pmes := DHTMessage{
Type: PBDHTMessage_ADD_PROVIDER,
Key: string(key),
}
pbmes := pmes.ToProtobuf()
for _,p := range peers {
for _, p := range peers {
mes := swarm.NewMessage(p, pbmes)
s.network.Chan.Outgoing <-mes
s.network.Chan.Outgoing <- mes
}
return nil
}
......@@ -113,17 +112,17 @@ func (s *IpfsDHT) Provide(key u.Key) error {
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
p := s.routes[0].NearestPeer(convertKey(key))
pmes := pDHTMessage{
Type: DHTMessage_GET_PROVIDERS,
Key: string(key),
Id: GenerateMessageID(),
pmes := DHTMessage{
Type: PBDHTMessage_GET_PROVIDERS,
Key: string(key),
Id: GenerateMessageID(),
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listen_chan := s.ListenFor(pmes.Id, 1, time.Minute)
u.DOut("Find providers for: '%s'", key)
s.network.Chan.Outgoing <-mes
s.network.Chan.Outgoing <- mes
after := time.After(timeout)
select {
case <-after:
......@@ -131,7 +130,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
return nil, u.ErrTimeout
case resp := <-listen_chan:
u.DOut("FindProviders: got response.")
pmes_out := new(DHTMessage)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
return nil, err
......@@ -143,10 +142,10 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
}
var prov_arr []*peer.Peer
for pid,addr := range addrs {
for pid, addr := range addrs {
p := s.network.Find(pid)
if p == nil {
maddr,err := ma.NewMultiaddr(addr)
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
u.PErr("error connecting to new peer: %s", err)
continue
......@@ -171,23 +170,23 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
p := s.routes[0].NearestPeer(convertPeerID(id))
pmes := pDHTMessage{
Type: DHTMessage_FIND_NODE,
Key: string(id),
Id: GenerateMessageID(),
pmes := DHTMessage{
Type: PBDHTMessage_FIND_NODE,
Key: string(id),
Id: GenerateMessageID(),
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listen_chan := s.ListenFor(pmes.Id, 1, time.Minute)
s.network.Chan.Outgoing <-mes
s.network.Chan.Outgoing <- mes
after := time.After(timeout)
select {
case <-after:
s.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp := <-listen_chan:
pmes_out := new(DHTMessage)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
return nil, err
......@@ -218,7 +217,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
u.DOut("Enter Ping.")
pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING}
pmes := DHTMessage{Id: GenerateMessageID(), Type: PBDHTMessage_PING}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
before := time.Now()
......@@ -229,7 +228,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
select {
case <-response_chan:
roundtrip := time.Since(before)
p.SetDistance(roundtrip)
p.SetLatency(roundtrip)
u.POut("Ping took %s.", roundtrip.String())
return nil
case <-tout:
......@@ -246,17 +245,17 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
targets := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10)
// TODO: Add timeout to this struct so nodes know when to return
pmes := pDHTMessage{
Type: DHTMessage_DIAGNOSTIC,
Id: GenerateMessageID(),
pmes := DHTMessage{
Type: PBDHTMessage_DIAGNOSTIC,
Id: GenerateMessageID(),
}
listen_chan := dht.ListenFor(pmes.Id, len(targets), time.Minute * 2)
listen_chan := dht.ListenFor(pmes.Id, len(targets), time.Minute*2)
pbmes := pmes.ToProtobuf()
for _,p := range targets {
for _, p := range targets {
mes := swarm.NewMessage(p, pbmes)
dht.network.Chan.Outgoing <-mes
dht.network.Chan.Outgoing <- mes
}
var out []*diagInfo
......@@ -267,7 +266,7 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
u.DOut("Diagnostic request timed out.")
return out, u.ErrTimeout
case resp := <-listen_chan:
pmes_out := new(DHTMessage)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
// NOTE: here and elsewhere, need to audit error handling,
......@@ -288,5 +287,5 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
}
}
return nil,nil
return nil, nil
}
......@@ -125,7 +125,7 @@ func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer {
func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()
cpl := xor(id, rt.local).commonPrefixLen()
cpl := prefLen(id, rt.local)
// Get bucket at cpl index or last bucket
var bucket *Bucket
......
......@@ -40,6 +40,10 @@ func (id ID) commonPrefixLen() int {
return len(id)*8 - 1
}
func prefLen(a, b ID) int {
return xor(a, b).commonPrefixLen()
}
func xor(a, b ID) ID {
a, b = equalizeSizes(a, b)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment