Commit 4aee8bb8 authored by Adin Schmahmann's avatar Adin Schmahmann Committed by Steven Allen

Search/GetValues stops if using quorum

parent 616b0e12
...@@ -171,13 +171,14 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing ...@@ -171,13 +171,14 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing
responsesNeeded = getQuorum(&cfg, 0) responsesNeeded = getQuorum(&cfg, 0)
} }
valCh, queries := dht.getValues(ctx, key, func() bool { return false }) stopCh := make(chan struct{})
valCh, queries := dht.getValues(ctx, key, stopCh)
out := make(chan []byte) out := make(chan []byte)
go func() { go func() {
defer close(out) defer close(out)
best, peersWithBest := dht.searchValueQuorum(ctx, key, valCh, out, responsesNeeded) best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
if best == nil { if best == nil || aborted {
return return
} }
...@@ -205,8 +206,8 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing ...@@ -205,8 +206,8 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing
return out, nil return out, nil
} }
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}) { out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
numResponses := 0 numResponses := 0
return dht.processValues(ctx, key, valCh, return dht.processValues(ctx, key, valCh,
func(ctx context.Context, v RecvdVal, better bool) bool { func(ctx context.Context, v RecvdVal, better bool) bool {
...@@ -220,6 +221,7 @@ func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-c ...@@ -220,6 +221,7 @@ func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-c
} }
if nvals > 0 && numResponses > nvals { if nvals > 0 && numResponses > nvals {
close(stopCh)
return true return true
} }
return false return false
...@@ -237,9 +239,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R ...@@ -237,9 +239,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R
defer eip.Done() defer eip.Done()
queryCtx, cancel := context.WithCancel(ctx) queryCtx, cancel := context.WithCancel(ctx)
valCh, _ := dht.getValues(queryCtx, key, func() bool { valCh, _ := dht.getValues(queryCtx, key, nil)
return false
})
out := make([]RecvdVal, 0, nvals) out := make([]RecvdVal, 0, nvals)
for val := range valCh { for val := range valCh {
...@@ -253,13 +253,11 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R ...@@ -253,13 +253,11 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R
} }
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal, func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}) { newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
aborted := false
loop: loop:
for { for {
if aborted { if aborted {
return best, nil return
} }
select { select {
...@@ -294,9 +292,6 @@ loop: ...@@ -294,9 +292,6 @@ loop:
} }
} }
if aborted {
return best, nil
}
return return
} }
...@@ -322,7 +317,7 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte ...@@ -322,7 +317,7 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte
} }
} }
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopFn func() bool) (<-chan RecvdVal, <-chan []*qu) { func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan []*qu) {
valCh := make(chan RecvdVal, 1) valCh := make(chan RecvdVal, 1)
queriesCh := make(chan []*qu, 1) queriesCh := make(chan []*qu, 1)
...@@ -387,7 +382,12 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopFn func() boo ...@@ -387,7 +382,12 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopFn func() boo
return peers, err return peers, err
}, },
func(peerset *kpeerset.SortedPeerset) bool { func(peerset *kpeerset.SortedPeerset) bool {
return stopFn() select {
case <-stopQuery:
return true
default:
return false
}
}, },
) )
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment