Commit 4d76fd28 authored by Steven Allen's avatar Steven Allen

upgrade protobuf and switch to bytes keys

fixes #177
parent 15cc53bd
......@@ -143,14 +143,13 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
}
// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
key string, rec *recpb.Record) error {
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
pmes := pb.NewMessage(pb.Message_PUT_VALUE, key, 0)
pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
pmes.Record = rec
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
log.Debugf("putValueToPeer: %v. (peer: %s, key: %s)", err, p.Pretty(), key)
log.Debugf("putValueToPeer: %v. (peer: %s, key: %s)", err, p.Pretty(), loggableKey(string(rec.Key)))
return err
}
......@@ -183,7 +182,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string)
log.Debug("getValueOrPeers: got value")
// make sure record is valid.
err = dht.Validator.Validate(record.GetKey(), record.GetValue())
err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
if err != nil {
log.Info("Received invalid record! (discarded)")
// return a sentinal to signify an invalid record was received
......@@ -212,7 +211,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (
eip := log.EventBegin(ctx, "getValueSingle", meta)
defer eip.Done()
pmes := pb.NewMessage(pb.Message_GET_VALUE, key, 0)
pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
......@@ -236,7 +235,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
}
// Double check the key. Can't hurt.
if rec != nil && rec.GetKey() != key {
if rec != nil && string(rec.GetKey()) != key {
log.Errorf("BUG getLocal: found a DHT record that didn't match it's key: %s != %s", rec.GetKey(), key)
return nil, nil
......@@ -293,7 +292,7 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
})
defer eip.Done()
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
......@@ -311,7 +310,7 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key *cid
eip := log.EventBegin(ctx, "findProvidersSingle", p, key)
defer eip.Done()
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.KeyString(), 0)
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.Bytes(), 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
......@@ -327,7 +326,7 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key *cid
// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
closer := dht.routingTable.NearestPeers(kb.ConvertKey(pmes.GetKey()), count)
closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
return closer
}
......
......@@ -105,8 +105,8 @@ func TestGetFailures(t *testing.T) {
rec := record.MakePutRecord(str, []byte("blah"))
req := pb.Message{
Type: &typ,
Key: &str,
Type: typ,
Key: []byte(str),
Record: rec,
}
......
package dht
import (
"bytes"
"context"
"errors"
"fmt"
......@@ -59,7 +60,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
// first, is there even a key?
k := pmes.GetKey()
if k == "" {
if len(k) == 0 {
return nil, errors.New("handleGetValue but no key was provided")
// TODO: send back an error response? could be bad, but the other node's hanging.
}
......@@ -90,7 +91,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return resp, nil
}
func (dht *IpfsDHT) checkLocalDatastore(k string) (*recpb.Record, error) {
func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
log.Debugf("%s handleGetValue looking into ds", dht.self)
dskey := convertToDsKey(k)
iVal, err := dht.datastore.Get(dskey)
......@@ -150,8 +151,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k string) (*recpb.Record, error) {
// Cleans the record (to avoid storing arbitrary data).
func cleanRecord(rec *recpb.Record) {
rec.XXX_unrecognized = nil
rec.TimeReceived = nil
rec.TimeReceived = ""
}
// Store a value in this peer local storage
......@@ -170,14 +170,14 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return nil, errors.New("nil record")
}
if pmes.GetKey() != rec.GetKey() {
if !bytes.Equal(pmes.GetKey(), rec.GetKey()) {
return nil, errors.New("put key doesn't match record key")
}
cleanRecord(rec)
// Make sure the record is valid (not expired, valid signature etc)
if err = dht.Validator.Validate(rec.GetKey(), rec.GetValue()); err != nil {
if err = dht.Validator.Validate(string(rec.GetKey()), rec.GetValue()); err != nil {
log.Warningf("Bad dht record in PUT from: %s. %s", p.Pretty(), err)
return nil, err
}
......@@ -194,7 +194,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
if existing != nil {
recs := [][]byte{rec.GetValue(), existing.GetValue()}
i, err := dht.Validator.Select(rec.GetKey(), recs)
i, err := dht.Validator.Select(string(rec.GetKey()), recs)
if err != nil {
log.Warningf("Bad dht record in PUT from %s: %s", p.Pretty(), err)
return nil, err
......@@ -206,7 +206,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
}
// record the time we receive every record
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
rec.TimeReceived = u.FormatRFC3339(time.Now())
data, err := proto.Marshal(rec)
if err != nil {
......@@ -245,7 +245,7 @@ func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error)
return nil, nil
}
err = dht.Validator.Validate(rec.GetKey(), rec.GetValue())
err = dht.Validator.Validate(string(rec.GetKey()), rec.GetValue())
if err != nil {
// Invalid record in datastore, probably expired but don't return an error,
// we'll just overwrite it
......@@ -263,7 +263,7 @@ func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
defer log.EventBegin(ctx, "handleFindPeer", p).Done()
resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
resp := pb.NewMessage(pmes.GetType(), nil, pmes.GetClusterLevel())
var closest []peer.ID
// if looking for self... special case where we send it on CloserPeers.
......@@ -331,7 +331,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
defer log.Debugf("%s end", reqDesc)
// check if we have this value, to add ourselves as provider.
has, err := dht.datastore.Has(convertToDsKey(c.KeyString()))
has, err := dht.datastore.Has(convertToDsKey(c.Bytes()))
if err != nil && err != ds.ErrNotFound {
log.Debugf("unexpected datastore error: %v\n", err)
has = false
......@@ -403,6 +403,6 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
return nil, nil
}
func convertToDsKey(s string) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
func convertToDsKey(s []byte) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString(s))
}
......@@ -10,10 +10,9 @@ import (
func TestCleanRecordSigned(t *testing.T) {
actual := new(recpb.Record)
actual.TimeReceived = proto.String("time")
actual.XXX_unrecognized = []byte("extra data")
actual.TimeReceived = "time"
actual.Value = []byte("value")
actual.Key = proto.String("key")
actual.Key = []byte("key")
cleanRecord(actual)
actualBytes, err := proto.Marshal(actual)
......@@ -23,7 +22,7 @@ func TestCleanRecordSigned(t *testing.T) {
expected := new(recpb.Record)
expected.Value = []byte("value")
expected.Key = proto.String("key")
expected.Key = []byte("key")
expectedBytes, err := proto.Marshal(expected)
if err != nil {
t.Fatal(err)
......@@ -36,9 +35,8 @@ func TestCleanRecordSigned(t *testing.T) {
func TestCleanRecord(t *testing.T) {
actual := new(recpb.Record)
actual.TimeReceived = proto.String("time")
actual.XXX_unrecognized = []byte("extra data")
actual.Key = proto.String("key")
actual.TimeReceived = "time"
actual.Key = []byte("key")
actual.Value = []byte("value")
cleanRecord(actual)
......@@ -48,7 +46,7 @@ func TestCleanRecord(t *testing.T) {
}
expected := new(recpb.Record)
expected.Key = proto.String("key")
expected.Key = []byte("key")
expected.Value = []byte("value")
expectedBytes, err := proto.Marshal(expected)
if err != nil {
......
......@@ -30,7 +30,7 @@
"version": "1.0.0"
},
{
"hash": "QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV",
"hash": "QmdxUuburamoF6zF9qjeQC4WYcWGbWuRmdLacMEsW8ioD8",
"name": "gogo-protobuf",
"version": "0.0.0"
},
......@@ -72,9 +72,9 @@
},
{
"author": "whyrusleeping",
"hash": "QmVsp2KdPYE6M8ryzCk5KHLo3zprcY5hBDaYx6uPCFUdxA",
"hash": "QmUKGC4P3FT4y3ThT6sesshDt4HQofKdee3C9oJknQ4s6p",
"name": "go-libp2p-record",
"version": "4.1.3"
"version": "4.1.4"
},
{
"author": "whyrusleeping",
......
......@@ -4,7 +4,7 @@ GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. $<
protoc --proto_path=$(GOPATH)/src:. --gogofast_out=. $<
clean:
rm -f *.pb.go
......
This diff is collapsed.
......@@ -5,10 +5,10 @@
// Now from `libp2p/go-libp2p-kad-dht/pb` you can run...
// `protoc --gogo_out=. --proto_path=../../go-libp2p-record/pb/ --proto_path=./ dht.proto`
syntax = "proto2";
syntax = "proto3";
package dht.pb;
import "record.proto";
import "github.com/libp2p/go-libp2p-record/pb/record.proto";
message Message {
enum MessageType {
......@@ -37,29 +37,29 @@ message Message {
message Peer {
// ID of a given peer.
optional string id = 1;
bytes id = 1;
// multiaddrs for a given peer
repeated bytes addrs = 2;
// used to signal the sender's connection capabilities to the peer
optional ConnectionType connection = 3;
ConnectionType connection = 3;
}
// defines what type of message it is.
optional MessageType type = 1;
MessageType type = 1;
// defines what coral cluster level this query/response belongs to.
// in case we want to implement coral's cluster rings in the future.
optional int32 clusterLevelRaw = 10;
int32 clusterLevelRaw = 10;
// Used to specify the key associated with this message.
// PUT_VALUE, GET_VALUE, ADD_PROVIDER, GET_PROVIDERS
optional string key = 2;
bytes key = 2;
// Used to return a value
// PUT_VALUE, GET_VALUE
optional record.pb.Record record = 3;
record.pb.Record record = 3;
// Used to return peers closer to a key in a query
// GET_VALUE, GET_PROVIDERS, FIND_NODE
......
......@@ -17,10 +17,10 @@ type PeerRoutingInfo struct {
}
// NewMessage constructs a new dht message with given type, key, and level
func NewMessage(typ Message_MessageType, key string, level int) *Message {
func NewMessage(typ Message_MessageType, key []byte, level int) *Message {
m := &Message{
Type: &typ,
Key: &key,
Type: typ,
Key: key,
}
m.SetClusterLevel(level)
return m
......@@ -34,9 +34,9 @@ func peerRoutingInfoToPBPeer(p PeerRoutingInfo) *Message_Peer {
pbp.Addrs[i] = maddr.Bytes() // Bytes, not String. Compressed.
}
s := string(p.ID)
pbp.Id = &s
pbp.Id = []byte(s)
c := ConnectionType(p.Connectedness)
pbp.Connection = &c
pbp.Connection = c
return pbp
}
......@@ -47,8 +47,7 @@ func peerInfoToPBPeer(p pstore.PeerInfo) *Message_Peer {
for i, maddr := range p.Addrs {
pbp.Addrs[i] = maddr.Bytes() // Bytes, not String. Compressed.
}
s := string(p.ID)
pbp.Id = &s
pbp.Id = []byte(p.ID)
return pbp
}
......@@ -78,7 +77,7 @@ func PeerInfosToPBPeers(n inet.Network, peers []pstore.PeerInfo) []*Message_Peer
pbps := RawPeerInfosToPBPeers(peers)
for i, pbp := range pbps {
c := ConnectionType(n.Connectedness(peers[i].ID))
pbp.Connection = &c
pbp.Connection = c
}
return pbps
}
......@@ -136,7 +135,7 @@ func (m *Message) GetClusterLevel() int {
// default "no value" protobuf behavior (0)
func (m *Message) SetClusterLevel(level int) {
lvl := int32(level)
m.ClusterLevelRaw = &lvl
m.ClusterLevelRaw = lvl
}
// Loggable turns a Message into machine-readable log output
......
......@@ -6,7 +6,6 @@ import (
"testing"
"time"
proto "github.com/gogo/protobuf/proto"
u "github.com/ipfs/go-ipfs-util"
ci "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
......@@ -218,7 +217,7 @@ func TestPubkeyBadKeyFromDHT(t *testing.T) {
// Store incorrect public key on node B
rec := record.MakePutRecord(pkkey, wrongbytes)
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
rec.TimeReceived = u.FormatRFC3339(time.Now())
err = dhtB.putLocal(pkkey, rec)
if err != nil {
t.Fatal(err)
......@@ -261,7 +260,7 @@ func TestPubkeyBadKeyFromDHTGoodKeyDirect(t *testing.T) {
// Store incorrect public key on node B
rec := record.MakePutRecord(pkkey, wrongbytes)
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
rec.TimeReceived = u.FormatRFC3339(time.Now())
err = dhtB.putLocal(pkkey, rec)
if err != nil {
t.Fatal(err)
......
......@@ -8,7 +8,6 @@ import (
"sync"
"time"
proto "github.com/gogo/protobuf/proto"
cid "github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
logging "github.com/ipfs/go-log"
......@@ -71,7 +70,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
}
rec := record.MakePutRecord(key, value)
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
rec.TimeReceived = u.FormatRFC3339(time.Now())
err = dht.putLocal(key, rec)
if err != nil {
return err
......@@ -94,7 +93,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
ID: p,
})
err := dht.putValueToPeer(ctx, p, key, rec)
err := dht.putValueToPeer(ctx, p, rec)
if err != nil {
log.Debugf("failed putting value to peer: %s", err)
}
......@@ -172,7 +171,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Opti
}
ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
defer cancel()
err := dht.putValueToPeer(ctx, v.From, key, fixupRec)
err := dht.putValueToPeer(ctx, v.From, fixupRec)
if err != nil {
log.Debug("Error correcting DHT entry: ", err)
}
......@@ -349,7 +348,7 @@ func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
}
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.KeyString(), 0)
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.Bytes(), 0)
pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
return pmes, nil
}
......@@ -580,7 +579,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
peersSeenMx.Unlock()
// if peer is connected, send it to our client.
if pb.Connectedness(*pbp.Connection) == inet.Connected {
if pb.Connectedness(pbp.Connection) == inet.Connected {
select {
case <-ctx.Done():
return nil, ctx.Err()
......@@ -590,7 +589,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
// if peer is the peer we're looking for, don't bother querying it.
// TODO maybe query it?
if pb.Connectedness(*pbp.Connection) != inet.Connected {
if pb.Connectedness(pbp.Connection) != inet.Connected {
clpeers = append(clpeers, pi)
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment