Commit 67ddab1e authored by Jeromy's avatar Jeromy

tiered put/get implemented

parent 9f760437
package peer
import (
"time"
"sync"
"time"
b58 "github.com/jbenet/go-base58"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
mh "github.com/jbenet/go-multihash"
b58 "github.com/jbenet/go-base58"
"bytes"
)
......@@ -33,7 +33,7 @@ type Peer struct {
ID ID
Addresses []*ma.Multiaddr
latency time.Duration
latency time.Duration
latenLock sync.RWMutex
}
......
package dht
import (
peer "github.com/jbenet/go-ipfs/peer"
)
// A helper struct to make working with protbuf types easier
type DHTMessage struct {
Type PBDHTMessage_MessageType
......@@ -8,6 +12,20 @@ type DHTMessage struct {
Response bool
Id uint64
Success bool
Peers []*peer.Peer
}
func peerInfo(p *peer.Peer) *PBDHTMessage_PBPeer {
pbp := new(PBDHTMessage_PBPeer)
addr, err := p.Addresses[0].String()
if err != nil {
//Temp: what situations could cause this?
panic(err)
}
pbp.Addr = &addr
pid := string(p.ID)
pbp.Id = &pid
return pbp
}
func (m *DHTMessage) ToProtobuf() *PBDHTMessage {
......@@ -21,6 +39,9 @@ func (m *DHTMessage) ToProtobuf() *PBDHTMessage {
pmes.Response = &m.Response
pmes.Id = &m.Id
pmes.Success = &m.Success
for _, p := range m.Peers {
pmes.Peers = append(pmes.Peers, peerInfo(p))
}
return pmes
}
......@@ -8,9 +8,9 @@ import (
"time"
peer "github.com/jbenet/go-ipfs/peer"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
ma "github.com/jbenet/go-multiaddr"
......@@ -37,7 +37,6 @@ type IpfsDHT struct {
datastore ds.Datastore
// Map keys to peers that can provide their value
// TODO: implement a TTL on each of these keys
providers map[u.Key][]*providerInfo
providerLock sync.RWMutex
......@@ -67,7 +66,7 @@ type listenInfo struct {
eol time.Time
}
// Create a new DHT object with the given peer as the 'local' host
// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
if p == nil {
return nil, errors.New("nil peer passed to NewDHT()")
......@@ -111,7 +110,7 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
// NOTE: this should be done better...
err = dht.Ping(npeer, time.Second*2)
if err != nil {
return nil, errors.New("Failed to ping newly connected peer.")
return nil, errors.New("failed to ping newly connected peer")
}
return npeer, nil
......@@ -227,7 +226,7 @@ func (dht *IpfsDHT) cleanExpiredListeners() {
dht.listenLock.Unlock()
}
func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error {
func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error {
pmes := DHTMessage{
Type: PBDHTMessage_PUT_VALUE,
Key: key,
......@@ -242,26 +241,32 @@ func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
var resp *DHTMessage
i_val, err := dht.datastore.Get(dskey)
resp := &DHTMessage{
Response: true,
Id: pmes.GetId(),
Key: pmes.GetKey(),
}
iVal, err := dht.datastore.Get(dskey)
if err == nil {
resp = &DHTMessage{
Response: true,
Id: *pmes.Id,
Key: *pmes.Key,
Value: i_val.([]byte),
Success: true,
}
resp.Success = true
resp.Value = iVal.([]byte)
} else if err == ds.ErrNotFound {
// Find closest peer(s) to desired key and reply with that info
closer := dht.routes[0].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
resp = &DHTMessage{
Response: true,
Id: *pmes.Id,
Key: *pmes.Key,
Value: closer.ID,
Success: false,
// Check if we know any providers for the requested value
provs, ok := dht.providers[u.Key(pmes.GetKey())]
if ok && len(provs) > 0 {
for _, prov := range provs {
resp.Peers = append(resp.Peers, prov.Value)
}
resp.Success = true
} else {
// No providers?
// Find closest peer(s) to desired key and reply with that info
closer := dht.routes[0].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
resp.Peers = []*peer.Peer{closer}
}
} else {
//temp: what other errors can a datastore throw?
panic(err)
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
......@@ -397,6 +402,7 @@ func (dht *IpfsDHT) Unlisten(mesid uint64) {
close(list.resp)
}
// Check whether or not the dht is currently listening for mesid
func (dht *IpfsDHT) IsListening(mesid uint64) bool {
dht.listenLock.RLock()
li, ok := dht.listeners[mesid]
......@@ -424,6 +430,7 @@ func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
dht.providerLock.Unlock()
}
// NOTE: not yet finished, low priority
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
dht.diaglock.Lock()
if dht.IsListening(pmes.GetId()) {
......@@ -434,7 +441,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
dht.diaglock.Unlock()
seq := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second*30)
listenChan := dht.ListenFor(pmes.GetId(), len(seq), time.Second*30)
for _, ps := range seq {
mes := swarm.NewMessage(ps, pmes)
......@@ -453,7 +460,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
case <-after:
//Timeout, return what we have
goto out
case req_resp := <-listen_chan:
case req_resp := <-listenChan:
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(req_resp.Data, pmes_out)
if err != nil {
......@@ -477,6 +484,77 @@ out:
dht.network.Chan.Outgoing <- mes
}
func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration) ([]byte, error) {
pmes := DHTMessage{
Type: PBDHTMessage_GET_VALUE,
Key: string(key),
Id: GenerateMessageID(),
}
response_chan := dht.ListenFor(pmes.Id, 1, time.Minute)
mes := swarm.NewMessage(p, pmes.ToProtobuf())
dht.network.Chan.Outgoing <- mes
// Wait for either the response or a timeout
timeup := time.After(timeout)
select {
case <-timeup:
dht.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp, ok := <-response_chan:
if !ok {
u.PErr("response channel closed before timeout, please investigate.")
return nil, u.ErrTimeout
}
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
return nil, err
}
// TODO: debate moving this logic out of this function to be handled by the caller
if pmes_out.GetSuccess() {
if pmes_out.Value == nil {
// We were given provider[s]
return dht.getFromProviderList(key, timeout, pmes_out.GetPeers())
}
// We were given the value
return pmes_out.GetValue(), nil
} else {
return pmes_out.GetValue(), u.ErrSearchIncomplete
}
}
}
// TODO: Im not certain on this implementation, we get a list of providers from someone
// what do we do with it? Connect to each of them? randomly pick one to get the value from?
// Or just connect to one at a time until we get a successful connection and request the
// value from it?
func (dht *IpfsDHT) getFromProviderList(key u.Key, timeout time.Duration, provlist []*PBDHTMessage_PBPeer) ([]byte, error) {
for _, prov := range provlist {
prov_p, _ := dht.Find(peer.ID(prov.GetId()))
if prov_p == nil {
maddr, err := ma.NewMultiaddr(prov.GetAddr())
if err != nil {
u.PErr("getValue error: %s", err)
continue
}
prov_p, err = dht.Connect(maddr)
if err != nil {
u.PErr("getValue error: %s", err)
continue
}
}
data, err := dht.getValueSingle(prov_p, key, timeout)
if err != nil {
u.DErr("getFromProvs error: %s", err)
continue
}
return data, nil
}
return nil, u.ErrNotFound
}
func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) {
v, err := dht.datastore.Get(ds.NewKey(string(key)))
if err != nil {
......@@ -495,3 +573,14 @@ func (dht *IpfsDHT) Update(p *peer.Peer) {
dht.network.Drop(removed)
}
}
// Look for a peer with a given ID connected to this dht
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
for _, table := range dht.routes {
p := table.Find(id)
if p != nil {
return p, table
}
}
return nil, nil
}
......@@ -90,15 +90,21 @@ func TestValueGetSet(t *testing.T) {
dht_a.Start()
dht_b.Start()
go func() {
select {
case err := <-dht_a.network.Chan.Errors:
t.Fatal(err)
case err := <-dht_b.network.Chan.Errors:
t.Fatal(err)
}
}()
_, err = dht_a.Connect(addr_b)
if err != nil {
t.Fatal(err)
}
err = dht_a.PutValue("hello", []byte("world"))
if err != nil {
t.Fatal(err)
}
dht_a.PutValue("hello", []byte("world"))
val, err := dht_a.GetValue("hello", time.Second*2)
if err != nil {
......@@ -179,3 +185,73 @@ func TestProvides(t *testing.T) {
dhts[i].Halt()
}
}
func TestLayeredGet(t *testing.T) {
u.Debug = false
var addrs []*ma.Multiaddr
for i := 0; i < 4; i++ {
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i))
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, a)
}
var peers []*peer.Peer
for i := 0; i < 4; i++ {
p := new(peer.Peer)
p.AddAddress(addrs[i])
p.ID = peer.ID([]byte(fmt.Sprintf("peer_%d", i)))
peers = append(peers, p)
}
var dhts []*IpfsDHT
for i := 0; i < 4; i++ {
d, err := NewDHT(peers[i])
if err != nil {
t.Fatal(err)
}
dhts = append(dhts, d)
d.Start()
}
_, err := dhts[0].Connect(addrs[1])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(addrs[2])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(addrs[3])
if err != nil {
t.Fatal(err)
}
err = dhts[3].PutLocal(u.Key("hello"), []byte("world"))
if err != nil {
t.Fatal(err)
}
err = dhts[3].Provide(u.Key("hello"))
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 60)
val, err := dhts[0].GetValue(u.Key("hello"), time.Second)
if err != nil {
t.Fatal(err)
}
if string(val) != "world" {
t.Fatal("Got incorrect value.")
}
for i := 0; i < 4; i++ {
dhts[i].Halt()
}
}
......@@ -9,14 +9,14 @@ import (
type connDiagInfo struct {
Latency time.Duration
Id peer.ID
Id peer.ID
}
type diagInfo struct {
Id peer.ID
Id peer.ID
Connections []connDiagInfo
Keys []string
LifeSpan time.Duration
Keys []string
LifeSpan time.Duration
CodeVersion string
}
......@@ -29,7 +29,6 @@ func (di *diagInfo) Marshal() []byte {
return b
}
func (dht *IpfsDHT) getDiagInfo() *diagInfo {
di := new(diagInfo)
di.CodeVersion = "github.com/jbenet/go-ipfs"
......@@ -37,7 +36,7 @@ func (dht *IpfsDHT) getDiagInfo() *diagInfo {
di.LifeSpan = time.Since(dht.birth)
di.Keys = nil // Currently no way to query datastore
for _,p := range dht.routes[0].Listpeers() {
for _, p := range dht.routes[0].Listpeers() {
di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID})
}
return di
......
......@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"errors"
"fmt"
"math/rand"
"time"
......@@ -32,58 +33,50 @@ func GenerateMessageID() uint64 {
// PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT
func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
var p *peer.Peer
p = s.routes[0].NearestPeer(kb.ConvertKey(key))
if p == nil {
return errors.New("Table returned nil peer!")
func (s *IpfsDHT) PutValue(key u.Key, value []byte) {
complete := make(chan struct{})
for i, route := range s.routes {
p := route.NearestPeer(kb.ConvertKey(key))
if p == nil {
s.network.Chan.Errors <- fmt.Errorf("No peer found on level %d", i)
continue
go func() {
complete <- struct{}{}
}()
}
go func() {
err := s.putValueToNetwork(p, string(key), value)
if err != nil {
s.network.Chan.Errors <- err
}
complete <- struct{}{}
}()
}
for _, _ = range s.routes {
<-complete
}
return s.putValueToPeer(p, string(key), value)
}
// GetValue searches for the value corresponding to given Key.
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
var p *peer.Peer
p = s.routes[0].NearestPeer(kb.ConvertKey(key))
if p == nil {
return nil, errors.New("Table returned nil peer!")
}
pmes := DHTMessage{
Type: PBDHTMessage_GET_VALUE,
Key: string(key),
Id: GenerateMessageID(),
}
response_chan := s.ListenFor(pmes.Id, 1, time.Minute)
mes := swarm.NewMessage(p, pmes.ToProtobuf())
s.network.Chan.Outgoing <- mes
for _, route := range s.routes {
var p *peer.Peer
p = route.NearestPeer(kb.ConvertKey(key))
if p == nil {
return nil, errors.New("Table returned nil peer!")
}
// Wait for either the response or a timeout
timeup := time.After(timeout)
select {
case <-timeup:
s.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp, ok := <-response_chan:
if !ok {
u.PErr("response channel closed before timeout, please investigate.")
return nil, u.ErrTimeout
b, err := s.getValueSingle(p, key, timeout)
if err == nil {
return b, nil
}
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
if err != u.ErrSearchIncomplete {
return nil, err
}
if pmes_out.GetSuccess() {
return pmes_out.GetValue(), nil
} else {
return pmes_out.GetValue(), u.ErrSearchIncomplete
}
}
return nil, u.ErrNotFound
}
// Value provider layer of indirection.
......@@ -121,7 +114,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listen_chan := s.ListenFor(pmes.Id, 1, time.Minute)
listenChan := s.ListenFor(pmes.Id, 1, time.Minute)
u.DOut("Find providers for: '%s'", key)
s.network.Chan.Outgoing <- mes
after := time.After(timeout)
......@@ -129,7 +122,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
case <-after:
s.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp := <-listen_chan:
case resp := <-listenChan:
u.DOut("FindProviders: got response.")
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
......@@ -179,14 +172,14 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listen_chan := s.ListenFor(pmes.Id, 1, time.Minute)
listenChan := s.ListenFor(pmes.Id, 1, time.Minute)
s.network.Chan.Outgoing <- mes
after := time.After(timeout)
select {
case <-after:
s.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp := <-listen_chan:
case resp := <-listenChan:
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
......@@ -251,7 +244,7 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
Id: GenerateMessageID(),
}
listen_chan := dht.ListenFor(pmes.Id, len(targets), time.Minute*2)
listenChan := dht.ListenFor(pmes.Id, len(targets), time.Minute*2)
pbmes := pmes.ToProtobuf()
for _, p := range targets {
......@@ -266,7 +259,7 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
case <-after:
u.DOut("Diagnostic request timed out.")
return out, u.ErrTimeout
case resp := <-listen_chan:
case resp := <-listenChan:
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
......
......@@ -5,6 +5,7 @@ import (
peer "github.com/jbenet/go-ipfs/peer"
)
// Bucket holds a list of peers.
type Bucket list.List
......
......@@ -19,7 +19,7 @@ type RoutingTable struct {
tabLock sync.RWMutex
// kBuckets define all the fingers to other nodes.
Buckets []*Bucket
Buckets []*Bucket
bucketsize int
}
......@@ -52,7 +52,7 @@ func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer {
// Are we past the max bucket size?
if bucket.Len() > rt.bucketsize {
if b_id == len(rt.Buckets) - 1 {
if b_id == len(rt.Buckets)-1 {
new_bucket := bucket.Split(b_id, rt.local)
rt.Buckets = append(rt.Buckets, new_bucket)
if new_bucket.Len() > rt.bucketsize {
......@@ -81,25 +81,27 @@ func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer {
// A helper struct to sort peers by their distance to the local node
type peerDistance struct {
p *peer.Peer
p *peer.Peer
distance ID
}
// peerSorterArr implements sort.Interface to sort peers by xor distance
type peerSorterArr []*peerDistance
func (p peerSorterArr) Len() int {return len(p)}
func (p peerSorterArr) Swap(a, b int) {p[a],p[b] = p[b],p[a]}
func (p peerSorterArr) Len() int { return len(p) }
func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] }
func (p peerSorterArr) Less(a, b int) bool {
return p[a].distance.Less(p[b].distance)
}
//
func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
for e := peerList.Front(); e != nil; e = e.Next() {
for e := peerList.Front(); e != nil; e = e.Next() {
p := e.Value.(*peer.Peer)
p_id := ConvertPeerID(p.ID)
pd := peerDistance{
p: p,
p: p,
distance: xor(target, p_id),
}
peerArr = append(peerArr, &pd)
......@@ -111,6 +113,15 @@ func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) pe
return peerArr
}
// Find a specific peer by ID or return nil
func (rt *RoutingTable) Find(id peer.ID) *peer.Peer {
srch := rt.NearestPeers(ConvertPeerID(id), 1)
if len(srch) == 0 || !srch[0].ID.Equal(id) {
return nil
}
return srch[0]
}
// Returns a single peer that is nearest to the given ID
func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer {
peers := rt.NearestPeers(id, 1)
......@@ -139,12 +150,12 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
// In the case of an unusual split, one bucket may be empty.
// if this happens, search both surrounding buckets for nearest peer
if cpl > 0 {
plist := (*list.List)(rt.Buckets[cpl - 1])
plist := (*list.List)(rt.Buckets[cpl-1])
peerArr = copyPeersFromList(id, peerArr, plist)
}
if cpl < len(rt.Buckets) - 1 {
plist := (*list.List)(rt.Buckets[cpl + 1])
if cpl < len(rt.Buckets)-1 {
plist := (*list.List)(rt.Buckets[cpl+1])
peerArr = copyPeersFromList(id, peerArr, plist)
}
} else {
......@@ -166,7 +177,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
// Returns the total number of peers in the routing table
func (rt *RoutingTable) Size() int {
var tot int
for _,buck := range rt.Buckets {
for _, buck := range rt.Buckets {
tot += buck.Len()
}
return tot
......@@ -175,7 +186,7 @@ func (rt *RoutingTable) Size() int {
// NOTE: This is potentially unsafe... use at your own risk
func (rt *RoutingTable) Listpeers() []*peer.Peer {
var peers []*peer.Peer
for _,buck := range rt.Buckets {
for _, buck := range rt.Buckets {
for e := buck.getIter(); e != nil; e = e.Next() {
peers = append(peers, e.Value.(*peer.Peer))
}
......
package dht
import (
"container/list"
crand "crypto/rand"
"crypto/sha256"
"math/rand"
"container/list"
"testing"
peer "github.com/jbenet/go-ipfs/peer"
......
......@@ -44,7 +44,7 @@ func TestSwarm(t *testing.T) {
swarm := NewSwarm(nil)
var peers []*peer.Peer
var listeners []net.Listener
var listeners []net.Listener
peerNames := map[string]string{
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a30": "/ip4/127.0.0.1/tcp/1234",
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31": "/ip4/127.0.0.1/tcp/2345",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment