Commit 7e691469 authored by Juan Benet's avatar Juan Benet

Merge pull request #1775 from ipfs/fix/ipns-old-record

fix publish fail on prexisting bad record
parents 5b2d2ebd b34d41e0
...@@ -227,7 +227,7 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost ...@@ -227,7 +227,7 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer) n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer)
// setup name system // setup name system
n.Namesys = namesys.NewNameSystem(n.Routing) n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore())
// setup ipns republishing // setup ipns republishing
err = n.setupIpnsRepublisher() err = n.setupIpnsRepublisher()
...@@ -456,7 +456,7 @@ func (n *IpfsNode) SetupOfflineRouting() error { ...@@ -456,7 +456,7 @@ func (n *IpfsNode) SetupOfflineRouting() error {
n.Routing = offroute.NewOfflineRouter(n.Repo.Datastore(), n.PrivateKey) n.Routing = offroute.NewOfflineRouter(n.Repo.Datastore(), n.PrivateKey)
n.Namesys = namesys.NewNameSystem(n.Routing) n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore())
return nil return nil
} }
......
...@@ -33,7 +33,7 @@ func InitializeKeyspace(n *core.IpfsNode, key ci.PrivKey) error { ...@@ -33,7 +33,7 @@ func InitializeKeyspace(n *core.IpfsNode, key ci.PrivKey) error {
return err return err
} }
pub := nsys.NewRoutingPublisher(n.Routing) pub := nsys.NewRoutingPublisher(n.Routing, n.Repo.Datastore())
if err := pub.Publish(ctx, key, path.FromKey(nodek)); err != nil { if err := pub.Publish(ctx, key, path.FromKey(nodek)); err != nil {
return err return err
} }
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"strings" "strings"
"time" "time"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
ci "github.com/ipfs/go-ipfs/p2p/crypto" ci "github.com/ipfs/go-ipfs/p2p/crypto"
path "github.com/ipfs/go-ipfs/path" path "github.com/ipfs/go-ipfs/path"
...@@ -25,7 +26,7 @@ type mpns struct { ...@@ -25,7 +26,7 @@ type mpns struct {
} }
// NewNameSystem will construct the IPFS naming system based on Routing // NewNameSystem will construct the IPFS naming system based on Routing
func NewNameSystem(r routing.IpfsRouting) NameSystem { func NewNameSystem(r routing.IpfsRouting, ds ds.Datastore) NameSystem {
return &mpns{ return &mpns{
resolvers: map[string]resolver{ resolvers: map[string]resolver{
"dns": newDNSResolver(), "dns": newDNSResolver(),
...@@ -33,7 +34,7 @@ func NewNameSystem(r routing.IpfsRouting) NameSystem { ...@@ -33,7 +34,7 @@ func NewNameSystem(r routing.IpfsRouting) NameSystem {
"dht": newRoutingResolver(r), "dht": newRoutingResolver(r),
}, },
publishers: map[string]Publisher{ publishers: map[string]Publisher{
"/ipns/": NewRoutingPublisher(r), "/ipns/": NewRoutingPublisher(r, ds),
}, },
} }
} }
......
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
path "github.com/ipfs/go-ipfs/path" path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin" pin "github.com/ipfs/go-ipfs/pin"
routing "github.com/ipfs/go-ipfs/routing" routing "github.com/ipfs/go-ipfs/routing"
dhtpb "github.com/ipfs/go-ipfs/routing/dht/pb"
record "github.com/ipfs/go-ipfs/routing/record" record "github.com/ipfs/go-ipfs/routing/record"
ft "github.com/ipfs/go-ipfs/unixfs" ft "github.com/ipfs/go-ipfs/unixfs"
u "github.com/ipfs/go-ipfs/util" u "github.com/ipfs/go-ipfs/util"
...@@ -37,11 +38,15 @@ var PublishPutValTimeout = time.Minute ...@@ -37,11 +38,15 @@ var PublishPutValTimeout = time.Minute
// routing system. // routing system.
type ipnsPublisher struct { type ipnsPublisher struct {
routing routing.IpfsRouting routing routing.IpfsRouting
ds ds.Datastore
} }
// NewRoutingPublisher constructs a publisher for the IPFS Routing name system. // NewRoutingPublisher constructs a publisher for the IPFS Routing name system.
func NewRoutingPublisher(route routing.IpfsRouting) *ipnsPublisher { func NewRoutingPublisher(route routing.IpfsRouting, ds ds.Datastore) *ipnsPublisher {
return &ipnsPublisher{routing: route} if ds == nil {
panic("nil datastore")
}
return &ipnsPublisher{routing: route, ds: ds}
} }
// Publish implements Publisher. Accepts a keypair and a value, // Publish implements Publisher. Accepts a keypair and a value,
...@@ -62,22 +67,58 @@ func (p *ipnsPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value ...@@ -62,22 +67,58 @@ func (p *ipnsPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value
_, ipnskey := IpnsKeysForID(id) _, ipnskey := IpnsKeysForID(id)
// get previous records sequence number, and add one to it // get previous records sequence number
var seqnum uint64 seqnum, err := p.getPreviousSeqNo(ctx, ipnskey)
prevrec, err := p.routing.GetValues(ctx, ipnskey, 0) if err != nil {
return err
}
// increment it
seqnum++
return PutRecordToRouting(ctx, k, value, seqnum, eol, p.routing, id)
}
func (p *ipnsPublisher) getPreviousSeqNo(ctx context.Context, ipnskey key.Key) (uint64, error) {
prevrec, err := p.ds.Get(ipnskey.DsKey())
if err != nil && err != ds.ErrNotFound {
// None found, lets start at zero!
return 0, err
}
var val []byte
if err == nil { if err == nil {
e := new(pb.IpnsEntry) prbytes, ok := prevrec.([]byte)
err := proto.Unmarshal(prevrec[0].Val, e) if !ok {
return 0, fmt.Errorf("unexpected type returned from datastore: %#v", prevrec)
}
dhtrec := new(dhtpb.Record)
err := proto.Unmarshal(prbytes, dhtrec)
if err != nil { if err != nil {
return err return 0, err
} }
seqnum = e.GetSequence() + 1 val = dhtrec.GetValue()
} else if err != ds.ErrNotFound { } else {
return err // try and check the dht for a record
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
rv, err := p.routing.GetValue(ctx, ipnskey)
if err != nil {
// no such record found, start at zero!
return 0, nil
}
val = rv
} }
return PutRecordToRouting(ctx, k, value, seqnum, eol, p.routing, id) e := new(pb.IpnsEntry)
err = proto.Unmarshal(val, e)
if err != nil {
return 0, err
}
return e.GetSequence(), nil
} }
func PutRecordToRouting(ctx context.Context, k ci.PrivKey, value path.Path, seqnum uint64, eol time.Time, r routing.IpfsRouting, id peer.ID) error { func PutRecordToRouting(ctx context.Context, k ci.PrivKey, value path.Path, seqnum uint64, eol time.Time, r routing.IpfsRouting, id peer.ID) error {
......
...@@ -54,7 +54,7 @@ func TestRepublish(t *testing.T) { ...@@ -54,7 +54,7 @@ func TestRepublish(t *testing.T) {
// have one node publish a record that is valid for 1 second // have one node publish a record that is valid for 1 second
publisher := nodes[3] publisher := nodes[3]
p := path.FromString("/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn") // does not need to be valid p := path.FromString("/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn") // does not need to be valid
rp := namesys.NewRoutingPublisher(publisher.Routing) rp := namesys.NewRoutingPublisher(publisher.Routing, publisher.Repo.Datastore())
err := rp.PublishWithEOL(ctx, publisher.PrivateKey, p, time.Now().Add(time.Second)) err := rp.PublishWithEOL(ctx, publisher.PrivateKey, p, time.Now().Add(time.Second))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
......
package namesys package namesys
import ( import (
"errors"
"testing" "testing"
"time"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
peer "github.com/ipfs/go-ipfs/p2p/peer"
path "github.com/ipfs/go-ipfs/path" path "github.com/ipfs/go-ipfs/path"
mockrouting "github.com/ipfs/go-ipfs/routing/mock" mockrouting "github.com/ipfs/go-ipfs/routing/mock"
u "github.com/ipfs/go-ipfs/util" u "github.com/ipfs/go-ipfs/util"
...@@ -13,9 +17,10 @@ import ( ...@@ -13,9 +17,10 @@ import (
func TestRoutingResolve(t *testing.T) { func TestRoutingResolve(t *testing.T) {
d := mockrouting.NewServer().Client(testutil.RandIdentityOrFatal(t)) d := mockrouting.NewServer().Client(testutil.RandIdentityOrFatal(t))
dstore := ds.NewMapDatastore()
resolver := NewRoutingResolver(d) resolver := NewRoutingResolver(d)
publisher := NewRoutingPublisher(d) publisher := NewRoutingPublisher(d, dstore)
privk, pubk, err := testutil.RandTestKeyPair(512) privk, pubk, err := testutil.RandTestKeyPair(512)
if err != nil { if err != nil {
...@@ -43,3 +48,90 @@ func TestRoutingResolve(t *testing.T) { ...@@ -43,3 +48,90 @@ func TestRoutingResolve(t *testing.T) {
t.Fatal("Got back incorrect value.") t.Fatal("Got back incorrect value.")
} }
} }
func TestPrexistingExpiredRecord(t *testing.T) {
dstore := ds.NewMapDatastore()
d := mockrouting.NewServer().ClientWithDatastore(context.Background(), testutil.RandIdentityOrFatal(t), dstore)
resolver := NewRoutingResolver(d)
publisher := NewRoutingPublisher(d, dstore)
privk, pubk, err := testutil.RandTestKeyPair(512)
if err != nil {
t.Fatal(err)
}
id, err := peer.IDFromPublicKey(pubk)
if err != nil {
t.Fatal(err)
}
// Make an expired record and put it in the datastore
h := path.FromString("/ipfs/QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN")
eol := time.Now().Add(time.Hour * -1)
err = PutRecordToRouting(context.Background(), privk, h, 0, eol, d, id)
if err != nil {
t.Fatal(err)
}
// Now, with an old record in the system already, try and publish a new one
err = publisher.Publish(context.Background(), privk, h)
if err != nil {
t.Fatal(err)
}
err = verifyCanResolve(resolver, id.Pretty(), h)
if err != nil {
t.Fatal(err)
}
}
func TestPrexistingRecord(t *testing.T) {
dstore := ds.NewMapDatastore()
d := mockrouting.NewServer().ClientWithDatastore(context.Background(), testutil.RandIdentityOrFatal(t), dstore)
resolver := NewRoutingResolver(d)
publisher := NewRoutingPublisher(d, dstore)
privk, pubk, err := testutil.RandTestKeyPair(512)
if err != nil {
t.Fatal(err)
}
id, err := peer.IDFromPublicKey(pubk)
if err != nil {
t.Fatal(err)
}
// Make a good record and put it in the datastore
h := path.FromString("/ipfs/QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN")
eol := time.Now().Add(time.Hour)
err = PutRecordToRouting(context.Background(), privk, h, 0, eol, d, id)
if err != nil {
t.Fatal(err)
}
// Now, with an old record in the system already, try and publish a new one
err = publisher.Publish(context.Background(), privk, h)
if err != nil {
t.Fatal(err)
}
err = verifyCanResolve(resolver, id.Pretty(), h)
if err != nil {
t.Fatal(err)
}
}
func verifyCanResolve(r Resolver, name string, exp path.Path) error {
res, err := r.Resolve(context.Background(), name)
if err != nil {
return err
}
if res != exp {
return errors.New("got back wrong record!")
}
return nil
}
...@@ -3,6 +3,7 @@ package dht ...@@ -3,6 +3,7 @@ package dht
import ( import (
"errors" "errors"
"fmt" "fmt"
"time"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
...@@ -10,6 +11,7 @@ import ( ...@@ -10,6 +11,7 @@ import (
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
pb "github.com/ipfs/go-ipfs/routing/dht/pb" pb "github.com/ipfs/go-ipfs/routing/dht/pb"
u "github.com/ipfs/go-ipfs/util"
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
) )
...@@ -46,41 +48,17 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess ...@@ -46,41 +48,17 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
// first, is there even a key? // first, is there even a key?
k := pmes.GetKey() k := key.Key(pmes.GetKey())
if k == "" { if k == "" {
return nil, errors.New("handleGetValue but no key was provided") return nil, errors.New("handleGetValue but no key was provided")
// TODO: send back an error response? could be bad, but the other node's hanging. // TODO: send back an error response? could be bad, but the other node's hanging.
} }
// let's first check if we have the value locally. rec, err := dht.checkLocalDatastore(k)
log.Debugf("%s handleGetValue looking into ds", dht.self) if err != nil {
dskey := key.Key(k).DsKey()
iVal, err := dht.datastore.Get(dskey)
log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)
// if we got an unexpected error, bail.
if err != nil && err != ds.ErrNotFound {
return nil, err return nil, err
} }
resp.Record = rec
// if we have the value, send it back
if err == nil {
log.Debugf("%s handleGetValue success!", dht.self)
byts, ok := iVal.([]byte)
if !ok {
return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
}
rec := new(pb.Record)
err := proto.Unmarshal(byts, rec)
if err != nil {
log.Debug("Failed to unmarshal dht record from datastore")
return nil, err
}
resp.Record = rec
}
// Find closest peer on given cluster to desired key and reply with that info // Find closest peer on given cluster to desired key and reply with that info
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
...@@ -102,6 +80,69 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess ...@@ -102,6 +80,69 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return resp, nil return resp, nil
} }
func (dht *IpfsDHT) checkLocalDatastore(k key.Key) (*pb.Record, error) {
log.Debugf("%s handleGetValue looking into ds", dht.self)
dskey := k.DsKey()
iVal, err := dht.datastore.Get(dskey)
log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)
if err == ds.ErrNotFound {
return nil, nil
}
// if we got an unexpected error, bail.
if err != nil {
return nil, err
}
// if we have the value, send it back
log.Debugf("%s handleGetValue success!", dht.self)
byts, ok := iVal.([]byte)
if !ok {
return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
}
rec := new(pb.Record)
err = proto.Unmarshal(byts, rec)
if err != nil {
log.Debug("Failed to unmarshal dht record from datastore")
return nil, err
}
// if its our record, dont bother checking the times on it
if peer.ID(rec.GetAuthor()) == dht.self {
return rec, nil
}
var recordIsBad bool
recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
if err != nil {
log.Info("either no receive time set on record, or it was invalid: ", err)
recordIsBad = true
}
if time.Now().Sub(recvtime) > MaxRecordAge {
log.Debug("old record found, tossing.")
recordIsBad = true
}
// NOTE: we do not verify the record here beyond checking these timestamps.
// we put the burden of checking the records on the requester as checking a record
// may be computationally expensive
if recordIsBad {
err := dht.datastore.Delete(dskey)
if err != nil {
log.Error("Failed to delete bad record from datastore: ", err)
}
return nil, nil // can treat this as not having the record at all
}
return rec, nil
}
// Store a value in this peer local storage // Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
defer log.EventBegin(ctx, "handlePutValue", p).Done() defer log.EventBegin(ctx, "handlePutValue", p).Done()
...@@ -112,7 +153,12 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess ...@@ -112,7 +153,12 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return nil, err return nil, err
} }
data, err := proto.Marshal(pmes.GetRecord()) rec := pmes.GetRecord()
// record the time we receive every record
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
data, err := proto.Marshal(rec)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -14,7 +14,7 @@ It has these top-level messages: ...@@ -14,7 +14,7 @@ It has these top-level messages:
*/ */
package dht_pb package dht_pb
import proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" import proto "github.com/gogo/protobuf/proto"
import math "math" import math "math"
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
...@@ -221,8 +221,10 @@ type Record struct { ...@@ -221,8 +221,10 @@ type Record struct {
// hash of the authors public key // hash of the authors public key
Author *string `protobuf:"bytes,3,opt,name=author" json:"author,omitempty"` Author *string `protobuf:"bytes,3,opt,name=author" json:"author,omitempty"`
// A PKI signature for the key+value+author // A PKI signature for the key+value+author
Signature []byte `protobuf:"bytes,4,opt,name=signature" json:"signature,omitempty"` Signature []byte `protobuf:"bytes,4,opt,name=signature" json:"signature,omitempty"`
XXX_unrecognized []byte `json:"-"` // Time the record was received, set by receiver
TimeReceived *string `protobuf:"bytes,5,opt,name=timeReceived" json:"timeReceived,omitempty"`
XXX_unrecognized []byte `json:"-"`
} }
func (m *Record) Reset() { *m = Record{} } func (m *Record) Reset() { *m = Record{} }
...@@ -257,6 +259,13 @@ func (m *Record) GetSignature() []byte { ...@@ -257,6 +259,13 @@ func (m *Record) GetSignature() []byte {
return nil return nil
} }
func (m *Record) GetTimeReceived() string {
if m != nil && m.TimeReceived != nil {
return *m.TimeReceived
}
return ""
}
func init() { func init() {
proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value) proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value)
proto.RegisterEnum("dht.pb.Message_ConnectionType", Message_ConnectionType_name, Message_ConnectionType_value) proto.RegisterEnum("dht.pb.Message_ConnectionType", Message_ConnectionType_name, Message_ConnectionType_value)
......
...@@ -75,4 +75,7 @@ message Record { ...@@ -75,4 +75,7 @@ message Record {
// A PKI signature for the key+value+author // A PKI signature for the key+value+author
optional bytes signature = 4; optional bytes signature = 4;
// Time the record was received, set by receiver
optional string timeReceived = 5;
} }
...@@ -2,6 +2,7 @@ package dht ...@@ -2,6 +2,7 @@ package dht
import ( import (
"fmt" "fmt"
"time"
ctxfrac "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/frac" ctxfrac "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/frac"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
...@@ -12,6 +13,14 @@ import ( ...@@ -12,6 +13,14 @@ import (
record "github.com/ipfs/go-ipfs/routing/record" record "github.com/ipfs/go-ipfs/routing/record"
) )
// MaxRecordAge specifies the maximum time that any node will hold onto a record
// from the time its received. This does not apply to any other forms of validity that
// the record may contain.
// For example, a record may contain an ipns entry with an EOL saying its valid
// until the year 2020 (a great time in the future). For that record to stick around
// it must be rebroadcasted more frequently than once every 'MaxRecordAge'
const MaxRecordAge = time.Hour * 36
func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
log.Debugf("getPublicKey for: %s", p) log.Debugf("getPublicKey for: %s", p)
......
...@@ -4,12 +4,15 @@ import ( ...@@ -4,12 +4,15 @@ import (
"errors" "errors"
"time" "time"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
routing "github.com/ipfs/go-ipfs/routing" routing "github.com/ipfs/go-ipfs/routing"
dhtpb "github.com/ipfs/go-ipfs/routing/dht/pb"
u "github.com/ipfs/go-ipfs/util"
"github.com/ipfs/go-ipfs/util/testutil" "github.com/ipfs/go-ipfs/util/testutil"
logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0" logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
) )
...@@ -25,7 +28,16 @@ type client struct { ...@@ -25,7 +28,16 @@ type client struct {
// FIXME(brian): is this method meant to simulate putting a value into the network? // FIXME(brian): is this method meant to simulate putting a value into the network?
func (c *client) PutValue(ctx context.Context, key key.Key, val []byte) error { func (c *client) PutValue(ctx context.Context, key key.Key, val []byte) error {
log.Debugf("PutValue: %s", key) log.Debugf("PutValue: %s", key)
return c.datastore.Put(key.DsKey(), val) rec := new(dhtpb.Record)
rec.Value = val
rec.Key = proto.String(string(key))
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
data, err := proto.Marshal(rec)
if err != nil {
return err
}
return c.datastore.Put(key.DsKey(), data)
} }
// FIXME(brian): is this method meant to simulate getting a value from the network? // FIXME(brian): is this method meant to simulate getting a value from the network?
...@@ -41,21 +53,22 @@ func (c *client) GetValue(ctx context.Context, key key.Key) ([]byte, error) { ...@@ -41,21 +53,22 @@ func (c *client) GetValue(ctx context.Context, key key.Key) ([]byte, error) {
return nil, errors.New("could not cast value from datastore") return nil, errors.New("could not cast value from datastore")
} }
return data, nil rec := new(dhtpb.Record)
err = proto.Unmarshal(data, rec)
if err != nil {
return nil, err
}
return rec.GetValue(), nil
} }
func (c *client) GetValues(ctx context.Context, key key.Key, count int) ([]routing.RecvdVal, error) { func (c *client) GetValues(ctx context.Context, key key.Key, count int) ([]routing.RecvdVal, error) {
log.Debugf("GetValue: %s", key) log.Debugf("GetValues: %s", key)
v, err := c.datastore.Get(key.DsKey()) data, err := c.GetValue(ctx, key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
data, ok := v.([]byte)
if !ok {
return nil, errors.New("could not cast value from datastore")
}
return []routing.RecvdVal{{Val: data, From: c.peer.ID()}}, nil return []routing.RecvdVal{{Val: data, From: c.peer.ID()}}, nil
} }
......
...@@ -80,7 +80,7 @@ func (rs *s) Client(p testutil.Identity) Client { ...@@ -80,7 +80,7 @@ func (rs *s) Client(p testutil.Identity) Client {
func (rs *s) ClientWithDatastore(_ context.Context, p testutil.Identity, datastore ds.Datastore) Client { func (rs *s) ClientWithDatastore(_ context.Context, p testutil.Identity, datastore ds.Datastore) Client {
return &client{ return &client{
peer: p, peer: p,
datastore: ds.NewMapDatastore(), datastore: datastore,
server: rs, server: rs,
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment