Commit 564764fb authored by Adin Schmahmann, committed by Steven Allen

fix Search/GetValue to be Kad compliant. The default quorum is now 0, which means the query is not aborted early
parent 6793dc11
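
With quorum 0, GetValue/SearchValue no longer stop after 16 responses by default; a caller that still wants an early abort has to ask for it explicitly. A minimal usage sketch, assuming the package's exported Quorum routing option and an *IpfsDHT value d (identifiers are illustrative):

    // With the new default (quorum 0) this runs the full Kademlia lookup.
    val, err := d.GetValue(ctx, "/v/cat")

    // To restore something like the old behavior, pass an explicit quorum.
    val, err = d.GetValue(ctx, "/v/cat", dht.Quorum(16))
    if err != nil {
        // routing.ErrNotFound or a context error
    }
    _ = val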
......@@ -8,7 +8,6 @@ import (
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"testing"
"time"
......@@ -1673,13 +1672,13 @@ func TestGetSetPluggedProtocol(t *testing.T) {
time.Sleep(time.Second * 2)
err = dhtA.PutValue(ctx, "/v/cat", []byte("meow"))
if err == nil || !strings.Contains(err.Error(), "failed to find any peer in table") {
t.Fatalf("put should not have been able to find any peers in routing table, err:'%v'", err)
if err != nil {
t.Fatalf("putting to an empty routing table should succeed, err: '%v'", err)
}
_, err = dhtB.GetValue(ctx, "/v/cat")
if err == nil || !strings.Contains(err.Error(), "failed to find any peer in table") {
t.Fatalf("get should not have been able to find any peers in routing table, err:'%v'", err)
v, err := dhtB.GetValue(ctx, "/v/cat")
if v != nil || err != routing.ErrNotFound {
t.Fatalf("get should have failed from not being able to find the value, err: '%v'", err)
}
})
}
......
......@@ -69,6 +69,7 @@ func multihashLoggableKey(mh multihash.Multihash) logging.LoggableMap {
// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
//TODO: I can break the interface! return []peer.ID
e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key))
defer e.Done()
......
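
A hedged usage sketch for GetClosestPeers as declared in this hunk (d, ctx, and key are placeholders): the caller drains the returned channel, which yields up to K peer IDs and is closed when the lookup completes.

    peerCh, err := d.GetClosestPeers(ctx, key)
    if err != nil {
        return err
    }
    var closest []peer.ID
    for p := range peerCh {
        closest = append(closest, p) // the K closest peers found for key
    }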
......@@ -112,6 +112,7 @@ func strictParallelismQuery(q *qu) {
var lastPeers []peer.ID
for {
topPeers := q.localPeers.TopK()
peersToQuery := q.localPeers.KUnqueried()
if len(peersToQuery) == 0 {
......@@ -119,11 +120,13 @@ func strictParallelismQuery(q *qu) {
}
numQuery := AlphaValue
if lastPeers != nil && peerSlicesEqual(lastPeers, peersToQuery) {
// TODO: alternative: Check if we did not get any peers closer than alpha closest try k
if lastPeers != nil && peerSlicesEqual(lastPeers, topPeers) {
numQuery = len(peersToQuery)
} else if pqLen := len(peersToQuery); pqLen < numQuery {
numQuery = pqLen
}
lastPeers = topPeers
queryResCh := make(chan bool, numQuery)
resultsReceived := 0
......@@ -306,10 +309,6 @@ func (q *qu) queryPeer(ctx context.Context, p peer.ID) bool {
return true
}
/*
Call func d times, each with a different set of peers (take k closest and shuffle + divide into d buckets)
*/
func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {
// short-circuit if we're already connected.
if dht.host.Network().Connectedness(p) == network.Connected {
......
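
The comment added above describes the disjoint-query plan: take the k closest peers, shuffle them, and split them into d buckets, running one query path per bucket. An illustrative, self-contained sketch of that bucketing step (not the code this commit adds; assumes math/rand and go-libp2p-core/peer):

    // divideIntoBuckets shuffles the k closest peers and deals them
    // round-robin into d disjoint buckets.
    func divideIntoBuckets(peers []peer.ID, d int) [][]peer.ID {
        shuffled := make([]peer.ID, len(peers))
        copy(shuffled, peers)
        rand.Shuffle(len(shuffled), func(i, j int) {
            shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
        })
        buckets := make([][]peer.ID, d)
        for i, p := range shuffled {
            buckets[i%d] = append(buckets[i%d], p)
        }
        return buckets
    }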
......@@ -171,104 +171,59 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing
responsesNeeded = getQuorum(&cfg, 0)
}
valCh := dht.getValues(ctx, key, responsesNeeded)
valCh, queries := dht.getValues(ctx, key, func() bool { return false })
out := make(chan []byte)
go func() {
defer close(out)
maxVals := responsesNeeded
if maxVals < 0 {
maxVals = defaultQuorum * 4 // we want some upper bound on how
// much correctional entries we will send
best, peersWithBest := dht.searchValueQuorum(ctx, key, valCh, out, responsesNeeded)
if best == nil {
return
}
// vals is used collect entries we got so far and send corrections to peers
// when we exit this function
vals := make([]RecvdVal, 0, maxVals)
var best *RecvdVal
defer func() {
if len(vals) <= 1 || best == nil {
updatePeers := make([]peer.ID, 0, dht.bucketSize)
select {
case q := <-queries:
if len(q) < 1 {
return
}
fixupRec := record.MakePutRecord(key, best.Val)
correctingEntries := make(map[peer.ID]RecvdVal)
correctingPeers := make([]peer.ID, 0, len(vals))
for _, v := range vals {
// if someone sent us a different 'less-valid' record, lets correct them
if !bytes.Equal(v.Val, best.Val) {
correctingEntries[v.From] = v
correctingPeers = append(correctingPeers, v.From)
peers := q[0].globallyQueriedPeers.Peers()
peers = kb.SortClosestPeers(peers, kb.ConvertKey(key))
for _, p := range peers {
if _, ok := peersWithBest[p]; !ok {
updatePeers = append(updatePeers, p)
}
}
case <-ctx.Done():
return
}
// only correct the peers closest to the target
correctingPeers = kb.SortClosestPeers(correctingPeers, kb.ConvertKey(key))
if numCorrectingPeers := len(correctingPeers); maxVals > numCorrectingPeers {
maxVals = numCorrectingPeers
}
for _, p := range correctingPeers[:maxVals] {
go func(v RecvdVal) {
if v.From == dht.self {
err := dht.putLocal(key, fixupRec)
if err != nil {
logger.Error("Error correcting local dht entry:", err)
}
return
}
ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
defer cancel()
err := dht.putValueToPeer(ctx, v.From, fixupRec)
if err != nil {
logger.Debug("Error correcting DHT entry: ", err)
}
}(correctingEntries[p])
}
}()
for {
select {
case v, ok := <-valCh:
if !ok {
return
}
dht.updatePeerValues(dht.Context(), key, best, updatePeers)
}()
vals = append(vals, v)
return out, nil
}
if v.Val == nil {
continue
}
// Select best value
if best != nil {
if bytes.Equal(best.Val, v.Val) {
continue
}
sel, err := dht.Validator.Select(key, [][]byte{best.Val, v.Val})
if err != nil {
logger.Warning("Failed to select dht key: ", err)
continue
}
if sel != 1 {
continue
}
}
best = &v
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal,
out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}) {
numResponses := 0
return dht.processValues(ctx, key, valCh,
func(ctx context.Context, v RecvdVal, better bool) bool {
numResponses++
if better {
select {
case out <- v.Val:
case <-ctx.Done():
return
return false
}
case <-ctx.Done():
return
}
}
}()
return out, nil
if nvals > 0 && numResponses > nvals {
return true
}
return false
})
}
// GetValues gets nvals values corresponding to the given key.
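
For reference, a minimal call to GetValues as documented above (d and the key are placeholders; the returned error is the context error reported by the lookup):

    recs, err := d.GetValues(ctx, "/v/cat", 16)
    if err != nil {
        // e.g. the context was cancelled before the lookup finished
    }
    for _, rv := range recs {
        fmt.Printf("peer %s served %q\n", rv.From, rv.Val)
    }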
......@@ -281,92 +236,105 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R
eip.Append(loggableKey(key))
defer eip.Done()
valCh := dht.getValues(ctx, key, nvals)
queryCtx, cancel := context.WithCancel(ctx)
valCh, _ := dht.getValues(queryCtx, key, func() bool {
return false
})
out := make([]RecvdVal, 0, nvals)
for val := range valCh {
out = append(out, val)
if len(out) == nvals {
cancel()
}
}
return out, ctx.Err()
}
func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) <-chan RecvdVal {
valCh := make(chan RecvdVal, 1)
valSignal := make(chan struct{}, 1)
valsSent := 0
valsSentMx := sync.RWMutex{}
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}) {
aborted := false
vbuf := make([]RecvdVal, 0, 32)
vbufMx := sync.Mutex{}
valEmitCtx, cancelValEmit := context.WithCancel(ctx)
go func() {
defer close(valCh)
for {
var next RecvdVal
loop:
for {
if aborted {
return best, nil
}
trySetNext := func() bool {
vbufMx.Lock()
notEmpty := len(vbuf) > 0
if notEmpty {
next = vbuf[0]
vbuf = vbuf[1:]
}
vbufMx.Unlock()
return notEmpty
select {
case v, ok := <-vals:
if !ok {
break loop
}
if !trySetNext() {
signal:
for {
select {
case <-valSignal:
if trySetNext() {
break signal
}
case <-valEmitCtx.Done():
if !trySetNext() {
return
}
for {
select {
case valCh <- next:
valsSentMx.Lock()
valsSent++
valsSentMx.Unlock()
if !trySetNext() {
return
}
default:
return
}
}
}
// Select best value
if best != nil {
if bytes.Equal(best, v.Val) {
peersWithBest[v.From] = struct{}{}
aborted = newVal(ctx, v, false)
continue
}
sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
if err != nil {
logger.Warning("Failed to select dht key: ", err)
continue
}
if sel != 1 {
aborted = newVal(ctx, v, false)
continue
}
}
peersWithBest = make(map[peer.ID]struct{})
peersWithBest[v.From] = struct{}{}
best = v.Val
aborted = newVal(ctx, v, true)
case <-ctx.Done():
return
}
}
send:
for {
select {
case valCh <- next:
valsSentMx.Lock()
valsSent++
valsSentMx.Unlock()
if aborted {
return best, nil
}
return
}
if !trySetNext() {
break send
}
case <-valEmitCtx.Done():
break send
func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
fixupRec := record.MakePutRecord(key, val)
for _, p := range peers {
go func(p peer.ID) {
//TODO: Is this possible?
if p == dht.self {
err := dht.putLocal(key, fixupRec)
if err != nil {
logger.Error("Error correcting local dht entry:", err)
}
return
}
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
err := dht.putValueToPeer(ctx, p, fixupRec)
if err != nil {
logger.Debug("Error correcting DHT entry: ", err)
}
}(p)
}
}
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopFn func() bool) (<-chan RecvdVal, <-chan []*qu) {
valCh := make(chan RecvdVal, 1)
queriesCh := make(chan []*qu, 1)
if rec, err := dht.getLocal(key); rec != nil && err == nil {
select {
case valCh <- RecvdVal{
Val: rec.GetValue(),
From: dht.self,
}:
case <-ctx.Done():
}
}()
}
go func() {
queries := dht.runDisjointQueries(ctx, dht.d, key,
......@@ -402,12 +370,10 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) <-chan
From: p,
}
vbufMx.Lock()
vbuf = append(vbuf, rv)
vbufMx.Unlock()
select {
case valSignal <- struct{}{}:
default:
case valCh <- rv:
case <-ctx.Done():
return nil, ctx.Err()
}
}
......@@ -421,16 +387,13 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) <-chan
return peers, err
},
func(peerset *kpeerset.SortedPeerset) bool {
if nvals <= 0 {
return false
}
valsSentMx.RLock()
defer valsSentMx.RUnlock()
return valsSent >= nvals
return stopFn()
},
)
cancelValEmit()
close(valCh)
queriesCh <- queries
close(queriesCh)
shortcutTaken := false
for _, q := range queries {
......@@ -447,7 +410,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) <-chan
}
}()
return valCh
return valCh, queriesCh
}
// Provider abstraction for indirect stores.
......@@ -667,19 +630,6 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
return pi, nil
}
peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if len(peers) == 0 {
return peer.AddrInfo{}, kb.ErrLookupFailure
}
// Sanity...
for _, p := range peers {
if p == id {
logger.Debug("found target peer in list of closest peers...")
return dht.peerstore.PeerInfo(p), nil
}
}
queries := dht.runDisjointQueries(ctx, dht.d, string(id),
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
// For DHT query command
......
......@@ -4,7 +4,7 @@ import "github.com/libp2p/go-libp2p-core/routing"
type quorumOptionKey struct{}
const defaultQuorum = 16
const defaultQuorum = 0
// Quorum is a DHT option that tells the DHT how many peers it needs to get
// values from before returning the best one.
......
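
The quorum value travels through routing.Options under quorumOptionKey (see getQuorum(&cfg, 0) in SearchValue above). A rough sketch of the setter/accessor pair around that key, assuming the file follows the usual routing.Option pattern (the shipped code may differ):

    func Quorum(n int) routing.Option {
        return func(opts *routing.Options) error {
            if opts.Other == nil {
                opts.Other = make(map[interface{}]interface{}, 1)
            }
            opts.Other[quorumOptionKey{}] = n
            return nil
        }
    }

    func getQuorum(opts *routing.Options, ndefault int) int {
        n, ok := opts.Other[quorumOptionKey{}].(int)
        if !ok {
            return ndefault
        }
        return n
    }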