Commit c6ef238a authored by Jeromy's avatar Jeromy

fix publish fail on prexisting bad record

dont error out if prexisting record is bad, just grab its sequence number
and continue on with the publish.

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent 9fb7bdff
...@@ -3,6 +3,7 @@ package dht ...@@ -3,6 +3,7 @@ package dht
import ( import (
"errors" "errors"
"fmt" "fmt"
"time"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
...@@ -10,6 +11,7 @@ import ( ...@@ -10,6 +11,7 @@ import (
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
pb "github.com/ipfs/go-ipfs/routing/dht/pb" pb "github.com/ipfs/go-ipfs/routing/dht/pb"
u "github.com/ipfs/go-ipfs/util"
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
) )
...@@ -46,41 +48,17 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess ...@@ -46,41 +48,17 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
// first, is there even a key? // first, is there even a key?
k := pmes.GetKey() k := key.Key(pmes.GetKey())
if k == "" { if k == "" {
return nil, errors.New("handleGetValue but no key was provided") return nil, errors.New("handleGetValue but no key was provided")
// TODO: send back an error response? could be bad, but the other node's hanging. // TODO: send back an error response? could be bad, but the other node's hanging.
} }
// let's first check if we have the value locally. rec, err := dht.checkLocalDatastore(k)
log.Debugf("%s handleGetValue looking into ds", dht.self) if err != nil {
dskey := key.Key(k).DsKey()
iVal, err := dht.datastore.Get(dskey)
log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)
// if we got an unexpected error, bail.
if err != nil && err != ds.ErrNotFound {
return nil, err return nil, err
} }
resp.Record = rec
// if we have the value, send it back
if err == nil {
log.Debugf("%s handleGetValue success!", dht.self)
byts, ok := iVal.([]byte)
if !ok {
return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
}
rec := new(pb.Record)
err := proto.Unmarshal(byts, rec)
if err != nil {
log.Debug("Failed to unmarshal dht record from datastore")
return nil, err
}
resp.Record = rec
}
// Find closest peer on given cluster to desired key and reply with that info // Find closest peer on given cluster to desired key and reply with that info
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
...@@ -102,6 +80,69 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess ...@@ -102,6 +80,69 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return resp, nil return resp, nil
} }
func (dht *IpfsDHT) checkLocalDatastore(k key.Key) (*pb.Record, error) {
log.Debugf("%s handleGetValue looking into ds", dht.self)
dskey := k.DsKey()
iVal, err := dht.datastore.Get(dskey)
log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)
if err == ds.ErrNotFound {
return nil, nil
}
// if we got an unexpected error, bail.
if err != nil {
return nil, err
}
// if we have the value, send it back
log.Debugf("%s handleGetValue success!", dht.self)
byts, ok := iVal.([]byte)
if !ok {
return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
}
rec := new(pb.Record)
err = proto.Unmarshal(byts, rec)
if err != nil {
log.Debug("Failed to unmarshal dht record from datastore")
return nil, err
}
// if its our record, dont bother checking the times on it
if peer.ID(rec.GetAuthor()) == dht.self {
return rec, nil
}
var recordIsBad bool
recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
if err != nil {
log.Info("either no receive time set on record, or it was invalid: ", err)
recordIsBad = true
}
if time.Now().Sub(recvtime) > MaxRecordAge {
log.Debug("old record found, tossing.")
recordIsBad = true
}
// NOTE: we do not verify the record here beyond checking these timestamps.
// we put the burden of checking the records on the requester as checking a record
// may be computationally expensive
if recordIsBad {
err := dht.datastore.Delete(dskey)
if err != nil {
log.Error("Failed to delete bad record from datastore: ", err)
}
return nil, nil // can treat this as not having the record at all
}
return rec, nil
}
// Store a value in this peer local storage // Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
defer log.EventBegin(ctx, "handlePutValue", p).Done() defer log.EventBegin(ctx, "handlePutValue", p).Done()
...@@ -112,7 +153,12 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess ...@@ -112,7 +153,12 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return nil, err return nil, err
} }
data, err := proto.Marshal(pmes.GetRecord()) rec := pmes.GetRecord()
// record the time we receive every record
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
data, err := proto.Marshal(rec)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -14,7 +14,7 @@ It has these top-level messages: ...@@ -14,7 +14,7 @@ It has these top-level messages:
*/ */
package dht_pb package dht_pb
import proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" import proto "github.com/gogo/protobuf/proto"
import math "math" import math "math"
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
...@@ -221,8 +221,10 @@ type Record struct { ...@@ -221,8 +221,10 @@ type Record struct {
// hash of the authors public key // hash of the authors public key
Author *string `protobuf:"bytes,3,opt,name=author" json:"author,omitempty"` Author *string `protobuf:"bytes,3,opt,name=author" json:"author,omitempty"`
// A PKI signature for the key+value+author // A PKI signature for the key+value+author
Signature []byte `protobuf:"bytes,4,opt,name=signature" json:"signature,omitempty"` Signature []byte `protobuf:"bytes,4,opt,name=signature" json:"signature,omitempty"`
XXX_unrecognized []byte `json:"-"` // Time the record was received, set by receiver
TimeReceived *string `protobuf:"bytes,5,opt,name=timeReceived" json:"timeReceived,omitempty"`
XXX_unrecognized []byte `json:"-"`
} }
func (m *Record) Reset() { *m = Record{} } func (m *Record) Reset() { *m = Record{} }
...@@ -257,6 +259,13 @@ func (m *Record) GetSignature() []byte { ...@@ -257,6 +259,13 @@ func (m *Record) GetSignature() []byte {
return nil return nil
} }
func (m *Record) GetTimeReceived() string {
if m != nil && m.TimeReceived != nil {
return *m.TimeReceived
}
return ""
}
func init() { func init() {
proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value) proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value)
proto.RegisterEnum("dht.pb.Message_ConnectionType", Message_ConnectionType_name, Message_ConnectionType_value) proto.RegisterEnum("dht.pb.Message_ConnectionType", Message_ConnectionType_name, Message_ConnectionType_value)
......
...@@ -75,4 +75,7 @@ message Record { ...@@ -75,4 +75,7 @@ message Record {
// A PKI signature for the key+value+author // A PKI signature for the key+value+author
optional bytes signature = 4; optional bytes signature = 4;
// Time the record was received, set by receiver
optional string timeReceived = 5;
} }
...@@ -2,6 +2,7 @@ package dht ...@@ -2,6 +2,7 @@ package dht
import ( import (
"fmt" "fmt"
"time"
ctxfrac "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/frac" ctxfrac "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/frac"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
...@@ -12,6 +13,14 @@ import ( ...@@ -12,6 +13,14 @@ import (
record "github.com/ipfs/go-ipfs/routing/record" record "github.com/ipfs/go-ipfs/routing/record"
) )
// MaxRecordAge specifies the maximum time that any node will hold onto a record
// from the time its received. This does not apply to any other forms of validity that
// the record may contain.
// For example, a record may contain an ipns entry with an EOL saying its valid
// until the year 2020 (a great time in the future). For that record to stick around
// it must be rebroadcasted more frequently than once every 'MaxRecordAge'
const MaxRecordAge = time.Hour * 36
func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
log.Debugf("getPublicKey for: %s", p) log.Debugf("getPublicKey for: %s", p)
......
...@@ -4,12 +4,15 @@ import ( ...@@ -4,12 +4,15 @@ import (
"errors" "errors"
"time" "time"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
routing "github.com/ipfs/go-ipfs/routing" routing "github.com/ipfs/go-ipfs/routing"
dhtpb "github.com/ipfs/go-ipfs/routing/dht/pb"
u "github.com/ipfs/go-ipfs/util"
"github.com/ipfs/go-ipfs/util/testutil" "github.com/ipfs/go-ipfs/util/testutil"
logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0" logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
) )
...@@ -25,7 +28,16 @@ type client struct { ...@@ -25,7 +28,16 @@ type client struct {
// FIXME(brian): is this method meant to simulate putting a value into the network? // FIXME(brian): is this method meant to simulate putting a value into the network?
func (c *client) PutValue(ctx context.Context, key key.Key, val []byte) error { func (c *client) PutValue(ctx context.Context, key key.Key, val []byte) error {
log.Debugf("PutValue: %s", key) log.Debugf("PutValue: %s", key)
return c.datastore.Put(key.DsKey(), val) rec := new(dhtpb.Record)
rec.Value = val
rec.Key = proto.String(string(key))
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
data, err := proto.Marshal(rec)
if err != nil {
return err
}
return c.datastore.Put(key.DsKey(), data)
} }
// FIXME(brian): is this method meant to simulate getting a value from the network? // FIXME(brian): is this method meant to simulate getting a value from the network?
...@@ -41,21 +53,22 @@ func (c *client) GetValue(ctx context.Context, key key.Key) ([]byte, error) { ...@@ -41,21 +53,22 @@ func (c *client) GetValue(ctx context.Context, key key.Key) ([]byte, error) {
return nil, errors.New("could not cast value from datastore") return nil, errors.New("could not cast value from datastore")
} }
return data, nil rec := new(dhtpb.Record)
err = proto.Unmarshal(data, rec)
if err != nil {
return nil, err
}
return rec.GetValue(), nil
} }
func (c *client) GetValues(ctx context.Context, key key.Key, count int) ([]routing.RecvdVal, error) { func (c *client) GetValues(ctx context.Context, key key.Key, count int) ([]routing.RecvdVal, error) {
log.Debugf("GetValue: %s", key) log.Debugf("GetValues: %s", key)
v, err := c.datastore.Get(key.DsKey()) data, err := c.GetValue(ctx, key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
data, ok := v.([]byte)
if !ok {
return nil, errors.New("could not cast value from datastore")
}
return []routing.RecvdVal{{Val: data, From: c.peer.ID()}}, nil return []routing.RecvdVal{{Val: data, From: c.peer.ID()}}, nil
} }
......
...@@ -80,7 +80,7 @@ func (rs *s) Client(p testutil.Identity) Client { ...@@ -80,7 +80,7 @@ func (rs *s) Client(p testutil.Identity) Client {
func (rs *s) ClientWithDatastore(_ context.Context, p testutil.Identity, datastore ds.Datastore) Client { func (rs *s) ClientWithDatastore(_ context.Context, p testutil.Identity, datastore ds.Datastore) Client {
return &client{ return &client{
peer: p, peer: p,
datastore: ds.NewMapDatastore(), datastore: datastore,
server: rs, server: rs,
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment