Commit ae6285e5 authored by Jeromy's avatar Jeromy

address issues from code review (issue #25)

parent e14fb565
package peer
import (
"encoding/hex"
"time"
"sync"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
mh "github.com/jbenet/go-multihash"
b58 "github.com/jbenet/go-base58"
"bytes"
)
......@@ -21,7 +21,7 @@ func (id ID) Equal(other ID) bool {
}
func (id ID) Pretty() string {
return hex.EncodeToString(id)
return b58.Encode(id)
}
// Map maps Key (string) : *Peer (slices are not comparable).
......@@ -33,8 +33,8 @@ type Peer struct {
ID ID
Addresses []*ma.Multiaddr
distance time.Duration
distLock sync.RWMutex
latency time.Duration
latenLock sync.RWMutex
}
// Key returns the ID as a Key (string) for maps.
......@@ -64,12 +64,15 @@ func (p *Peer) NetAddress(n string) *ma.Multiaddr {
return nil
}
func (p *Peer) GetDistance() time.Duration {
return p.distance
func (p *Peer) GetLatency() (out time.Duration) {
p.latenLock.RLock()
out = p.latency
p.latenLock.RUnlock()
return
}
func (p *Peer) SetDistance(dist time.Duration) {
p.distLock.Lock()
p.distance = dist
p.distLock.Unlock()
func (p *Peer) SetLatency(laten time.Duration) {
p.latenLock.Lock()
p.latency = laten
p.latenLock.Unlock()
}
package dht
// A helper struct to make working with protbuf types easier
type pDHTMessage struct {
Type DHTMessage_MessageType
Key string
Value []byte
type DHTMessage struct {
Type PBDHTMessage_MessageType
Key string
Value []byte
Response bool
Id uint64
Success bool
Id uint64
Success bool
}
func (m *pDHTMessage) ToProtobuf() *DHTMessage {
pmes := new(DHTMessage)
func (m *DHTMessage) ToProtobuf() *PBDHTMessage {
pmes := new(PBDHTMessage)
if m.Value != nil {
pmes.Value = m.Value
}
......
......@@ -49,7 +49,7 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket {
e := bucket_list.Front()
for e != nil {
peer_id := convertPeerID(e.Value.(*peer.Peer).ID)
peer_cpl := xor(peer_id, target).commonPrefixLen()
peer_cpl := prefLen(peer_id, target)
if peer_cpl > cpl {
cur := e
out.PushBack(e.Value)
......
package dht
import (
"sync"
"time"
"bytes"
"encoding/json"
"errors"
"sync"
"time"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
identify "github.com/jbenet/go-ipfs/identify"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
......@@ -37,7 +37,7 @@ type IpfsDHT struct {
// Map keys to peers that can provide their value
// TODO: implement a TTL on each of these keys
providers map[u.Key][]*providerInfo
providers map[u.Key][]*providerInfo
providerLock sync.RWMutex
// map of channels waiting for reply messages
......@@ -54,21 +54,27 @@ type IpfsDHT struct {
diaglock sync.Mutex
}
// The listen info struct holds information about a message that is being waited for
type listenInfo struct {
// Responses matching the listen ID will be sent through resp
resp chan *swarm.Message
// count is the number of responses to listen for
count int
// eol is the time at which this listener will expire
eol time.Time
}
// Create a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
if p == nil {
panic("Tried to create new dht with nil peer")
return nil, errors.New("nil peer passed to NewDHT()")
}
network := swarm.NewSwarm(p)
err := network.Listen()
if err != nil {
return nil,err
return nil, err
}
dht := new(IpfsDHT)
......@@ -90,50 +96,24 @@ func (dht *IpfsDHT) Start() {
}
// Connect to a new peer at the given address
// TODO: move this into swarm
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
maddrstr,_ := addr.String()
maddrstr, _ := addr.String()
u.DOut("Connect to new peer: %s", maddrstr)
if addr == nil {
panic("addr was nil!")
}
peer := new(peer.Peer)
peer.AddAddress(addr)
conn,err := swarm.Dial("tcp", peer)
if err != nil {
return nil, err
}
err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
npeer, err := dht.network.Connect(addr)
if err != nil {
return nil, err
}
// Send node an address that you can be reached on
myaddr := dht.self.NetAddress("tcp")
mastr,err := myaddr.String()
if err != nil {
panic("No local address to send")
}
conn.Outgoing.MsgChan <- []byte(mastr)
dht.network.StartConn(conn)
removed := dht.routes[0].Update(peer)
if removed != nil {
panic("need to remove this peer.")
}
dht.Update(npeer)
// Ping new peer to register in their routing table
// NOTE: this should be done better...
err = dht.Ping(peer, time.Second * 2)
err = dht.Ping(npeer, time.Second*2)
if err != nil {
panic("Failed to ping new peer.")
return nil, errors.New("Failed to ping newly connected peer.")
}
return peer, nil
return npeer, nil
}
// Read in all messages from swarm and handle them appropriately
......@@ -144,23 +124,19 @@ func (dht *IpfsDHT) handleMessages() {
checkTimeouts := time.NewTicker(time.Minute * 5)
for {
select {
case mes,ok := <-dht.network.Chan.Incoming:
case mes, ok := <-dht.network.Chan.Incoming:
if !ok {
u.DOut("handleMessages closing, bad recv on incoming")
return
}
pmes := new(DHTMessage)
pmes := new(PBDHTMessage)
err := proto.Unmarshal(mes.Data, pmes)
if err != nil {
u.PErr("Failed to decode protobuf message: %s", err)
continue
}
// Update peers latest visit in routing table
removed := dht.routes[0].Update(mes.Peer)
if removed != nil {
panic("Need to handle removed peer.")
}
dht.Update(mes.Peer)
// Note: not sure if this is the correct place for this
if pmes.GetResponse() {
......@@ -180,7 +156,6 @@ func (dht *IpfsDHT) handleMessages() {
dht.Unlisten(pmes.GetId())
}
} else {
// this is expected behaviour during a timeout
u.DOut("Received response with nobody listening...")
}
......@@ -190,65 +165,73 @@ func (dht *IpfsDHT) handleMessages() {
u.DOut("[peer: %s]", dht.self.ID.Pretty())
u.DOut("Got message type: '%s' [id = %x, from = %s]",
DHTMessage_MessageType_name[int32(pmes.GetType())],
PBDHTMessage_MessageType_name[int32(pmes.GetType())],
pmes.GetId(), mes.Peer.ID.Pretty())
switch pmes.GetType() {
case DHTMessage_GET_VALUE:
case PBDHTMessage_GET_VALUE:
dht.handleGetValue(mes.Peer, pmes)
case DHTMessage_PUT_VALUE:
case PBDHTMessage_PUT_VALUE:
dht.handlePutValue(mes.Peer, pmes)
case DHTMessage_FIND_NODE:
case PBDHTMessage_FIND_NODE:
dht.handleFindPeer(mes.Peer, pmes)
case DHTMessage_ADD_PROVIDER:
case PBDHTMessage_ADD_PROVIDER:
dht.handleAddProvider(mes.Peer, pmes)
case DHTMessage_GET_PROVIDERS:
case PBDHTMessage_GET_PROVIDERS:
dht.handleGetProviders(mes.Peer, pmes)
case DHTMessage_PING:
case PBDHTMessage_PING:
dht.handlePing(mes.Peer, pmes)
case DHTMessage_DIAGNOSTIC:
case PBDHTMessage_DIAGNOSTIC:
dht.handleDiagnostic(mes.Peer, pmes)
}
case err := <-dht.network.Chan.Errors:
u.DErr("dht err: %s", err)
panic(err)
case <-dht.shutdown:
checkTimeouts.Stop()
return
case <-checkTimeouts.C:
dht.providerLock.Lock()
for k,parr := range dht.providers {
var cleaned []*providerInfo
for _,v := range parr {
if time.Since(v.Creation) < time.Hour {
cleaned = append(cleaned, v)
}
}
dht.providers[k] = cleaned
}
dht.providerLock.Unlock()
dht.listenLock.Lock()
var remove []uint64
now := time.Now()
for k,v := range dht.listeners {
if now.After(v.eol) {
remove = append(remove, k)
}
}
for _,k := range remove {
delete(dht.listeners, k)
// Time to collect some garbage!
dht.cleanExpiredProviders()
dht.cleanExpiredListeners()
}
}
}
func (dht *IpfsDHT) cleanExpiredProviders() {
dht.providerLock.Lock()
for k, parr := range dht.providers {
var cleaned []*providerInfo
for _, v := range parr {
if time.Since(v.Creation) < time.Hour {
cleaned = append(cleaned, v)
}
dht.listenLock.Unlock()
}
dht.providers[k] = cleaned
}
dht.providerLock.Unlock()
}
func (dht *IpfsDHT) cleanExpiredListeners() {
dht.listenLock.Lock()
var remove []uint64
now := time.Now()
for k, v := range dht.listeners {
if now.After(v.eol) {
remove = append(remove, k)
}
}
for _, k := range remove {
delete(dht.listeners, k)
}
dht.listenLock.Unlock()
}
func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error {
pmes := pDHTMessage{
Type: DHTMessage_PUT_VALUE,
Key: key,
pmes := DHTMessage{
Type: PBDHTMessage_PUT_VALUE,
Key: key,
Value: value,
Id: GenerateMessageID(),
Id: GenerateMessageID(),
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
......@@ -256,27 +239,27 @@ func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error
return nil
}
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
var resp *pDHTMessage
var resp *DHTMessage
i_val, err := dht.datastore.Get(dskey)
if err == nil {
resp = &pDHTMessage{
resp = &DHTMessage{
Response: true,
Id: *pmes.Id,
Key: *pmes.Key,
Value: i_val.([]byte),
Success: true,
Id: *pmes.Id,
Key: *pmes.Key,
Value: i_val.([]byte),
Success: true,
}
} else if err == ds.ErrNotFound {
// Find closest peer(s) to desired key and reply with that info
closer := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
resp = &pDHTMessage{
resp = &DHTMessage{
Response: true,
Id: *pmes.Id,
Key: *pmes.Key,
Value: closer.ID,
Success: false,
Id: *pmes.Id,
Key: *pmes.Key,
Value: closer.ID,
Success: false,
}
}
......@@ -285,7 +268,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
}
// Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *PBDHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
err := dht.datastore.Put(dskey, pmes.GetValue())
if err != nil {
......@@ -294,46 +277,51 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
}
}
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
resp := pDHTMessage{
Type: pmes.GetType(),
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) {
resp := DHTMessage{
Type: pmes.GetType(),
Response: true,
Id: pmes.GetId(),
Id: pmes.GetId(),
}
dht.network.Chan.Outgoing <-swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf())
}
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
success := true
u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty())
closest := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
if closest == nil {
panic("could not find anything.")
u.PErr("handleFindPeer: could not find anything.")
success = false
}
if len(closest.Addresses) == 0 {
panic("no addresses for connected peer...")
u.PErr("handleFindPeer: no addresses for connected peer...")
success = false
}
u.POut("handleFindPeer: sending back '%s'", closest.ID.Pretty())
addr,err := closest.Addresses[0].String()
addr, err := closest.Addresses[0].String()
if err != nil {
panic(err)
u.PErr(err.Error())
success = false
}
resp := pDHTMessage{
Type: pmes.GetType(),
resp := DHTMessage{
Type: pmes.GetType(),
Response: true,
Id: pmes.GetId(),
Value: []byte(addr),
Id: pmes.GetId(),
Value: []byte(addr),
Success: success,
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <-mes
dht.network.Chan.Outgoing <- mes
}
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
dht.providerLock.RLock()
providers := dht.providers[u.Key(pmes.GetKey())]
dht.providerLock.RUnlock()
......@@ -344,9 +332,9 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
// This is just a quick hack, formalize method of sending addrs later
addrs := make(map[u.Key]string)
for _,prov := range providers {
for _, prov := range providers {
ma := prov.Value.NetAddress("tcp")
str,err := ma.String()
str, err := ma.String()
if err != nil {
u.PErr("Error: %s", err)
continue
......@@ -355,35 +343,38 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) {
addrs[prov.Value.Key()] = str
}
data,err := json.Marshal(addrs)
success := true
data, err := json.Marshal(addrs)
if err != nil {
panic(err)
u.POut("handleGetProviders: error marshalling struct to JSON: %s", err)
data = nil
success = false
}
resp := pDHTMessage{
Type: DHTMessage_GET_PROVIDERS,
Key: pmes.GetKey(),
Value: data,
Id: pmes.GetId(),
resp := DHTMessage{
Type: PBDHTMessage_GET_PROVIDERS,
Key: pmes.GetKey(),
Value: data,
Id: pmes.GetId(),
Response: true,
Success: success,
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <-mes
dht.network.Chan.Outgoing <- mes
}
type providerInfo struct {
Creation time.Time
Value *peer.Peer
Value *peer.Peer
}
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) {
//TODO: need to implement TTLs on providers
key := u.Key(pmes.GetKey())
dht.addProviderEntry(key, p)
}
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
func (dht *IpfsDHT) ListenFor(mesid uint64, count int, timeout time.Duration) <-chan *swarm.Message {
......@@ -407,7 +398,7 @@ func (dht *IpfsDHT) Unlisten(mesid uint64) {
func (dht *IpfsDHT) IsListening(mesid uint64) bool {
dht.listenLock.RLock()
li,ok := dht.listeners[mesid]
li, ok := dht.listeners[mesid]
dht.listenLock.RUnlock()
if time.Now().After(li.eol) {
dht.listenLock.Lock()
......@@ -432,7 +423,7 @@ func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
dht.providerLock.Unlock()
}
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
dht.diaglock.Lock()
if dht.IsListening(pmes.GetId()) {
//TODO: ehhh..........
......@@ -442,15 +433,13 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
dht.diaglock.Unlock()
seq := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10)
listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second * 30)
listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second*30)
for _,ps := range seq {
for _, ps := range seq {
mes := swarm.NewMessage(ps, pmes)
dht.network.Chan.Outgoing <-mes
dht.network.Chan.Outgoing <- mes
}
buf := new(bytes.Buffer)
di := dht.getDiagInfo()
buf.Write(di.Marshal())
......@@ -464,7 +453,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
//Timeout, return what we have
goto out
case req_resp := <-listen_chan:
pmes_out := new(DHTMessage)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(req_resp.Data, pmes_out)
if err != nil {
// It broke? eh, whatever, keep going
......@@ -476,19 +465,19 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
}
out:
resp := pDHTMessage{
Type: DHTMessage_DIAGNOSTIC,
Id: pmes.GetId(),
Value: buf.Bytes(),
resp := DHTMessage{
Type: PBDHTMessage_DIAGNOSTIC,
Id: pmes.GetId(),
Value: buf.Bytes(),
Response: true,
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Chan.Outgoing <-mes
dht.network.Chan.Outgoing <- mes
}
func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) {
v,err := dht.datastore.Get(ds.NewKey(string(key)))
v, err := dht.datastore.Get(ds.NewKey(string(key)))
if err != nil {
return nil, err
}
......@@ -498,3 +487,10 @@ func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) {
func (dht *IpfsDHT) PutLocal(key u.Key, value []byte) error {
return dht.datastore.Put(ds.NewKey(string(key)), value)
}
func (dht *IpfsDHT) Update(p *peer.Peer) {
removed := dht.routes[0].Update(p)
if removed != nil {
dht.network.Drop(removed)
}
}
......@@ -2,21 +2,22 @@ package dht
import (
"testing"
peer "github.com/jbenet/go-ipfs/peer"
ma "github.com/jbenet/go-multiaddr"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
"time"
"fmt"
"time"
)
func TestPing(t *testing.T) {
u.Debug = false
addr_a,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234")
addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234")
if err != nil {
t.Fatal(err)
}
addr_b,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678")
addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678")
if err != nil {
t.Fatal(err)
}
......@@ -29,12 +30,12 @@ func TestPing(t *testing.T) {
peer_b.AddAddress(addr_b)
peer_b.ID = peer.ID([]byte("peer_b"))
dht_a,err := NewDHT(peer_a)
dht_a, err := NewDHT(peer_a)
if err != nil {
t.Fatal(err)
}
dht_b,err := NewDHT(peer_b)
dht_b, err := NewDHT(peer_b)
if err != nil {
t.Fatal(err)
}
......@@ -42,13 +43,13 @@ func TestPing(t *testing.T) {
dht_a.Start()
dht_b.Start()
_,err = dht_a.Connect(addr_b)
_, err = dht_a.Connect(addr_b)
if err != nil {
t.Fatal(err)
}
//Test that we can ping the node
err = dht_a.Ping(peer_b, time.Second * 2)
err = dht_a.Ping(peer_b, time.Second*2)
if err != nil {
t.Fatal(err)
}
......@@ -59,11 +60,11 @@ func TestPing(t *testing.T) {
func TestValueGetSet(t *testing.T) {
u.Debug = false
addr_a,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235")
addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235")
if err != nil {
t.Fatal(err)
}
addr_b,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679")
addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679")
if err != nil {
t.Fatal(err)
}
......@@ -76,12 +77,12 @@ func TestValueGetSet(t *testing.T) {
peer_b.AddAddress(addr_b)
peer_b.ID = peer.ID([]byte("peer_b"))
dht_a,err := NewDHT(peer_a)
dht_a, err := NewDHT(peer_a)
if err != nil {
t.Fatal(err)
}
dht_b,err := NewDHT(peer_b)
dht_b, err := NewDHT(peer_b)
if err != nil {
t.Fatal(err)
}
......@@ -89,7 +90,7 @@ func TestValueGetSet(t *testing.T) {
dht_a.Start()
dht_b.Start()
_,err = dht_a.Connect(addr_b)
_, err = dht_a.Connect(addr_b)
if err != nil {
t.Fatal(err)
}
......@@ -99,7 +100,7 @@ func TestValueGetSet(t *testing.T) {
t.Fatal(err)
}
val, err := dht_a.GetValue("hello", time.Second * 2)
val, err := dht_a.GetValue("hello", time.Second*2)
if err != nil {
t.Fatal(err)
}
......@@ -113,14 +114,13 @@ func TestProvides(t *testing.T) {
u.Debug = false
var addrs []*ma.Multiaddr
for i := 0; i < 4; i++ {
a,err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000 + i))
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i))
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, a)
}
var peers []*peer.Peer
for i := 0; i < 4; i++ {
p := new(peer.Peer)
......@@ -131,7 +131,7 @@ func TestProvides(t *testing.T) {
var dhts []*IpfsDHT
for i := 0; i < 4; i++ {
d,err := NewDHT(peers[i])
d, err := NewDHT(peers[i])
if err != nil {
t.Fatal(err)
}
......@@ -166,7 +166,7 @@ func TestProvides(t *testing.T) {
time.Sleep(time.Millisecond * 60)
provs,err := dhts[0].FindProviders(u.Key("hello"), time.Second)
provs, err := dhts[0].FindProviders(u.Key("hello"), time.Second)
if err != nil {
t.Fatal(err)
}
......@@ -174,6 +174,8 @@ func TestProvides(t *testing.T) {
if len(provs) != 1 {
t.Fatal("Didnt get back providers")
}
}
for i := 0; i < 4; i++ {
dhts[i].Halt()
}
}
......@@ -38,7 +38,7 @@ func (dht *IpfsDHT) getDiagInfo() *diagInfo {
di.Keys = nil // Currently no way to query datastore
for _,p := range dht.routes[0].listpeers() {
di.Connections = append(di.Connections, connDiagInfo{p.GetDistance(), p.ID})
di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID})
}
return di
}
......@@ -9,7 +9,7 @@ It is generated from these files:
messages.proto
It has these top-level messages:
DHTMessage
PBDHTMessage
*/
package dht
......@@ -20,19 +20,19 @@ import math "math"
var _ = proto.Marshal
var _ = math.Inf
type DHTMessage_MessageType int32
type PBDHTMessage_MessageType int32
const (
DHTMessage_PUT_VALUE DHTMessage_MessageType = 0
DHTMessage_GET_VALUE DHTMessage_MessageType = 1
DHTMessage_ADD_PROVIDER DHTMessage_MessageType = 2
DHTMessage_GET_PROVIDERS DHTMessage_MessageType = 3
DHTMessage_FIND_NODE DHTMessage_MessageType = 4
DHTMessage_PING DHTMessage_MessageType = 5
DHTMessage_DIAGNOSTIC DHTMessage_MessageType = 6
PBDHTMessage_PUT_VALUE PBDHTMessage_MessageType = 0
PBDHTMessage_GET_VALUE PBDHTMessage_MessageType = 1
PBDHTMessage_ADD_PROVIDER PBDHTMessage_MessageType = 2
PBDHTMessage_GET_PROVIDERS PBDHTMessage_MessageType = 3
PBDHTMessage_FIND_NODE PBDHTMessage_MessageType = 4
PBDHTMessage_PING PBDHTMessage_MessageType = 5
PBDHTMessage_DIAGNOSTIC PBDHTMessage_MessageType = 6
)
var DHTMessage_MessageType_name = map[int32]string{
var PBDHTMessage_MessageType_name = map[int32]string{
0: "PUT_VALUE",
1: "GET_VALUE",
2: "ADD_PROVIDER",
......@@ -41,7 +41,7 @@ var DHTMessage_MessageType_name = map[int32]string{
5: "PING",
6: "DIAGNOSTIC",
}
var DHTMessage_MessageType_value = map[string]int32{
var PBDHTMessage_MessageType_value = map[string]int32{
"PUT_VALUE": 0,
"GET_VALUE": 1,
"ADD_PROVIDER": 2,
......@@ -51,79 +51,111 @@ var DHTMessage_MessageType_value = map[string]int32{
"DIAGNOSTIC": 6,
}
func (x DHTMessage_MessageType) Enum() *DHTMessage_MessageType {
p := new(DHTMessage_MessageType)
func (x PBDHTMessage_MessageType) Enum() *PBDHTMessage_MessageType {
p := new(PBDHTMessage_MessageType)
*p = x
return p
}
func (x DHTMessage_MessageType) String() string {
return proto.EnumName(DHTMessage_MessageType_name, int32(x))
func (x PBDHTMessage_MessageType) String() string {
return proto.EnumName(PBDHTMessage_MessageType_name, int32(x))
}
func (x *DHTMessage_MessageType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(DHTMessage_MessageType_value, data, "DHTMessage_MessageType")
func (x *PBDHTMessage_MessageType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(PBDHTMessage_MessageType_value, data, "PBDHTMessage_MessageType")
if err != nil {
return err
}
*x = DHTMessage_MessageType(value)
*x = PBDHTMessage_MessageType(value)
return nil
}
type DHTMessage struct {
Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"`
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"`
XXX_unrecognized []byte `json:"-"`
type PBDHTMessage struct {
Type *PBDHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.PBDHTMessage_MessageType" json:"type,omitempty"`
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"`
Peers []*PBDHTMessage_PBPeer `protobuf:"bytes,7,rep,name=peers" json:"peers,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *DHTMessage) Reset() { *m = DHTMessage{} }
func (m *DHTMessage) String() string { return proto.CompactTextString(m) }
func (*DHTMessage) ProtoMessage() {}
func (m *PBDHTMessage) Reset() { *m = PBDHTMessage{} }
func (m *PBDHTMessage) String() string { return proto.CompactTextString(m) }
func (*PBDHTMessage) ProtoMessage() {}
func (m *DHTMessage) GetType() DHTMessage_MessageType {
func (m *PBDHTMessage) GetType() PBDHTMessage_MessageType {
if m != nil && m.Type != nil {
return *m.Type
}
return DHTMessage_PUT_VALUE
return PBDHTMessage_PUT_VALUE
}
func (m *DHTMessage) GetKey() string {
func (m *PBDHTMessage) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
}
return ""
}
func (m *DHTMessage) GetValue() []byte {
func (m *PBDHTMessage) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *DHTMessage) GetId() uint64 {
func (m *PBDHTMessage) GetId() uint64 {
if m != nil && m.Id != nil {
return *m.Id
}
return 0
}
func (m *DHTMessage) GetResponse() bool {
func (m *PBDHTMessage) GetResponse() bool {
if m != nil && m.Response != nil {
return *m.Response
}
return false
}
func (m *DHTMessage) GetSuccess() bool {
func (m *PBDHTMessage) GetSuccess() bool {
if m != nil && m.Success != nil {
return *m.Success
}
return false
}
func (m *PBDHTMessage) GetPeers() []*PBDHTMessage_PBPeer {
if m != nil {
return m.Peers
}
return nil
}
type PBDHTMessage_PBPeer struct {
Id *string `protobuf:"bytes,1,req,name=id" json:"id,omitempty"`
Addr *string `protobuf:"bytes,2,req,name=addr" json:"addr,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *PBDHTMessage_PBPeer) Reset() { *m = PBDHTMessage_PBPeer{} }
func (m *PBDHTMessage_PBPeer) String() string { return proto.CompactTextString(m) }
func (*PBDHTMessage_PBPeer) ProtoMessage() {}
func (m *PBDHTMessage_PBPeer) GetId() string {
if m != nil && m.Id != nil {
return *m.Id
}
return ""
}
func (m *PBDHTMessage_PBPeer) GetAddr() string {
if m != nil && m.Addr != nil {
return *m.Addr
}
return ""
}
func init() {
proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value)
proto.RegisterEnum("dht.PBDHTMessage_MessageType", PBDHTMessage_MessageType_name, PBDHTMessage_MessageType_value)
}
......@@ -2,7 +2,7 @@ package dht;
//run `protoc --go_out=. *.proto` to generate
message DHTMessage {
message PBDHTMessage {
enum MessageType {
PUT_VALUE = 0;
GET_VALUE = 1;
......@@ -13,6 +13,11 @@ message DHTMessage {
DIAGNOSTIC = 6;
}
message PBPeer {
required string id = 1;
required string addr = 2;
}
required MessageType type = 1;
optional string key = 2;
optional bytes value = 3;
......@@ -23,4 +28,7 @@ message DHTMessage {
// Signals whether or not this message is a response to another message
optional bool response = 5;
optional bool success = 6;
// Used for returning peers from queries (normally, peers closer to X)
repeated PBPeer peers = 7;
}
package dht
import (
"math/rand"
"time"
"bytes"
"encoding/json"
"errors"
"math/rand"
"time"
proto "code.google.com/p/goprotobuf/proto"
......@@ -34,7 +35,7 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
var p *peer.Peer
p = s.routes[0].NearestPeer(convertKey(key))
if p == nil {
panic("Table returned nil peer!")
return errors.New("Table returned nil peer!")
}
return s.putValueToPeer(p, string(key), value)
......@@ -47,13 +48,13 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
var p *peer.Peer
p = s.routes[0].NearestPeer(convertKey(key))
if p == nil {
panic("Table returned nil peer!")
return nil, errors.New("Table returned nil peer!")
}
pmes := pDHTMessage{
Type: DHTMessage_GET_VALUE,
Key: string(key),
Id: GenerateMessageID(),
pmes := DHTMessage{
Type: PBDHTMessage_GET_VALUE,
Key: string(key),
Id: GenerateMessageID(),
}
response_chan := s.ListenFor(pmes.Id, 1, time.Minute)
......@@ -68,15 +69,13 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
return nil, u.ErrTimeout
case resp, ok := <-response_chan:
if !ok {
panic("Channel was closed...")
u.PErr("response channel closed before timeout, please investigate.")
return nil, u.ErrTimeout
}
if resp == nil {
panic("Why the hell is this response nil?")
}
pmes_out := new(DHTMessage)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
return nil,err
return nil, err
}
if pmes_out.GetSuccess() {
return pmes_out.GetValue(), nil
......@@ -96,15 +95,15 @@ func (s *IpfsDHT) Provide(key u.Key) error {
//return an error
}
pmes := pDHTMessage{
Type: DHTMessage_ADD_PROVIDER,
Key: string(key),
pmes := DHTMessage{
Type: PBDHTMessage_ADD_PROVIDER,
Key: string(key),
}
pbmes := pmes.ToProtobuf()
for _,p := range peers {
for _, p := range peers {
mes := swarm.NewMessage(p, pbmes)
s.network.Chan.Outgoing <-mes
s.network.Chan.Outgoing <- mes
}
return nil
}
......@@ -113,17 +112,17 @@ func (s *IpfsDHT) Provide(key u.Key) error {
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
p := s.routes[0].NearestPeer(convertKey(key))
pmes := pDHTMessage{
Type: DHTMessage_GET_PROVIDERS,
Key: string(key),
Id: GenerateMessageID(),
pmes := DHTMessage{
Type: PBDHTMessage_GET_PROVIDERS,
Key: string(key),
Id: GenerateMessageID(),
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listen_chan := s.ListenFor(pmes.Id, 1, time.Minute)
u.DOut("Find providers for: '%s'", key)
s.network.Chan.Outgoing <-mes
s.network.Chan.Outgoing <- mes
after := time.After(timeout)
select {
case <-after:
......@@ -131,7 +130,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
return nil, u.ErrTimeout
case resp := <-listen_chan:
u.DOut("FindProviders: got response.")
pmes_out := new(DHTMessage)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
return nil, err
......@@ -143,10 +142,10 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
}
var prov_arr []*peer.Peer
for pid,addr := range addrs {
for pid, addr := range addrs {
p := s.network.Find(pid)
if p == nil {
maddr,err := ma.NewMultiaddr(addr)
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
u.PErr("error connecting to new peer: %s", err)
continue
......@@ -171,23 +170,23 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
p := s.routes[0].NearestPeer(convertPeerID(id))
pmes := pDHTMessage{
Type: DHTMessage_FIND_NODE,
Key: string(id),
Id: GenerateMessageID(),
pmes := DHTMessage{
Type: PBDHTMessage_FIND_NODE,
Key: string(id),
Id: GenerateMessageID(),
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listen_chan := s.ListenFor(pmes.Id, 1, time.Minute)
s.network.Chan.Outgoing <-mes
s.network.Chan.Outgoing <- mes
after := time.After(timeout)
select {
case <-after:
s.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp := <-listen_chan:
pmes_out := new(DHTMessage)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
return nil, err
......@@ -218,7 +217,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
u.DOut("Enter Ping.")
pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING}
pmes := DHTMessage{Id: GenerateMessageID(), Type: PBDHTMessage_PING}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
before := time.Now()
......@@ -229,7 +228,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
select {
case <-response_chan:
roundtrip := time.Since(before)
p.SetDistance(roundtrip)
p.SetLatency(roundtrip)
u.POut("Ping took %s.", roundtrip.String())
return nil
case <-tout:
......@@ -246,17 +245,17 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
targets := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10)
// TODO: Add timeout to this struct so nodes know when to return
pmes := pDHTMessage{
Type: DHTMessage_DIAGNOSTIC,
Id: GenerateMessageID(),
pmes := DHTMessage{
Type: PBDHTMessage_DIAGNOSTIC,
Id: GenerateMessageID(),
}
listen_chan := dht.ListenFor(pmes.Id, len(targets), time.Minute * 2)
listen_chan := dht.ListenFor(pmes.Id, len(targets), time.Minute*2)
pbmes := pmes.ToProtobuf()
for _,p := range targets {
for _, p := range targets {
mes := swarm.NewMessage(p, pbmes)
dht.network.Chan.Outgoing <-mes
dht.network.Chan.Outgoing <- mes
}
var out []*diagInfo
......@@ -267,7 +266,7 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
u.DOut("Diagnostic request timed out.")
return out, u.ErrTimeout
case resp := <-listen_chan:
pmes_out := new(DHTMessage)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
// NOTE: here and elsewhere, need to audit error handling,
......@@ -288,5 +287,5 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
}
}
return nil,nil
return nil, nil
}
......@@ -125,7 +125,7 @@ func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer {
func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()
cpl := xor(id, rt.local).commonPrefixLen()
cpl := prefLen(id, rt.local)
// Get bucket at cpl index or last bucket
var bucket *Bucket
......
......@@ -40,6 +40,10 @@ func (id ID) commonPrefixLen() int {
return len(id)*8 - 1
}
func prefLen(a, b ID) int {
return xor(a, b).commonPrefixLen()
}
func xor(a, b ID) ID {
a, b = equalizeSizes(a, b)
......
package swarm
import (
"errors"
"fmt"
"net"
"sync"
proto "code.google.com/p/goprotobuf/proto"
ident "github.com/jbenet/go-ipfs/identify"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
ident "github.com/jbenet/go-ipfs/identify"
proto "code.google.com/p/goprotobuf/proto"
)
// Message represents a packet of information sent to or received from a
......@@ -24,9 +25,10 @@ type Message struct {
// Cleaner looking helper function to make a new message struct
func NewMessage(p *peer.Peer, data proto.Message) *Message {
bytes,err := proto.Marshal(data)
bytes, err := proto.Marshal(data)
if err != nil {
panic(err)
u.PErr(err.Error())
return nil
}
return &Message{
Peer: p,
......@@ -63,7 +65,7 @@ func (se *SwarmListenErr) Error() string {
return "<nil error>"
}
var out string
for i,v := range se.Errors {
for i, v := range se.Errors {
if v != nil {
out += fmt.Sprintf("%d: %s\n", i, v)
}
......@@ -80,7 +82,7 @@ type Swarm struct {
conns ConnMap
connsLock sync.RWMutex
local *peer.Peer
local *peer.Peer
listeners []net.Listener
}
......@@ -137,7 +139,7 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error {
if err != nil {
e := fmt.Errorf("Failed to accept connection: %s - %s [%s]",
netstr, addr, err)
go func() {s.Chan.Errors <- e}()
go func() { s.Chan.Errors <- e }()
return
}
go s.handleNewConn(nconn)
......@@ -160,7 +162,9 @@ func (s *Swarm) handleNewConn(nconn net.Conn) {
err := ident.Handshake(s.local, p, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
if err != nil {
panic(err)
u.PErr(err.Error())
conn.Close()
return
}
// Get address to contact remote peer from
......@@ -186,7 +190,7 @@ func (s *Swarm) Close() {
s.Chan.Close <- true // fan out
s.Chan.Close <- true // listener
for _,list := range s.listeners {
for _, list := range s.listeners {
list.Close()
}
}
......@@ -220,9 +224,9 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) {
return conn, nil
}
func (s *Swarm) StartConn(conn *Conn) {
func (s *Swarm) StartConn(conn *Conn) error {
if conn == nil {
panic("tried to start nil Conn!")
return errors.New("Tried to start nil connection.")
}
u.DOut("Starting connection: %s", conn.Peer.Key().Pretty())
......@@ -233,6 +237,7 @@ func (s *Swarm) StartConn(conn *Conn) {
// kick off reader goroutine
go s.fanIn(conn)
return nil
}
// Handles the unwrapping + sending of messages to the right connection.
......@@ -303,3 +308,50 @@ func (s *Swarm) Find(key u.Key) *peer.Peer {
}
return conn.Peer
}
func (s *Swarm) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
if addr == nil {
return nil, errors.New("nil Multiaddr passed to swarm.Connect()")
}
npeer := new(peer.Peer)
npeer.AddAddress(addr)
conn, err := Dial("tcp", npeer)
if err != nil {
return nil, err
}
err = ident.Handshake(s.local, npeer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
if err != nil {
return nil, err
}
// Send node an address that you can be reached on
myaddr := s.local.NetAddress("tcp")
mastr, err := myaddr.String()
if err != nil {
return nil, errors.New("No local address to send to peer.")
}
conn.Outgoing.MsgChan <- []byte(mastr)
s.StartConn(conn)
return npeer, nil
}
// Removes a given peer from the swarm and closes connections to it
func (s *Swarm) Drop(p *peer.Peer) error {
s.connsLock.RLock()
conn, found := s.conns[u.Key(p.ID)]
s.connsLock.RUnlock()
if !found {
return u.ErrNotFound
}
s.connsLock.Lock()
delete(s.conns, u.Key(p.ID))
s.connsLock.Unlock()
return conn.Close()
}
package util
import (
"fmt"
"errors"
mh "github.com/jbenet/go-multihash"
"fmt"
"os"
"os/user"
"strings"
"encoding/hex"
b58 "github.com/jbenet/go-base58"
mh "github.com/jbenet/go-multihash"
)
// Debug is a global flag for debugging.
......@@ -23,11 +24,14 @@ var ErrTimeout = errors.New("Error: Call timed out.")
// find the expected node, but did find 'a' node.
var ErrSearchIncomplete = errors.New("Error: Search Incomplete.")
// ErrNotFound is returned when a search fails to find anything
var ErrNotFound = errors.New("Error: Not Found.")
// Key is a string representation of multihash for use with maps.
type Key string
func (k Key) Pretty() string {
return hex.EncodeToString([]byte(k))
return b58.Encode([]byte(k))
}
// Hash is the global IPFS hash function. uses multihash SHA2_256, 256 bits
......@@ -51,12 +55,12 @@ func TildeExpansion(filename string) (string, error) {
// PErr is a shorthand printing function to output to Stderr.
func PErr(format string, a ...interface{}) {
fmt.Fprintf(os.Stderr, format + "\n", a...)
fmt.Fprintf(os.Stderr, format+"\n", a...)
}
// POut is a shorthand printing function to output to Stdout.
func POut(format string, a ...interface{}) {
fmt.Fprintf(os.Stdout, format + "\n", a...)
fmt.Fprintf(os.Stdout, format+"\n", a...)
}
// DErr is a shorthand debug printing function to output to Stderr.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment