Commit c372d79e authored by Jeromy's avatar Jeromy

switch to strings and cids instead of keys

parent 347e481b
......@@ -9,23 +9,25 @@ import (
"sync"
"time"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
providers "github.com/libp2p/go-libp2p-kad-dht/providers"
routing "github.com/libp2p/go-libp2p-routing"
proto "github.com/gogo/protobuf/proto"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
key "github.com/ipfs/go-key"
ci "github.com/ipfs/go-libp2p-crypto"
peer "github.com/ipfs/go-libp2p-peer"
pstore "github.com/ipfs/go-libp2p-peerstore"
logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
providers "github.com/libp2p/go-libp2p-kad-dht/providers"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
routing "github.com/libp2p/go-libp2p-routing"
host "github.com/libp2p/go-libp2p/p2p/host"
protocol "github.com/libp2p/go-libp2p/p2p/protocol"
base32 "github.com/whyrusleeping/base32"
context "golang.org/x/net/context"
)
......@@ -131,9 +133,9 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
key key.Key, rec *recpb.Record) error {
key string, rec *recpb.Record) error {
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
pmes := pb.NewMessage(pb.Message_PUT_VALUE, key, 0)
pmes.Record = rec
rpmes, err := dht.sendRequest(ctx, p, pmes)
switch err {
......@@ -162,8 +164,7 @@ var errInvalidRecord = errors.New("received invalid record")
// key. It returns either the value or a list of closer peers.
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
key key.Key) (*recpb.Record, []pstore.PeerInfo, error) {
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []pstore.PeerInfo, error) {
pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
......@@ -198,11 +199,15 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
}
// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
key key.Key) (*pb.Message, error) {
defer log.EventBegin(ctx, "getValueSingle", p, &key).Done()
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
meta := logging.LoggableMap{
"key": key,
"peer": p,
}
defer log.EventBegin(ctx, "getValueSingle", meta).Done()
pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0)
pmes := pb.NewMessage(pb.Message_GET_VALUE, key, 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
......@@ -216,14 +221,14 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
}
// getLocal attempts to retrieve the value from the datastore
func (dht *IpfsDHT) getLocal(key key.Key) (*recpb.Record, error) {
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
log.Debugf("getLocal %s", key)
log.Debug("getLocal %s", key)
v, err := dht.datastore.Get(key.DsKey())
v, err := dht.datastore.Get(mkDsKey(key))
if err != nil {
return nil, err
}
log.Debug("found in db")
log.Debugf("found %s in local datastore")
byt, ok := v.([]byte)
if !ok {
......@@ -256,13 +261,13 @@ func (dht *IpfsDHT) getOwnPrivateKey() (ci.PrivKey, error) {
}
// putLocal stores the key value pair in the datastore
func (dht *IpfsDHT) putLocal(key key.Key, rec *recpb.Record) error {
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
data, err := proto.Marshal(rec)
if err != nil {
return err
}
return dht.datastore.Put(key.DsKey(), data)
return dht.datastore.Put(mkDsKey(key), data)
}
// Update signals the routingTable to Update its last-seen status
......@@ -298,10 +303,10 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
}
}
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key key.Key) (*pb.Message, error) {
defer log.EventBegin(ctx, "findProvidersSingle", p, &key).Done()
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key *cid.Cid) (*pb.Message, error) {
defer log.EventBegin(ctx, "findProvidersSingle", p, key).Done()
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0)
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.KeyString(), 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
......@@ -316,8 +321,7 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key key.
// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
key := key.Key(pmes.GetKey())
closer := dht.routingTable.NearestPeers(kb.ConvertKey(key), count)
closer := dht.routingTable.NearestPeers(kb.ConvertKey(pmes.GetKey()), count)
return closer
}
......@@ -366,3 +370,7 @@ func (dht *IpfsDHT) Process() goprocess.Process {
func (dht *IpfsDHT) Close() error {
return dht.proc.Close()
}
func mkDsKey(s string) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
package dht
import (
"bytes"
"fmt"
"math/rand"
"sort"
......@@ -11,29 +10,32 @@ import (
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
u "github.com/ipfs/go-ipfs-util"
key "github.com/ipfs/go-key"
peer "github.com/ipfs/go-libp2p-peer"
pstore "github.com/ipfs/go-libp2p-peerstore"
ma "github.com/jbenet/go-multiaddr"
record "github.com/libp2p/go-libp2p-record"
routing "github.com/libp2p/go-libp2p-routing"
netutil "github.com/libp2p/go-libp2p/p2p/test/util"
ci "github.com/libp2p/go-testutil/ci"
travisci "github.com/libp2p/go-testutil/ci/travis"
context "golang.org/x/net/context"
)
var testCaseValues = map[key.Key][]byte{}
var testCaseValues = map[string][]byte{}
var testCaseCids []*cid.Cid
func init() {
testCaseValues["hello"] = []byte("world")
for i := 0; i < 100; i++ {
k := fmt.Sprintf("%d -- key", i)
v := fmt.Sprintf("%d -- value", i)
testCaseValues[key.Key(k)] = []byte(v)
testCaseValues[k] = []byte(v)
mhv := u.Hash([]byte(v))
testCaseCids = append(testCaseCids, cid.NewCidV0(mhv))
}
}
......@@ -49,11 +51,12 @@ func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
}
d.Validator["v"] = &record.ValidChecker{
Func: func(key.Key, []byte) error {
Func: func(string, []byte) error {
return nil
},
Sign: false,
}
d.Selector["v"] = func(_ string, bs [][]byte) (int, error) { return 0, nil }
return d
}
......@@ -136,9 +139,8 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
}
func TestValueGetSet(t *testing.T) {
// t.Skip("skipping test to debug another")
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dhtA := setupDHT(ctx, t, false)
dhtB := setupDHT(ctx, t, false)
......@@ -149,14 +151,10 @@ func TestValueGetSet(t *testing.T) {
defer dhtB.host.Close()
vf := &record.ValidChecker{
Func: func(key.Key, []byte) error {
return nil
},
Func: func(string, []byte) error { return nil },
Sign: false,
}
nulsel := func(_ key.Key, bs [][]byte) (int, error) {
return 0, nil
}
nulsel := func(_ string, bs [][]byte) (int, error) { return 0, nil }
dhtA.Validator["v"] = vf
dhtB.Validator["v"] = vf
......@@ -165,27 +163,34 @@ func TestValueGetSet(t *testing.T) {
connect(t, ctx, dhtA, dhtB)
log.Error("adding value on: ", dhtA.self)
ctxT, _ := context.WithTimeout(ctx, time.Second)
dhtA.PutValue(ctxT, "/v/hello", []byte("world"))
ctxT, _ = context.WithTimeout(ctx, time.Second*2)
val, err := dhtA.GetValue(ctxT, "/v/hello")
err := dhtA.PutValue(ctxT, "/v/hello", []byte("world"))
if err != nil {
t.Fatal(err)
}
if string(val) != "world" {
t.Fatalf("Expected 'world' got '%s'", string(val))
}
/*
ctxT, _ = context.WithTimeout(ctx, time.Second*2)
val, err := dhtA.GetValue(ctxT, "/v/hello")
if err != nil {
t.Fatal(err)
}
if string(val) != "world" {
t.Fatalf("Expected 'world' got '%s'", string(val))
}
*/
log.Error("requesting value on dht: ", dhtB.self)
ctxT, _ = context.WithTimeout(ctx, time.Second*2)
val, err = dhtB.GetValue(ctxT, "/v/hello")
valb, err := dhtB.GetValue(ctxT, "/v/hello")
if err != nil {
t.Fatal(err)
}
if string(val) != "world" {
t.Fatalf("Expected 'world' got '%s'", string(val))
if string(valb) != "world" {
t.Fatalf("Expected 'world' got '%s'", string(valb))
}
}
......@@ -205,29 +210,7 @@ func TestProvides(t *testing.T) {
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])
for k, v := range testCaseValues {
log.Debugf("adding local values for %s = %s", k, v)
sk := dhts[3].peerstore.PrivKey(dhts[3].self)
rec, err := record.MakePutRecord(sk, k, v, false)
if err != nil {
t.Fatal(err)
}
err = dhts[3].putLocal(k, rec)
if err != nil {
t.Fatal(err)
}
bits, err := dhts[3].getLocal(k)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(bits.GetValue(), v) {
t.Fatalf("didn't store the right bits (%s, %s)", k, v)
}
}
for k := range testCaseValues {
for _, k := range testCaseCids {
log.Debugf("announcing provider for %s", k)
if err := dhts[3].Provide(ctx, k); err != nil {
t.Fatal(err)
......@@ -238,12 +221,12 @@ func TestProvides(t *testing.T) {
time.Sleep(time.Millisecond * 6)
n := 0
for k := range testCaseValues {
for _, c := range testCaseCids {
n = (n + 1) % 3
log.Debugf("getting providers for %s from %d", k, n)
log.Debugf("getting providers for %s from %d", c, n)
ctxT, _ := context.WithTimeout(ctx, time.Second)
provchan := dhts[n].FindProvidersAsync(ctxT, k, 1)
provchan := dhts[n].FindProvidersAsync(ctxT, c, 1)
select {
case prov := <-provchan:
......@@ -472,35 +455,16 @@ func TestProvidesMany(t *testing.T) {
}
}
var providers = map[key.Key]peer.ID{}
providers := make(map[string]peer.ID)
d := 0
for k, v := range testCaseValues {
for _, c := range testCaseCids {
d = (d + 1) % len(dhts)
dht := dhts[d]
providers[k] = dht.self
providers[c.KeyString()] = dht.self
t.Logf("adding local values for %s = %s (on %s)", k, v, dht.self)
rec, err := record.MakePutRecord(nil, k, v, false)
if err != nil {
t.Fatal(err)
}
err = dht.putLocal(k, rec)
if err != nil {
t.Fatal(err)
}
bits, err := dht.getLocal(k)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(bits.GetValue(), v) {
t.Fatalf("didn't store the right bits (%s, %s)", k, v)
}
t.Logf("announcing provider for %s", k)
if err := dht.Provide(ctx, k); err != nil {
t.Logf("announcing provider for %s", c)
if err := dht.Provide(ctx, c); err != nil {
t.Fatal(err)
}
}
......@@ -513,10 +477,10 @@ func TestProvidesMany(t *testing.T) {
ctxT, _ = context.WithTimeout(ctx, 5*time.Second)
var wg sync.WaitGroup
getProvider := func(dht *IpfsDHT, k key.Key) {
getProvider := func(dht *IpfsDHT, k *cid.Cid) {
defer wg.Done()
expected := providers[k]
expected := providers[k.KeyString()]
provchan := dht.FindProvidersAsync(ctxT, k, 1)
select {
......@@ -533,12 +497,12 @@ func TestProvidesMany(t *testing.T) {
}
}
for k := range testCaseValues {
for _, c := range testCaseCids {
// everyone should be able to find it...
for _, dht := range dhts {
log.Debugf("getting providers for %s at %s", k, dht.self)
log.Debugf("getting providers for %s at %s", c, dht.self)
wg.Add(1)
go getProvider(dht, k)
go getProvider(dht, c)
}
}
......@@ -573,25 +537,7 @@ func TestProvidesAsync(t *testing.T) {
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])
k := key.Key("hello")
val := []byte("world")
sk := dhts[3].peerstore.PrivKey(dhts[3].self)
rec, err := record.MakePutRecord(sk, k, val, false)
if err != nil {
t.Fatal(err)
}
err = dhts[3].putLocal(k, rec)
if err != nil {
t.Fatal(err)
}
bits, err := dhts[3].getLocal(k)
if err != nil && bytes.Equal(bits.GetValue(), val) {
t.Fatal(err)
}
err = dhts[3].Provide(ctx, key.Key("hello"))
err := dhts[3].Provide(ctx, testCaseCids[0])
if err != nil {
t.Fatal(err)
}
......@@ -599,7 +545,7 @@ func TestProvidesAsync(t *testing.T) {
time.Sleep(time.Millisecond * 60)
ctxT, _ := context.WithTimeout(ctx, time.Millisecond*300)
provs := dhts[0].FindProvidersAsync(ctxT, key.Key("hello"), 5)
provs := dhts[0].FindProvidersAsync(ctxT, testCaseCids[0], 5)
select {
case p, ok := <-provs:
if !ok {
......@@ -617,12 +563,8 @@ func TestProvidesAsync(t *testing.T) {
}
func TestLayeredGet(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
t.SkipNow()
}
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, _, dhts := setupDHTS(ctx, 4, t)
defer func() {
......@@ -634,26 +576,23 @@ func TestLayeredGet(t *testing.T) {
connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])
connect(t, ctx, dhts[2], dhts[3])
err := dhts[3].Provide(ctx, key.Key("/v/hello"))
err := dhts[3].PutValue(ctx, "/v/hello", []byte("world"))
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 6)
t.Log("interface was changed. GetValue should not use providers.")
ctxT, _ := context.WithTimeout(ctx, time.Second)
val, err := dhts[0].GetValue(ctxT, key.Key("/v/hello"))
if err != routing.ErrNotFound {
t.Error(err)
}
if string(val) == "world" {
t.Error("should not get value.")
val, err := dhts[0].GetValue(ctxT, "/v/hello")
if err != nil {
t.Fatal(err)
}
if len(val) > 0 && string(val) != "world" {
t.Error("worse, there's a value and its not even the right one.")
if string(val) != "world" {
t.Error("got wrong value")
}
}
......@@ -857,11 +796,12 @@ func TestClientModeConnect(t *testing.T) {
connectNoSync(t, ctx, a, b)
k := key.Key("TestHash")
c := testCaseCids[0]
p := peer.ID("TestPeer")
a.providers.AddProvider(ctx, k, p)
a.providers.AddProvider(ctx, c, p)
time.Sleep(time.Millisecond * 5) // just in case...
provs, err := b.FindProviders(ctx, k)
provs, err := b.FindProviders(ctx, c)
if err != nil {
t.Fatal(err)
}
......
......@@ -10,7 +10,6 @@ import (
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
u "github.com/ipfs/go-ipfs-util"
key "github.com/ipfs/go-key"
pstore "github.com/ipfs/go-libp2p-peerstore"
record "github.com/libp2p/go-libp2p-record"
routing "github.com/libp2p/go-libp2p-routing"
......@@ -44,7 +43,7 @@ func TestGetFailures(t *testing.T) {
// This one should time out
ctx1, _ := context.WithTimeout(context.Background(), 200*time.Millisecond)
if _, err := d.GetValue(ctx1, key.Key("test")); err != nil {
if _, err := d.GetValue(ctx1, "test"); err != nil {
if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
err = merr[0]
}
......@@ -84,7 +83,7 @@ func TestGetFailures(t *testing.T) {
// (was 3 seconds before which should be _plenty_ of time, but maybe
// travis machines really have a hard time...)
ctx2, _ := context.WithTimeout(context.Background(), 20*time.Second)
_, err = d.GetValue(ctx2, key.Key("test"))
_, err = d.GetValue(ctx2, "test")
if err != nil {
if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
err = merr[0]
......@@ -108,7 +107,7 @@ func TestGetFailures(t *testing.T) {
t.Fatal(err)
}
rec, err := record.MakePutRecord(sk, key.Key(str), []byte("blah"), true)
rec, err := record.MakePutRecord(sk, str, []byte("blah"), true)
if err != nil {
t.Fatal(err)
}
......@@ -202,7 +201,7 @@ func TestNotFound(t *testing.T) {
// long timeout to ensure timing is not at play.
ctx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
v, err := d.GetValue(ctx, key.Key("hello"))
v, err := d.GetValue(ctx, "hello")
log.Debugf("get value got %v", v)
if err != nil {
if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
......@@ -275,7 +274,7 @@ func TestLessThanKResponses(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
if _, err := d.GetValue(ctx, key.Key("hello")); err != nil {
if _, err := d.GetValue(ctx, "hello"); err != nil {
switch err {
case routing.ErrNotFound:
//Success!
......
......@@ -6,14 +6,15 @@ import (
"time"
proto "github.com/gogo/protobuf/proto"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
u "github.com/ipfs/go-ipfs-util"
key "github.com/ipfs/go-key"
lgbl "github.com/ipfs/go-libp2p-loggables"
peer "github.com/ipfs/go-libp2p-peer"
pstore "github.com/ipfs/go-libp2p-peerstore"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
recpb "github.com/libp2p/go-libp2p-record/pb"
base32 "github.com/whyrusleeping/base32"
context "golang.org/x/net/context"
)
......@@ -50,7 +51,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
// first, is there even a key?
k := key.Key(pmes.GetKey())
k := pmes.GetKey()
if k == "" {
return nil, errors.New("handleGetValue but no key was provided")
// TODO: send back an error response? could be bad, but the other node's hanging.
......@@ -82,9 +83,9 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return resp, nil
}
func (dht *IpfsDHT) checkLocalDatastore(k key.Key) (*recpb.Record, error) {
func (dht *IpfsDHT) checkLocalDatastore(k string) (*recpb.Record, error) {
log.Debugf("%s handleGetValue looking into ds", dht.self)
dskey := k.DsKey()
dskey := convertToDsKey(k)
iVal, err := dht.datastore.Get(dskey)
log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)
......@@ -148,7 +149,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k key.Key) (*recpb.Record, error) {
// Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
defer log.EventBegin(ctx, "handlePutValue", p).Done()
dskey := key.Key(pmes.GetKey()).DsKey()
dskey := convertToDsKey(pmes.GetKey())
rec := pmes.GetRecord()
if rec == nil {
......@@ -157,7 +158,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
}
if err := dht.verifyRecordLocally(rec); err != nil {
log.Warningf("Bad dht record in PUT from: %s. %s", key.Key(pmes.GetRecord().GetAuthor()), err)
log.Warningf("Bad dht record in PUT from: %s. %s", peer.ID(pmes.GetRecord().GetAuthor()), err)
return nil, err
}
......@@ -215,23 +216,27 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
defer log.EventBegin(ctx, "handleGetProviders", lm).Done()
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
key := key.Key(pmes.GetKey())
lm["key"] = func() interface{} { return key.B58String() }
c, err := cid.Cast([]byte(pmes.GetKey()))
if err != nil {
return nil, err
}
lm["key"] = func() interface{} { return c.String() }
// debug logging niceness.
reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key)
reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, c)
log.Debugf("%s begin", reqDesc)
defer log.Debugf("%s end", reqDesc)
// check if we have this value, to add ourselves as provider.
has, err := dht.datastore.Has(key.DsKey())
has, err := dht.datastore.Has(convertToDsKey(c.KeyString()))
if err != nil && err != ds.ErrNotFound {
log.Debugf("unexpected datastore error: %v\n", err)
has = false
}
// setup providers
providers := dht.providers.GetProviders(ctx, key)
providers := dht.providers.GetProviders(ctx, c)
if has {
providers = append(providers, dht.self)
log.Debugf("%s have the value. added self as provider", reqDesc)
......@@ -259,10 +264,14 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
lm["peer"] = func() interface{} { return p.Pretty() }
defer log.EventBegin(ctx, "handleAddProvider", lm).Done()
key := key.Key(pmes.GetKey())
lm["key"] = func() interface{} { return key.B58String() }
c, err := cid.Cast([]byte(pmes.GetKey()))
if err != nil {
return nil, err
}
lm["key"] = func() interface{} { return c.String() }
log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key)
log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, c)
// add provider should use the address given in the message
pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
......@@ -279,13 +288,17 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
continue
}
log.Infof("received provider %s for %s (addrs: %s)", p, key, pi.Addrs)
log.Infof("received provider %s for %s (addrs: %s)", p, c, pi.Addrs)
if pi.ID != dht.self { // dont add own addrs.
// add the received addresses to our peerstore.
dht.peerstore.AddAddrs(pi.ID, pi.Addrs, pstore.ProviderAddrTTL)
}
dht.providers.AddProvider(ctx, key, p)
dht.providers.AddProvider(ctx, c, p)
}
return nil, nil
}
func convertToDsKey(s string) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
package dht
import (
key "github.com/ipfs/go-key"
peer "github.com/ipfs/go-libp2p-peer"
pset "github.com/ipfs/go-libp2p-peer/peerset"
pstore "github.com/ipfs/go-libp2p-peerstore"
logging "github.com/ipfs/go-log"
kb "github.com/libp2p/go-libp2p-kbucket"
notif "github.com/libp2p/go-libp2p-routing/notifications"
context "golang.org/x/net/context"
......@@ -20,10 +20,16 @@ func pointerizePeerInfos(pis []pstore.PeerInfo) []*pstore.PeerInfo {
return out
}
func loggableKey(k string) logging.LoggableMap {
return logging.LoggableMap{
"key": k,
}
}
// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key key.Key) (<-chan peer.ID, error) {
e := log.EventBegin(ctx, "getClosestPeers", &key)
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
e := log.EventBegin(ctx, "getClosestPeers", loggableKey(key))
tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
if len(tablepeers) == 0 {
return nil, kb.ErrLookupFailure
......@@ -92,7 +98,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key key.Key) (<-chan pe
return out, nil
}
func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key key.Key, p peer.ID) ([]peer.ID, error) {
func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key string, p peer.ID) ([]peer.ID, error) {
pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
if err != nil {
return nil, err
......
......@@ -101,21 +101,21 @@
},
{
"author": "whyrusleeping",
"hash": "Qme7D9iKHYxwq28p6PzCymywsYSRBx9uyGzW7qNB3s9VbC",
"hash": "QmV4WegGoE6DcQdS3fPJjfMykWD9RqhheQ2gKdzkMJnvr9",
"name": "go-libp2p-record",
"version": "1.0.1"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmVsCNFD32GzZ6Q5XD1TVGPRviNYqDdoNvgq853TU9hhzP",
"hash": "QmepYi89BQ3qAhDXejox51WUm7toGBGd9xVVB7fhpsiXuv",
"name": "go-libp2p-kbucket",
"version": "1.0.3"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmemZcG8WprPbnVX3AM43GhhSUiA3V6NjcTLAguvWzkdpQ",
"hash": "QmReZ6DcDDUnPQGXV7FqBnVW3XzXQmKV6oMR9TFMpqGcTv",
"name": "go-libp2p-routing",
"version": "1.0.3"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
......@@ -128,6 +128,12 @@
"hash": "QmbiRCGZqhfcSjnm9icGz3oNQQdPLAnLWnKHXixaEWXVCN",
"name": "go-libp2p",
"version": "3.5.4"
},
{
"author": "whyrusleeping",
"hash": "QmcW7CcRA5kMdqNBRpif7e8y9yvVRmJG1uurMvea8TY2SM",
"name": "go-cid",
"version": "0.5.0"
}
],
"gxVersion": "0.4.0",
......
......@@ -7,9 +7,9 @@ import (
"time"
lru "github.com/hashicorp/golang-lru"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
key "github.com/ipfs/go-key"
peer "github.com/ipfs/go-libp2p-peer"
logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess"
......@@ -48,12 +48,12 @@ type providerSet struct {
}
type addProv struct {
k key.Key
k *cid.Cid
val peer.ID
}
type getProv struct {
k key.Key
k *cid.Cid
resp chan []peer.ID
}
......@@ -77,15 +77,15 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
const providersKeyPrefix = "/providers/"
func mkProvKey(k key.Key) ds.Key {
return ds.NewKey(providersKeyPrefix + base32.RawStdEncoding.EncodeToString([]byte(k)))
func mkProvKey(k *cid.Cid) ds.Key {
return ds.NewKey(providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes()))
}
func (pm *ProviderManager) Process() goprocess.Process {
return pm.proc
}
func (pm *ProviderManager) providersForKey(k key.Key) ([]peer.ID, error) {
func (pm *ProviderManager) providersForKey(k *cid.Cid) ([]peer.ID, error) {
pset, err := pm.getProvSet(k)
if err != nil {
return nil, err
......@@ -93,7 +93,7 @@ func (pm *ProviderManager) providersForKey(k key.Key) ([]peer.ID, error) {
return pset.providers, nil
}
func (pm *ProviderManager) getProvSet(k key.Key) (*providerSet, error) {
func (pm *ProviderManager) getProvSet(k *cid.Cid) (*providerSet, error) {
cached, ok := pm.providers.Get(k)
if ok {
return cached.(*providerSet), nil
......@@ -111,7 +111,7 @@ func (pm *ProviderManager) getProvSet(k key.Key) (*providerSet, error) {
return pset, nil
}
func loadProvSet(dstore ds.Datastore, k key.Key) (*providerSet, error) {
func loadProvSet(dstore ds.Datastore, k *cid.Cid) (*providerSet, error) {
res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k).String()})
if err != nil {
return nil, err
......@@ -160,7 +160,7 @@ func readTimeValue(i interface{}) (time.Time, error) {
return time.Unix(0, nsec), nil
}
func (pm *ProviderManager) addProv(k key.Key, p peer.ID) error {
func (pm *ProviderManager) addProv(k *cid.Cid, p peer.ID) error {
iprovs, ok := pm.providers.Get(k)
if !ok {
iprovs = newProviderSet()
......@@ -173,7 +173,7 @@ func (pm *ProviderManager) addProv(k key.Key, p peer.ID) error {
return writeProviderEntry(pm.dstore, k, p, now)
}
func writeProviderEntry(dstore ds.Datastore, k key.Key, p peer.ID, t time.Time) error {
func writeProviderEntry(dstore ds.Datastore, k *cid.Cid, p peer.ID, t time.Time) error {
dsk := mkProvKey(k).ChildString(base32.RawStdEncoding.EncodeToString([]byte(p)))
buf := make([]byte, 16)
......@@ -182,7 +182,7 @@ func writeProviderEntry(dstore ds.Datastore, k key.Key, p peer.ID, t time.Time)
return dstore.Put(dsk, buf[:n])
}
func (pm *ProviderManager) deleteProvSet(k key.Key) error {
func (pm *ProviderManager) deleteProvSet(k *cid.Cid) error {
pm.providers.Remove(k)
res, err := pm.dstore.Query(dsq.Query{
......@@ -204,7 +204,7 @@ func (pm *ProviderManager) deleteProvSet(k key.Key) error {
return nil
}
func (pm *ProviderManager) getAllProvKeys() ([]key.Key, error) {
func (pm *ProviderManager) getAllProvKeys() ([]*cid.Cid, error) {
res, err := pm.dstore.Query(dsq.Query{
KeysOnly: true,
Prefix: providersKeyPrefix,
......@@ -219,8 +219,7 @@ func (pm *ProviderManager) getAllProvKeys() ([]key.Key, error) {
return nil, err
}
out := make([]key.Key, 0, len(entries))
seen := make(map[key.Key]struct{})
seen := cid.NewSet()
for _, e := range entries {
parts := strings.Split(e.Key, "/")
if len(parts) != 4 {
......@@ -233,14 +232,16 @@ func (pm *ProviderManager) getAllProvKeys() ([]key.Key, error) {
continue
}
k := key.Key(decoded)
if _, ok := seen[k]; !ok {
out = append(out, key.Key(decoded))
seen[k] = struct{}{}
c, err := cid.Cast(decoded)
if err != nil {
log.Warning("error casting key to cid from datastore key")
continue
}
seen.Add(c)
}
return out, nil
return seen.Keys(), nil
}
func (pm *ProviderManager) run() {
......@@ -295,7 +296,7 @@ func (pm *ProviderManager) run() {
}
}
func (pm *ProviderManager) AddProvider(ctx context.Context, k key.Key, val peer.ID) {
func (pm *ProviderManager) AddProvider(ctx context.Context, k *cid.Cid, val peer.ID) {
prov := &addProv{
k: k,
val: val,
......@@ -306,7 +307,7 @@ func (pm *ProviderManager) AddProvider(ctx context.Context, k key.Key, val peer.
}
}
func (pm *ProviderManager) GetProviders(ctx context.Context, k key.Key) []peer.ID {
func (pm *ProviderManager) GetProviders(ctx context.Context, k *cid.Cid) []peer.ID {
gp := &getProv{
k: k,
resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
......
......@@ -4,7 +4,6 @@ import (
"sync"
u "github.com/ipfs/go-ipfs-util"
key "github.com/ipfs/go-key"
peer "github.com/ipfs/go-libp2p-peer"
pset "github.com/ipfs/go-libp2p-peer/peerset"
pstore "github.com/ipfs/go-libp2p-peerstore"
......@@ -22,7 +21,7 @@ var maxQueryConcurrency = AlphaValue
type dhtQuery struct {
dht *IpfsDHT
key key.Key // the key we're querying for
key string // the key we're querying for
qfunc queryFunc // the function to execute per peer
concurrency int // the concurrency parameter
}
......@@ -36,7 +35,7 @@ type dhtQueryResult struct {
}
// constructs query
func (dht *IpfsDHT) newQuery(k key.Key, f queryFunc) *dhtQuery {
func (dht *IpfsDHT) newQuery(k string, f queryFunc) *dhtQuery {
return &dhtQuery{
key: k,
dht: dht,
......
......@@ -7,7 +7,7 @@ import (
"sync"
"time"
key "github.com/ipfs/go-key"
cid "github.com/ipfs/go-cid"
peer "github.com/ipfs/go-libp2p-peer"
pset "github.com/ipfs/go-libp2p-peer/peerset"
pstore "github.com/ipfs/go-libp2p-peerstore"
......@@ -32,7 +32,7 @@ var asyncQueryBuffer = 10
// PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT
func (dht *IpfsDHT) PutValue(ctx context.Context, key key.Key, value []byte) error {
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte) error {
log.Debugf("PutValue %s", key)
sk, err := dht.getOwnPrivateKey()
if err != nil {
......@@ -83,7 +83,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key key.Key, value []byte) err
}
// GetValue searches for the value corresponding to given Key.
func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) {
func (dht *IpfsDHT) GetValue(ctx context.Context, key string) ([]byte, error) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
......@@ -142,7 +142,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) {
return best, nil
}
func (dht *IpfsDHT) GetValues(ctx context.Context, key key.Key, nvals int) ([]routing.RecvdVal, error) {
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]routing.RecvdVal, error) {
var vals []routing.RecvdVal
var valslock sync.Mutex
......@@ -166,7 +166,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key key.Key, nvals int) ([]ro
// get closest peers in the routing table
rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
log.Debugf("peers in rt: %s", len(rtp), rtp)
log.Debugf("peers in rt: %d %s", len(rtp), rtp)
if len(rtp) == 0 {
log.Warning("No peers from routing table!")
return nil, kb.ErrLookupFailure
......@@ -240,13 +240,13 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key key.Key, nvals int) ([]ro
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.
// Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
defer log.EventBegin(ctx, "provide", &key).Done()
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid) error {
defer log.EventBegin(ctx, "provide", key).Done()
// add self locally
dht.providers.AddProvider(ctx, key, dht.self)
peers, err := dht.GetClosestPeers(ctx, key)
peers, err := dht.GetClosestPeers(ctx, key.KeyString())
if err != nil {
return err
}
......@@ -271,7 +271,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
wg.Wait()
return nil
}
func (dht *IpfsDHT) makeProvRecord(skey key.Key) (*pb.Message, error) {
func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
pi := pstore.PeerInfo{
ID: dht.self,
Addrs: dht.host.Addrs(),
......@@ -283,15 +283,15 @@ func (dht *IpfsDHT) makeProvRecord(skey key.Key) (*pb.Message, error) {
return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
}
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(skey), 0)
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.KeyString(), 0)
pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
return pmes, nil
}
// FindProviders searches until the context expires.
func (dht *IpfsDHT) FindProviders(ctx context.Context, key key.Key) ([]pstore.PeerInfo, error) {
func (dht *IpfsDHT) FindProviders(ctx context.Context, c *cid.Cid) ([]pstore.PeerInfo, error) {
var providers []pstore.PeerInfo
for p := range dht.FindProvidersAsync(ctx, key, KValue) {
for p := range dht.FindProvidersAsync(ctx, c, KValue) {
providers = append(providers, p)
}
return providers, nil
......@@ -300,15 +300,15 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key key.Key) ([]pstore.Pe
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key key.Key, count int) <-chan pstore.PeerInfo {
log.Event(ctx, "findProviders", &key)
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key *cid.Cid, count int) <-chan pstore.PeerInfo {
log.Event(ctx, "findProviders", key)
peerOut := make(chan pstore.PeerInfo, count)
go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
return peerOut
}
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key key.Key, count int, peerOut chan pstore.PeerInfo) {
defer log.EventBegin(ctx, "findProvidersAsync", &key).Done()
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid, count int, peerOut chan pstore.PeerInfo) {
defer log.EventBegin(ctx, "findProvidersAsync", key).Done()
defer close(peerOut)
ps := pset.NewLimited(count)
......@@ -331,7 +331,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key key.Key,
// setup the Query
parent := ctx
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.SendingQuery,
ID: p,
......@@ -376,7 +376,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key key.Key,
return &dhtQueryResult{closerPeers: clpeers}, nil
})
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), KValue)
_, err := query.Run(ctx, peers)
if err != nil {
log.Debugf("Query error: %s", err)
......@@ -421,7 +421,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo,
// setup the Query
parent := ctx
query := dht.newQuery(key.Key(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.SendingQuery,
ID: p,
......@@ -479,7 +479,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
}
// setup the Query
query := dht.newQuery(key.Key(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment