From 33985c530ed4bd8ccd4b93997cd67da58025ea20 Mon Sep 17 00:00:00 2001 From: Jeromy <jeromyj@gmail.com> Date: Sun, 9 Nov 2014 23:45:16 -0800 Subject: [PATCH] switch DHT entries over to be records, test currently fail --- namesys/publisher.go | 4 +-- namesys/routing.go | 2 +- routing/dht/dht.go | 57 +++++++++++++++++++++++++++------ routing/dht/ext_test.go | 22 +++++++------ routing/dht/handlers.go | 27 ++++++++++++++-- routing/dht/pb/dht.pb.go | 55 +++++++++++++++++++++++++++++--- routing/dht/pb/dht.proto | 18 ++++++++++- routing/dht/records.go | 69 ++++++++++++++++++++++++++++++++++++++++ routing/dht/routing.go | 8 ++++- 9 files changed, 231 insertions(+), 31 deletions(-) create mode 100644 routing/dht/records.go diff --git a/namesys/publisher.go b/namesys/publisher.go index f7bf508b6..636e3fb49 100644 --- a/namesys/publisher.go +++ b/namesys/publisher.go @@ -49,7 +49,7 @@ func (p *ipnsPublisher) Publish(k ci.PrivKey, value string) error { nameb := u.Hash(pkbytes) namekey := u.Key(nameb).Pretty() - ipnskey := u.Hash([]byte("/ipns/" + namekey)) + ipnskey := []byte("/ipns/" + namekey) // Store associated public key timectx, _ := context.WithDeadline(ctx, time.Now().Add(time.Second*4)) @@ -58,7 +58,7 @@ func (p *ipnsPublisher) Publish(k ci.PrivKey, value string) error { return err } - // Store ipns entry at h("/ipns/"+b58(h(pubkey))) + // Store ipns entry at "/ipns/"+b58(h(pubkey)) timectx, _ = context.WithDeadline(ctx, time.Now().Add(time.Second*4)) err = p.routing.PutValue(timectx, u.Key(ipnskey), data) if err != nil { diff --git a/namesys/routing.go b/namesys/routing.go index 6259705ec..5f877bdc3 100644 --- a/namesys/routing.go +++ b/namesys/routing.go @@ -46,7 +46,7 @@ func (r *routingResolver) Resolve(name string) (string, error) { // use the routing system to get the name. // /ipns/<name> - h := u.Hash([]byte("/ipns/" + name)) + h := []byte("/ipns/" + name) ipnsKey := u.Key(h) val, err := r.routing.GetValue(ctx, ipnsKey) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 5f6184067..5d4caaa84 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -60,6 +60,9 @@ type IpfsDHT struct { //lock to make diagnostics work better diaglock sync.Mutex + // record validator funcs + Validators map[string]ValidatorFunc + ctxc.ContextCloser } @@ -81,6 +84,7 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000) dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour) dht.birth = time.Now() + dht.Validators = make(map[string]ValidatorFunc) if doPinging { dht.Children().Add(1) @@ -215,16 +219,16 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Messa // putValueToNetwork stores the given key/value pair at the peer 'p' func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer, - key string, value []byte) error { + key string, rec *pb.Record) error { pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0) - pmes.Value = value + pmes.Record = rec rpmes, err := dht.sendRequest(ctx, p, pmes) if err != nil { return err } - if !bytes.Equal(rpmes.Value, pmes.Value) { + if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) { return errors.New("value not put correctly") } return nil @@ -260,11 +264,16 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer, return nil, nil, err } - log.Debugf("pmes.GetValue() %v", pmes.GetValue()) - if value := pmes.GetValue(); value != nil { + if record := pmes.GetRecord(); record != nil { // Success! We were given the value log.Debug("getValueOrPeers: got value") - return value, nil, nil + + // make sure record is still valid + err = dht.verifyRecord(record) + if err != nil { + return nil, nil, err + } + return record.GetValue(), nil, nil } // TODO decide on providers. This probably shouldn't be happening. @@ -325,10 +334,15 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key, continue } - if value := pmes.GetValue(); value != nil { + if record := pmes.GetRecord(); record != nil { // Success! We were given the value + + err := dht.verifyRecord(record) + if err != nil { + return nil, err + } dht.providers.AddProvider(key, p) - return value, nil + return record.GetValue(), nil } } return nil, routing.ErrNotFound @@ -347,12 +361,35 @@ func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) { if !ok { return nil, errors.New("value stored in datastore not []byte") } - return byt, nil + rec := new(pb.Record) + err = proto.Unmarshal(byt, rec) + if err != nil { + return nil, err + } + + // TODO: 'if paranoid' + if u.Debug { + err = dht.verifyRecord(rec) + if err != nil { + return nil, err + } + } + + return rec.GetValue(), nil } // putLocal stores the key value pair in the datastore func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error { - return dht.datastore.Put(key.DsKey(), value) + rec, err := dht.makePutRecord(key, value) + if err != nil { + return err + } + data, err := proto.Marshal(rec) + if err != nil { + return err + } + + return dht.datastore.Put(key.DsKey(), data) } // Update signals to all routingTables to Update their last-seen status diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index 6be939bed..dcf80e4d0 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -124,10 +124,10 @@ func TestGetFailures(t *testing.T) { fs := &fauxSender{} peerstore := peer.NewPeerstore() - local := peer.WithIDString("test_peer") + local := makePeer(nil) d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) - other := peer.WithIDString("other_peer") + other := makePeer(nil) d.Update(other) // This one should time out @@ -173,10 +173,14 @@ func TestGetFailures(t *testing.T) { // Now we test this DHT's handleGetValue failure typ := pb.Message_GET_VALUE str := "hello" + rec, err := d.makePutRecord(u.Key(str), []byte("blah")) + if err != nil { + t.Fatal(err) + } req := pb.Message{ - Type: &typ, - Key: &str, - Value: []byte{0}, + Type: &typ, + Key: &str, + Record: rec, } // u.POut("handleGetValue Test\n") @@ -192,10 +196,10 @@ func TestGetFailures(t *testing.T) { if err != nil { t.Fatal(err) } - if pmes.GetValue() != nil { + if pmes.GetRecord() != nil { t.Fatal("shouldnt have value") } - if pmes.GetCloserPeers() != nil { + if len(pmes.GetCloserPeers()) > 0 { t.Fatal("shouldnt have closer peers") } if pmes.GetProviderPeers() != nil { @@ -221,7 +225,7 @@ func TestNotFound(t *testing.T) { fn := &fauxNet{} fs := &fauxSender{} - local := peer.WithIDString("test_peer") + local := makePeer(nil) peerstore := peer.NewPeerstore() peerstore.Add(local) @@ -287,7 +291,7 @@ func TestLessThanKResponses(t *testing.T) { u.Debug = false fn := &fauxNet{} fs := &fauxSender{} - local := peer.WithIDString("test_peer") + local := makePeer(nil) peerstore := peer.NewPeerstore() peerstore.Add(local) diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index f7b074416..899f24292 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "code.google.com/p/goprotobuf/proto" + peer "github.com/jbenet/go-ipfs/peer" pb "github.com/jbenet/go-ipfs/routing/dht/pb" u "github.com/jbenet/go-ipfs/util" @@ -72,7 +74,14 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey) } - resp.Value = byts + rec := new(pb.Record) + err := proto.Unmarshal(byts, rec) + if err != nil { + log.Error("Failed to unmarshal dht record from datastore") + return nil, err + } + + resp.Record = rec } // if we know any providers for the requested value, return those. @@ -102,8 +111,20 @@ func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message, dht.dslock.Lock() defer dht.dslock.Unlock() dskey := u.Key(pmes.GetKey()).DsKey() - err := dht.datastore.Put(dskey, pmes.GetValue()) - log.Debugf("%s handlePutValue %v %v\n", dht.self, dskey, pmes.GetValue()) + + err := dht.verifyRecord(pmes.GetRecord()) + if err != nil { + log.Error("Bad dht record in put request") + return nil, err + } + + data, err := proto.Marshal(pmes.GetRecord()) + if err != nil { + return nil, err + } + + err = dht.datastore.Put(dskey, data) + log.Debugf("%s handlePutValue %v\n", dht.self, dskey) return pmes, err } diff --git a/routing/dht/pb/dht.pb.go b/routing/dht/pb/dht.pb.go index 6c488c51a..fd7620627 100644 --- a/routing/dht/pb/dht.pb.go +++ b/routing/dht/pb/dht.pb.go @@ -10,10 +10,11 @@ It is generated from these files: It has these top-level messages: Message + Record */ package dht_pb -import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" +import proto "code.google.com/p/gogoprotobuf/proto" import math "math" // Reference imports to suppress errors if they are not otherwise used. @@ -75,7 +76,7 @@ type Message struct { Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` // Used to return a value // PUT_VALUE, GET_VALUE - Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` + Record *Record `protobuf:"bytes,3,opt,name=record" json:"record,omitempty"` // Used to return peers closer to a key in a query // GET_VALUE, GET_PROVIDERS, FIND_NODE CloserPeers []*Message_Peer `protobuf:"bytes,8,rep,name=closerPeers" json:"closerPeers,omitempty"` @@ -110,9 +111,9 @@ func (m *Message) GetKey() string { return "" } -func (m *Message) GetValue() []byte { +func (m *Message) GetRecord() *Record { if m != nil { - return m.Value + return m.Record } return nil } @@ -155,6 +156,52 @@ func (m *Message_Peer) GetAddr() string { return "" } +// Record represents a dht record that contains a value +// for a key value pair +type Record struct { + // The key that references this record + Key *string `protobuf:"bytes,1,opt,name=key" json:"key,omitempty"` + // The actual value this record is storing + Value []byte `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"` + // hash of the authors public key + Author *string `protobuf:"bytes,3,opt,name=author" json:"author,omitempty"` + // A PKI signature for the key+value+author + Signature []byte `protobuf:"bytes,4,opt,name=signature" json:"signature,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Record) Reset() { *m = Record{} } +func (m *Record) String() string { return proto.CompactTextString(m) } +func (*Record) ProtoMessage() {} + +func (m *Record) GetKey() string { + if m != nil && m.Key != nil { + return *m.Key + } + return "" +} + +func (m *Record) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +func (m *Record) GetAuthor() string { + if m != nil && m.Author != nil { + return *m.Author + } + return "" +} + +func (m *Record) GetSignature() []byte { + if m != nil { + return m.Signature + } + return nil +} + func init() { proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value) } diff --git a/routing/dht/pb/dht.proto b/routing/dht/pb/dht.proto index e0696e685..1b49a1552 100644 --- a/routing/dht/pb/dht.proto +++ b/routing/dht/pb/dht.proto @@ -29,7 +29,7 @@ message Message { // Used to return a value // PUT_VALUE, GET_VALUE - optional bytes value = 3; + optional Record record = 3; // Used to return peers closer to a key in a query // GET_VALUE, GET_PROVIDERS, FIND_NODE @@ -39,3 +39,19 @@ message Message { // GET_VALUE, ADD_PROVIDER, GET_PROVIDERS repeated Peer providerPeers = 9; } + +// Record represents a dht record that contains a value +// for a key value pair +message Record { + // The key that references this record + optional string key = 1; + + // The actual value this record is storing + optional bytes value = 2; + + // hash of the authors public key + optional string author = 3; + + // A PKI signature for the key+value+author + optional bytes signature = 4; +} diff --git a/routing/dht/records.go b/routing/dht/records.go new file mode 100644 index 000000000..e88b18e7b --- /dev/null +++ b/routing/dht/records.go @@ -0,0 +1,69 @@ +package dht + +import ( + "bytes" + "errors" + "strings" + + "code.google.com/p/goprotobuf/proto" + "github.com/jbenet/go-ipfs/peer" + pb "github.com/jbenet/go-ipfs/routing/dht/pb" + u "github.com/jbenet/go-ipfs/util" +) + +type ValidatorFunc func(u.Key, []byte) error + +var ErrBadRecord = errors.New("bad dht record") +var ErrInvalidRecordType = errors.New("invalid record keytype") + +// creates and signs a dht record for the given key/value pair +func (dht *IpfsDHT) makePutRecord(key u.Key, value []byte) (*pb.Record, error) { + record := new(pb.Record) + + record.Key = proto.String(key.String()) + record.Value = value + record.Author = proto.String(string(dht.self.ID())) + blob := bytes.Join([][]byte{[]byte(key), value, []byte(dht.self.ID())}, []byte{}) + sig, err := dht.self.PrivKey().Sign(blob) + if err != nil { + return nil, err + } + record.Signature = sig + return record, nil +} + +func (dht *IpfsDHT) verifyRecord(r *pb.Record) error { + // First, validate the signature + p, err := dht.peerstore.Get(peer.ID(r.GetAuthor())) + if err != nil { + return err + } + + blob := bytes.Join([][]byte{[]byte(r.GetKey()), + r.GetValue(), + []byte(r.GetKey())}, []byte{}) + + ok, err := p.PubKey().Verify(blob, r.GetSignature()) + if err != nil { + return err + } + + if !ok { + return ErrBadRecord + } + + // Now, check validity func + parts := strings.Split(r.GetKey(), "/") + if len(parts) < 2 { + log.Error("Record had bad key: %s", r.GetKey()) + return ErrBadRecord + } + + fnc, ok := dht.Validators[parts[0]] + if !ok { + log.Errorf("Unrecognized key prefix: %s", parts[0]) + return ErrInvalidRecordType + } + + return fnc(u.Key(r.GetKey()), r.GetValue()) +} diff --git a/routing/dht/routing.go b/routing/dht/routing.go index e2e5d2f37..fedf281d3 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -25,6 +25,12 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error return err } + rec, err := dht.makePutRecord(key, value) + if err != nil { + log.Error("Creation of record failed!") + return err + } + var peers []peer.Peer for _, route := range dht.routingTables { npeers := route.NearestPeers(kb.ConvertKey(key), KValue) @@ -33,7 +39,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { log.Debugf("%s PutValue qry part %v", dht.self, p) - err := dht.putValueToNetwork(ctx, p, string(key), value) + err := dht.putValueToNetwork(ctx, p, string(key), rec) if err != nil { return nil, err } -- GitLab