Commit 33985c53 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

switch DHT entries over to be records, test currently fail

parent e290b54d
......@@ -49,7 +49,7 @@ func (p *ipnsPublisher) Publish(k ci.PrivKey, value string) error {
nameb := u.Hash(pkbytes)
namekey := u.Key(nameb).Pretty()
ipnskey := u.Hash([]byte("/ipns/" + namekey))
ipnskey := []byte("/ipns/" + namekey)
// Store associated public key
timectx, _ := context.WithDeadline(ctx, time.Now().Add(time.Second*4))
......@@ -58,7 +58,7 @@ func (p *ipnsPublisher) Publish(k ci.PrivKey, value string) error {
return err
}
// Store ipns entry at h("/ipns/"+b58(h(pubkey)))
// Store ipns entry at "/ipns/"+b58(h(pubkey))
timectx, _ = context.WithDeadline(ctx, time.Now().Add(time.Second*4))
err = p.routing.PutValue(timectx, u.Key(ipnskey), data)
if err != nil {
......
......@@ -46,7 +46,7 @@ func (r *routingResolver) Resolve(name string) (string, error) {
// use the routing system to get the name.
// /ipns/<name>
h := u.Hash([]byte("/ipns/" + name))
h := []byte("/ipns/" + name)
ipnsKey := u.Key(h)
val, err := r.routing.GetValue(ctx, ipnsKey)
......
......@@ -60,6 +60,9 @@ type IpfsDHT struct {
//lock to make diagnostics work better
diaglock sync.Mutex
// record validator funcs
Validators map[string]ValidatorFunc
ctxc.ContextCloser
}
......@@ -81,6 +84,7 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour)
dht.birth = time.Now()
dht.Validators = make(map[string]ValidatorFunc)
if doPinging {
dht.Children().Add(1)
......@@ -215,16 +219,16 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Messa
// putValueToNetwork stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
key string, value []byte) error {
key string, rec *pb.Record) error {
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
pmes.Value = value
pmes.Record = rec
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
return err
}
if !bytes.Equal(rpmes.Value, pmes.Value) {
if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
return errors.New("value not put correctly")
}
return nil
......@@ -260,11 +264,16 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
return nil, nil, err
}
log.Debugf("pmes.GetValue() %v", pmes.GetValue())
if value := pmes.GetValue(); value != nil {
if record := pmes.GetRecord(); record != nil {
// Success! We were given the value
log.Debug("getValueOrPeers: got value")
return value, nil, nil
// make sure record is still valid
err = dht.verifyRecord(record)
if err != nil {
return nil, nil, err
}
return record.GetValue(), nil, nil
}
// TODO decide on providers. This probably shouldn't be happening.
......@@ -325,10 +334,15 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
continue
}
if value := pmes.GetValue(); value != nil {
if record := pmes.GetRecord(); record != nil {
// Success! We were given the value
err := dht.verifyRecord(record)
if err != nil {
return nil, err
}
dht.providers.AddProvider(key, p)
return value, nil
return record.GetValue(), nil
}
}
return nil, routing.ErrNotFound
......@@ -347,12 +361,35 @@ func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
if !ok {
return nil, errors.New("value stored in datastore not []byte")
}
return byt, nil
rec := new(pb.Record)
err = proto.Unmarshal(byt, rec)
if err != nil {
return nil, err
}
// TODO: 'if paranoid'
if u.Debug {
err = dht.verifyRecord(rec)
if err != nil {
return nil, err
}
}
return rec.GetValue(), nil
}
// putLocal stores the key value pair in the datastore
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
return dht.datastore.Put(key.DsKey(), value)
rec, err := dht.makePutRecord(key, value)
if err != nil {
return err
}
data, err := proto.Marshal(rec)
if err != nil {
return err
}
return dht.datastore.Put(key.DsKey(), data)
}
// Update signals to all routingTables to Update their last-seen status
......
......@@ -124,10 +124,10 @@ func TestGetFailures(t *testing.T) {
fs := &fauxSender{}
peerstore := peer.NewPeerstore()
local := peer.WithIDString("test_peer")
local := makePeer(nil)
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())
other := peer.WithIDString("other_peer")
other := makePeer(nil)
d.Update(other)
// This one should time out
......@@ -173,10 +173,14 @@ func TestGetFailures(t *testing.T) {
// Now we test this DHT's handleGetValue failure
typ := pb.Message_GET_VALUE
str := "hello"
rec, err := d.makePutRecord(u.Key(str), []byte("blah"))
if err != nil {
t.Fatal(err)
}
req := pb.Message{
Type: &typ,
Key: &str,
Value: []byte{0},
Type: &typ,
Key: &str,
Record: rec,
}
// u.POut("handleGetValue Test\n")
......@@ -192,10 +196,10 @@ func TestGetFailures(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if pmes.GetValue() != nil {
if pmes.GetRecord() != nil {
t.Fatal("shouldnt have value")
}
if pmes.GetCloserPeers() != nil {
if len(pmes.GetCloserPeers()) > 0 {
t.Fatal("shouldnt have closer peers")
}
if pmes.GetProviderPeers() != nil {
......@@ -221,7 +225,7 @@ func TestNotFound(t *testing.T) {
fn := &fauxNet{}
fs := &fauxSender{}
local := peer.WithIDString("test_peer")
local := makePeer(nil)
peerstore := peer.NewPeerstore()
peerstore.Add(local)
......@@ -287,7 +291,7 @@ func TestLessThanKResponses(t *testing.T) {
u.Debug = false
fn := &fauxNet{}
fs := &fauxSender{}
local := peer.WithIDString("test_peer")
local := makePeer(nil)
peerstore := peer.NewPeerstore()
peerstore.Add(local)
......
......@@ -5,6 +5,8 @@ import (
"fmt"
"time"
"code.google.com/p/goprotobuf/proto"
peer "github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
u "github.com/jbenet/go-ipfs/util"
......@@ -72,7 +74,14 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message,
return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
}
resp.Value = byts
rec := new(pb.Record)
err := proto.Unmarshal(byts, rec)
if err != nil {
log.Error("Failed to unmarshal dht record from datastore")
return nil, err
}
resp.Record = rec
}
// if we know any providers for the requested value, return those.
......@@ -102,8 +111,20 @@ func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message,
dht.dslock.Lock()
defer dht.dslock.Unlock()
dskey := u.Key(pmes.GetKey()).DsKey()
err := dht.datastore.Put(dskey, pmes.GetValue())
log.Debugf("%s handlePutValue %v %v\n", dht.self, dskey, pmes.GetValue())
err := dht.verifyRecord(pmes.GetRecord())
if err != nil {
log.Error("Bad dht record in put request")
return nil, err
}
data, err := proto.Marshal(pmes.GetRecord())
if err != nil {
return nil, err
}
err = dht.datastore.Put(dskey, data)
log.Debugf("%s handlePutValue %v\n", dht.self, dskey)
return pmes, err
}
......
......@@ -10,10 +10,11 @@ It is generated from these files:
It has these top-level messages:
Message
Record
*/
package dht_pb
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import proto "code.google.com/p/gogoprotobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
......@@ -75,7 +76,7 @@ type Message struct {
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
// Used to return a value
// PUT_VALUE, GET_VALUE
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Record *Record `protobuf:"bytes,3,opt,name=record" json:"record,omitempty"`
// Used to return peers closer to a key in a query
// GET_VALUE, GET_PROVIDERS, FIND_NODE
CloserPeers []*Message_Peer `protobuf:"bytes,8,rep,name=closerPeers" json:"closerPeers,omitempty"`
......@@ -110,9 +111,9 @@ func (m *Message) GetKey() string {
return ""
}
func (m *Message) GetValue() []byte {
func (m *Message) GetRecord() *Record {
if m != nil {
return m.Value
return m.Record
}
return nil
}
......@@ -155,6 +156,52 @@ func (m *Message_Peer) GetAddr() string {
return ""
}
// Record represents a dht record that contains a value
// for a key value pair
type Record struct {
// The key that references this record
Key *string `protobuf:"bytes,1,opt,name=key" json:"key,omitempty"`
// The actual value this record is storing
Value []byte `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
// hash of the authors public key
Author *string `protobuf:"bytes,3,opt,name=author" json:"author,omitempty"`
// A PKI signature for the key+value+author
Signature []byte `protobuf:"bytes,4,opt,name=signature" json:"signature,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Record) Reset() { *m = Record{} }
func (m *Record) String() string { return proto.CompactTextString(m) }
func (*Record) ProtoMessage() {}
func (m *Record) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
}
return ""
}
func (m *Record) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *Record) GetAuthor() string {
if m != nil && m.Author != nil {
return *m.Author
}
return ""
}
func (m *Record) GetSignature() []byte {
if m != nil {
return m.Signature
}
return nil
}
func init() {
proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value)
}
......@@ -29,7 +29,7 @@ message Message {
// Used to return a value
// PUT_VALUE, GET_VALUE
optional bytes value = 3;
optional Record record = 3;
// Used to return peers closer to a key in a query
// GET_VALUE, GET_PROVIDERS, FIND_NODE
......@@ -39,3 +39,19 @@ message Message {
// GET_VALUE, ADD_PROVIDER, GET_PROVIDERS
repeated Peer providerPeers = 9;
}
// Record represents a dht record that contains a value
// for a key value pair
message Record {
// The key that references this record
optional string key = 1;
// The actual value this record is storing
optional bytes value = 2;
// hash of the authors public key
optional string author = 3;
// A PKI signature for the key+value+author
optional bytes signature = 4;
}
package dht
import (
"bytes"
"errors"
"strings"
"code.google.com/p/goprotobuf/proto"
"github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
u "github.com/jbenet/go-ipfs/util"
)
type ValidatorFunc func(u.Key, []byte) error
var ErrBadRecord = errors.New("bad dht record")
var ErrInvalidRecordType = errors.New("invalid record keytype")
// creates and signs a dht record for the given key/value pair
func (dht *IpfsDHT) makePutRecord(key u.Key, value []byte) (*pb.Record, error) {
record := new(pb.Record)
record.Key = proto.String(key.String())
record.Value = value
record.Author = proto.String(string(dht.self.ID()))
blob := bytes.Join([][]byte{[]byte(key), value, []byte(dht.self.ID())}, []byte{})
sig, err := dht.self.PrivKey().Sign(blob)
if err != nil {
return nil, err
}
record.Signature = sig
return record, nil
}
func (dht *IpfsDHT) verifyRecord(r *pb.Record) error {
// First, validate the signature
p, err := dht.peerstore.Get(peer.ID(r.GetAuthor()))
if err != nil {
return err
}
blob := bytes.Join([][]byte{[]byte(r.GetKey()),
r.GetValue(),
[]byte(r.GetKey())}, []byte{})
ok, err := p.PubKey().Verify(blob, r.GetSignature())
if err != nil {
return err
}
if !ok {
return ErrBadRecord
}
// Now, check validity func
parts := strings.Split(r.GetKey(), "/")
if len(parts) < 2 {
log.Error("Record had bad key: %s", r.GetKey())
return ErrBadRecord
}
fnc, ok := dht.Validators[parts[0]]
if !ok {
log.Errorf("Unrecognized key prefix: %s", parts[0])
return ErrInvalidRecordType
}
return fnc(u.Key(r.GetKey()), r.GetValue())
}
......@@ -25,6 +25,12 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
return err
}
rec, err := dht.makePutRecord(key, value)
if err != nil {
log.Error("Creation of record failed!")
return err
}
var peers []peer.Peer
for _, route := range dht.routingTables {
npeers := route.NearestPeers(kb.ConvertKey(key), KValue)
......@@ -33,7 +39,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
log.Debugf("%s PutValue qry part %v", dht.self, p)
err := dht.putValueToNetwork(ctx, p, string(key), value)
err := dht.putValueToNetwork(ctx, p, string(key), rec)
if err != nil {
return nil, err
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment