Commit 26f78574 authored by Jeromy

remove buffer timing in bitswap in favor of manual batching

parent 44f32138
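In short: callers used to push keys onto blockRequests one at a time and bitswap buffered them internally, flushing when the strategy's GetBatchSize or GetBatchDelay trigger fired. After this change the caller batches related keys itself and hands them over in a single send on batchRequests. A minimal caller-side sketch of the difference (the key names are illustrative, not from this commit):

    // before: one key per send; bitswap buffers and flushes on a size/delay trigger
    bs.blockRequests <- k

    // after: the caller groups related keys and submits them as one batch
    bs.batchRequests <- []u.Key{k1, k2, k3}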
@@ -43,7 +43,7 @@ func New(ctx context.Context, p peer.Peer,
 		routing:       routing,
 		sender:        network,
 		wantlist:      u.NewKeySet(),
-		blockRequests: make(chan u.Key, 32),
+		batchRequests: make(chan []u.Key, 32),
 	}
 	network.SetDelegate(bs)
 	go bs.run(ctx)
@@ -66,7 +66,10 @@ type bitswap struct {
 	notifications notifications.PubSub
-	blockRequests chan u.Key
+	// Requests for a set of related blocks
+	// the assumption is made that the same peer is likely to
+	// have more than a single block in the set
+	batchRequests chan []u.Key
 	// strategy listens to network traffic and makes decisions about how to
 	// interact with partners.
@@ -97,7 +100,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
 	promise := bs.notifications.Subscribe(ctx, k)
 	select {
-	case bs.blockRequests <- k:
+	case bs.batchRequests <- []u.Key{k}:
 	case <-parent.Done():
 		return nil, parent.Err()
 	}
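GetBlock now simply wraps its single key in a one-element batch. A batch-aware counterpart would hand over the caller's whole key set the same way; the sketch below is hypothetical (not part of this commit) and shows only the request-submission half, without the notifications.Subscribe plumbing that GetBlock uses to wait for the blocks:

    // hypothetical sketch, not part of this commit
    func (bs *bitswap) requestBatch(parent context.Context, ks []u.Key) error {
        select {
        case bs.batchRequests <- ks: // one send for the whole set of related keys
            return nil
        case <-parent.Done():
            return parent.Err()
        }
    }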
@@ -159,51 +162,32 @@ func (bs *bitswap) run(ctx context.Context) {
 	// Every so often, we should resend out our current want list
 	rebroadcastTime := time.Second * 5
-	var providers <-chan peer.Peer // NB: must be initialized to zero value
-	broadcastSignal := time.After(bs.strategy.GetRebroadcastDelay())
+	broadcastSignal := time.NewTicker(bs.strategy.GetRebroadcastDelay())
-	// Number of unsent keys for the current batch
-	unsentKeys := 0
 	for {
 		select {
-		case <-broadcastSignal:
-			unsentKeys = 0
+		case <-broadcastSignal.C:
 			wantlist := bs.wantlist.Keys()
 			if len(wantlist) == 0 {
 				continue
 			}
-			if providers == nil {
-				// rely on semi randomness of maps
-				firstKey := wantlist[0]
-				providers = bs.routing.FindProvidersAsync(ctx, firstKey, maxProvidersPerRequest)
-			}
+			providers := bs.routing.FindProvidersAsync(ctx, wantlist[0], maxProvidersPerRequest)
 			err := bs.sendWantListTo(ctx, providers)
 			if err != nil {
 				log.Errorf("error sending wantlist: %s", err)
 			}
-			providers = nil
-			broadcastSignal = time.After(bs.strategy.GetRebroadcastDelay())
-		case k := <-bs.blockRequests:
-			if unsentKeys == 0 {
-				providers = bs.routing.FindProvidersAsync(ctx, k, maxProvidersPerRequest)
+		case ks := <-bs.batchRequests:
+			if len(ks) == 0 {
+				log.Warning("Received batch request for zero blocks")
+				continue
 			}
-			unsentKeys++
+			providers := bs.routing.FindProvidersAsync(ctx, ks[0], maxProvidersPerRequest)
-			if unsentKeys >= bs.strategy.GetBatchSize() {
-				// send wantlist to providers
 			err := bs.sendWantListTo(ctx, providers)
 			if err != nil {
 				log.Errorf("error sending wantlist: %s", err)
 			}
-				unsentKeys = 0
-				broadcastSignal = time.After(bs.strategy.GetRebroadcastDelay())
-				providers = nil
-			} else {
-				// set a timeout to wait for more blocks or send current wantlist
-				broadcastSignal = time.After(bs.strategy.GetBatchDelay())
-			}
 		case <-ctx.Done():
 			return
 		}
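For readability, here is roughly how the run loop reads after the change, assembled from the added and unchanged lines of the hunk above (blank lines and code outside the visible hunk are omitted):

    broadcastSignal := time.NewTicker(bs.strategy.GetRebroadcastDelay())
    for {
        select {
        case <-broadcastSignal.C:
            wantlist := bs.wantlist.Keys()
            if len(wantlist) == 0 {
                continue
            }
            providers := bs.routing.FindProvidersAsync(ctx, wantlist[0], maxProvidersPerRequest)
            err := bs.sendWantListTo(ctx, providers)
            if err != nil {
                log.Errorf("error sending wantlist: %s", err)
            }
        case ks := <-bs.batchRequests:
            if len(ks) == 0 {
                log.Warning("Received batch request for zero blocks")
                continue
            }
            providers := bs.routing.FindProvidersAsync(ctx, ks[0], maxProvidersPerRequest)
            err := bs.sendWantListTo(ctx, providers)
            if err != nil {
                log.Errorf("error sending wantlist: %s", err)
            }
        case <-ctx.Done():
            return
        }
    }

Note that switching from time.After to time.NewTicker also means the rebroadcast interval is no longer reset after every send; the removed time.After reassignments were restarting that timer each time a wantlist went out.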
@@ -345,7 +345,7 @@ func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
 		routing:       htc,
 		sender:        adapter,
 		wantlist:      util.NewKeySet(),
-		blockRequests: make(chan util.Key, 32),
+		batchRequests: make(chan []util.Key, 32),
 	}
 	adapter.SetDelegate(bs)
 	go bs.run(context.TODO())