Commit daab800d authored by gammazero, committed by Adin Schmahmann

Clean up execOnMany and allow it to be used in both sloppy and non-sloppy modes

The execOnMany function could exit prematurely, leaving its child goroutines running. In findProvidersAsyncRoutine, these goroutines would then write to a channel that had been closed after execOnMany returned. The function can now run either in a sloppy mode, where goroutines are cleaned up after the function has completed, or in a safer non-sloppy mode, where all goroutines complete before the function returns. The sloppy mode is used for DHT "get" operations like FindProviders and SearchValues, whereas the non-sloppy mode is used for "put" operations like PutValue and Provide (along with their bulk-operation equivalents).

This fixes https://github.com/ipfs/go-ipfs/issues/8146
parent 5eb9aaa3
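
The core of the fix is easiest to see in isolation. Below is a minimal, self-contained sketch of the pattern the new execOnMany relies on; the `execOnAll` helper and its parameters are hypothetical stand-ins, not the actual dht code. The result channel is buffered with one slot per worker, so goroutines that finish after a sloppy early return simply drop their result into the buffer instead of blocking forever or writing to a channel the caller may have closed.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// execOnAll is a hypothetical, simplified stand-in for execOnMany.
func execOnAll(ctx context.Context, fn func(context.Context, int) error, items []int, sloppyExit bool) int {
	// Buffer sized to len(items): late writers always have room and never panic.
	errCh := make(chan error, len(items))
	opCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	for _, it := range items {
		go func(it int) { errCh <- fn(opCtx, it) }(it)
	}

	numSuccess, numDone := 0, 0
	for numDone < len(items) {
		if err := <-errCh; err == nil {
			numSuccess++
		}
		numDone++
		// In sloppy mode we may return before every worker has reported back;
		// the remaining goroutines finish later and drain into the buffer.
		if sloppyExit && numSuccess*2 >= len(items) {
			return numSuccess
		}
	}
	return numSuccess
}

func main() {
	work := []int{10, 20, 30, 40}
	n := execOnAll(context.Background(), func(ctx context.Context, ms int) error {
		select {
		case <-time.After(time.Duration(ms) * time.Millisecond):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}, work, true)
	fmt.Println("successes before returning:", n)
}
```

In non-sloppy mode (sloppyExit == false) the loop drains all len(items) results before returning, which is what the non-sloppy callers in the diff below rely on.
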
@@ -464,7 +464,7 @@ func (dht *FullRT) PutValue(ctx context.Context, key string, value []byte, opts
 		})
 		err := dht.protoMessenger.PutValue(ctx, p, rec)
 		return err
-	}, peers)
+	}, peers, true)
 	if successes == 0 {
 		return fmt.Errorf("failed to complete put")
@@ -751,7 +751,7 @@ func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan str
 			return nil
 		}
-		dht.execOnMany(ctx, queryFn, peers)
+		dht.execOnMany(ctx, queryFn, peers, false)
 		lookupResCh <- &lookupWithFollowupResult{peers: peers}
 	}()
 	return valCh, lookupResCh
@@ -817,7 +817,7 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
 	successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
 		err := dht.protoMessenger.PutProvider(ctx, p, keyMH, dht.h)
 		return err
-	}, peers)
+	}, peers, true)
 	if exceededDeadline {
 		return context.DeadlineExceeded
@@ -830,45 +830,71 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
 	return ctx.Err()
 }
-func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID) int {
+// execOnMany executes the given function on each of the peers, although it may only wait for a certain chunk of peers
+// to respond before considering the results "good enough" and returning.
+//
+// If sloppyExit is true then this function will return without waiting for all of its internal goroutines to close.
+// If sloppyExit is true then the passed in function MUST be able to safely complete an arbitrary amount of time after
+// execOnMany has returned (e.g. do not write to resources that might get closed or set to nil and therefore result in
+// a panic instead of just returning an error).
+func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID, sloppyExit bool) int {
 	if len(peers) == 0 {
 		return 0
 	}
-	putctx, cancel := context.WithCancel(ctx)
+	// having a buffer that can take all of the elements is basically a hack to allow for sloppy exits that clean up
+	// the goroutines after the function is done rather than before
+	errCh := make(chan error, len(peers))
+	numSuccessfulToWaitFor := int(float64(len(peers)) * dht.waitFrac)
+	putctx, cancel := context.WithTimeout(ctx, dht.timeoutPerOp)
 	defer cancel()
-	waitAllCh := make(chan struct{}, len(peers))
-	numSuccessfulToWaitFor := int(float64(len(peers)) * dht.waitFrac)
-	waitSuccessCh := make(chan struct{}, numSuccessfulToWaitFor)
 	for _, p := range peers {
 		go func(p peer.ID) {
-			fnCtx, fnCancel := context.WithTimeout(putctx, dht.timeoutPerOp)
-			defer fnCancel()
-			err := fn(fnCtx, p)
-			if err != nil {
-				logger.Debug(err)
-			} else {
-				waitSuccessCh <- struct{}{}
-			}
-			waitAllCh <- struct{}{}
+			errCh <- fn(putctx, p)
 		}(p)
 	}
-	numSuccess, numDone := 0, 0
-	t := time.NewTimer(time.Hour)
-	for numDone != len(peers) {
+	var numDone, numSuccess, successSinceLastTick int
+	var ticker *time.Ticker
+	var tickChan <-chan time.Time
+	for numDone < len(peers) {
 		select {
-		case <-waitAllCh:
+		case err := <-errCh:
 			numDone++
-		case <-waitSuccessCh:
-			if numSuccess >= numSuccessfulToWaitFor {
-				t.Reset(time.Millisecond * 500)
-			}
-			numSuccess++
-			numDone++
-		case <-t.C:
-			cancel()
+			if err == nil {
+				numSuccess++
+				if numSuccess >= numSuccessfulToWaitFor && ticker == nil {
+					// Once there are enough successes, wait a little longer
+					ticker = time.NewTicker(time.Millisecond * 500)
+					defer ticker.Stop()
+					tickChan = ticker.C
+					successSinceLastTick = numSuccess
+				}
+				// This is equivalent to numSuccess * 2 + numFailures >= len(peers) and is a heuristic that seems to be
+				// performing reasonably.
+				// TODO: Make this metric more configurable
+				// TODO: Have better heuristics in this function whether determined from observing static network
+				// properties or dynamically calculating them
+				if numSuccess+numDone >= len(peers) {
+					cancel()
+					if sloppyExit {
+						return numSuccess
+					}
+				}
+			}
+		case <-tickChan:
+			if numSuccess > successSinceLastTick {
+				// If there were additional successes, then wait another tick
+				successSinceLastTick = numSuccess
+			} else {
+				cancel()
+				if sloppyExit {
+					return numSuccess
+				}
+			}
 		}
 	}
 	return numSuccess
@@ -902,7 +928,7 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
 		pmes.ProviderPeers = pbPeers
 		return dht.messageSender.SendMessage(ctx, p, pmes)
-	}, peers)
+	}, peers, true)
 	if successes == 0 {
 		return fmt.Errorf("no successful provides")
 	}
@@ -945,7 +971,7 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte)
 	successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
 		keyStr := string(k)
 		return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr]))
-	}, peers)
+	}, peers, true)
 	if successes == 0 {
 		return fmt.Errorf("no successful puts")
 	}
@@ -1170,7 +1196,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
 		return nil
 	}
-	dht.execOnMany(queryctx, fn, peers)
+	dht.execOnMany(queryctx, fn, peers, false)
 }
 // FindPeer searches for a peer with given ID.
@@ -1255,7 +1281,7 @@ func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, e
 		return nil
 	}
-	dht.execOnMany(queryctx, fn, peers)
+	dht.execOnMany(queryctx, fn, peers, false)
 	close(addrsCh)
 	wg.Wait()
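
For reference, the "good enough" exit heuristic added to execOnMany above can also be read on its own. The sketch below uses a hypothetical `waitForGoodEnough` helper rather than the actual dht code: once a target number of successes has arrived, it keeps extending the wait in 500ms ticks only for as long as new successes keep coming in.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// waitForGoodEnough is a hypothetical helper illustrating the tick-based heuristic.
func waitForGoodEnough(results <-chan error, total int, waitFor int) int {
	var numDone, numSuccess, successSinceLastTick int
	var ticker *time.Ticker
	var tickChan <-chan time.Time // nil until enough successes; a nil channel never fires

	for numDone < total {
		select {
		case err := <-results:
			numDone++
			if err == nil {
				numSuccess++
				if numSuccess >= waitFor && ticker == nil {
					// Enough successes: start the grace-period ticker.
					ticker = time.NewTicker(500 * time.Millisecond)
					defer ticker.Stop()
					tickChan = ticker.C
					successSinceLastTick = numSuccess
				}
			}
		case <-tickChan:
			if numSuccess > successSinceLastTick {
				// Progress since the last tick: wait one more tick.
				successSinceLastTick = numSuccess
			} else {
				// No new successes in the last 500ms: call it good enough.
				return numSuccess
			}
		}
	}
	return numSuccess
}

func main() {
	const total = 10
	results := make(chan error, total) // buffered so late writers never block
	for i := 0; i < total; i++ {
		go func() {
			time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)
			results <- nil
		}()
	}
	fmt.Println("successes:", waitForGoodEnough(results, total, total/2))
}
```
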