Commit cad57471 authored by Steven Allen's avatar Steven Allen

update for the routing refactor

GetValues was very DHT specific so the routing interface has been updated to
remove that function. Instead, it has introduced general-purpose options.

This is a minimal alternative to #141 to avoid bundling too many changes
together.
parent f9f7b874
package dht
import (
ropts "github.com/libp2p/go-libp2p-routing/options"
)
type quorumOptionKey struct{}
// Quorum is a DHT option that tells the DHT how many peers it needs to get
// values from before returning the best one.
//
// Default: 16
func Quorum(n int) ropts.Option {
return func(opts *ropts.Options) error {
if opts.Other == nil {
opts.Other = make(map[interface{}]interface{}, 1)
}
opts.Other[quorumOptionKey{}] = n
return nil
}
}
func getQuorum(opts *ropts.Options) int {
responsesNeeded, ok := opts.Other[quorumOptionKey{}].(int)
if !ok {
responsesNeeded = 16
}
return responsesNeeded
}
......@@ -84,9 +84,9 @@
},
{
"author": "whyrusleeping",
"hash": "QmZix3EdeAdc4wnRksRXWEQ6kbqiFAP16h3Sq9JnEiP71N",
"hash": "QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz",
"name": "go-libp2p-routing",
"version": "2.2.22"
"version": "2.3.0"
},
{
"author": "whyrusleeping",
......
......@@ -78,17 +78,12 @@ func (dht *IpfsDHT) getPublicKeyFromDHT(ctx context.Context, p peer.ID) (ci.PubK
// Only retrieve one value, because the public key is immutable
// so there's no need to retrieve multiple versions
pkkey := routing.KeyForPublicKey(p)
vals, err := dht.GetValues(ctx, pkkey, 1)
val, err := dht.GetValue(ctx, pkkey, Quorum(1))
if err != nil {
return nil, err
}
if len(vals) == 0 || vals[0].Val == nil {
log.Debugf("Could not find public key for %v in DHT", p)
return nil, routing.ErrNotFound
}
pubk, err := ci.UnmarshalPublicKey(vals[0].Val)
pubk, err := ci.UnmarshalPublicKey(val)
if err != nil {
log.Errorf("Could not unmarshall public key retrieved from DHT for %v", p)
return nil, err
......
......@@ -21,6 +21,7 @@ import (
record "github.com/libp2p/go-libp2p-record"
routing "github.com/libp2p/go-libp2p-routing"
notif "github.com/libp2p/go-libp2p-routing/notifications"
ropts "github.com/libp2p/go-libp2p-routing/options"
)
// asyncQueryBuffer is the size of buffered channels in async queries. This
......@@ -35,7 +36,7 @@ var asyncQueryBuffer = 10
// PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte) (err error) {
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) {
eip := log.EventBegin(ctx, "PutValue")
defer func() {
eip.Append(loggableKey(key))
......@@ -80,8 +81,14 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte) (err
return nil
}
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
Val []byte
From peer.ID
}
// GetValue searches for the value corresponding to given Key.
func (dht *IpfsDHT) GetValue(ctx context.Context, key string) (_ []byte, err error) {
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) {
eip := log.EventBegin(ctx, "GetValue")
defer func() {
eip.Append(loggableKey(key))
......@@ -93,7 +100,17 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string) (_ []byte, err err
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
vals, err := dht.GetValues(ctx, key, 16)
var cfg ropts.Options
if err := cfg.Apply(opts...); err != nil {
return nil, err
}
responsesNeeded := 0
if !cfg.Offline {
responsesNeeded = getQuorum(&cfg)
}
vals, err := dht.GetValues(ctx, key, responsesNeeded)
if err != nil {
return nil, err
}
......@@ -124,7 +141,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string) (_ []byte, err err
for _, v := range vals {
// if someone sent us a different 'less-valid' record, lets correct them
if !bytes.Equal(v.Val, best) {
go func(v routing.RecvdVal) {
go func(v RecvdVal) {
if v.From == dht.self {
err := dht.putLocal(key, fixupRec)
if err != nil {
......@@ -145,7 +162,8 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string) (_ []byte, err err
return best, nil
}
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []routing.RecvdVal, err error) {
// GetValues gets nvals values corresponding to the given key.
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
eip := log.EventBegin(ctx, "GetValues")
defer func() {
eip.Append(loggableKey(key))
......@@ -154,7 +172,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []r
}
eip.Done()
}()
vals := make([]routing.RecvdVal, 0, nvals)
vals := make([]RecvdVal, 0, nvals)
var valslock sync.Mutex
// If we have it local, don't bother doing an RPC!
......@@ -163,7 +181,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []r
// TODO: this is tricky, we don't always want to trust our own value
// what if the authoritative source updated it?
log.Debug("have it locally")
vals = append(vals, routing.RecvdVal{
vals = append(vals, RecvdVal{
Val: lrec.GetValue(),
From: dht.self,
})
......@@ -212,7 +230,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []r
res := &dhtQueryResult{closerPeers: peers}
if rec.GetValue() != nil || err == errInvalidRecord {
rv := routing.RecvdVal{
rv := RecvdVal{
Val: rec.GetValue(),
From: p,
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment