Commit 19da0570 authored by Jeromy's avatar Jeromy

remove buffer timing in bitswap in favor of manual batching

parent e4b2ae3b
......@@ -43,7 +43,7 @@ func New(ctx context.Context, p peer.Peer,
routing: routing,
sender: network,
wantlist: u.NewKeySet(),
blockRequests: make(chan u.Key, 32),
batchRequests: make(chan []u.Key, 32),
......@@ -66,7 +66,10 @@ type bitswap struct {
notifications notifications.PubSub
blockRequests chan u.Key
// Requests for a set of related blocks
// the assumption is made that the same peer is likely to
// have more than a single block in the set
batchRequests chan []u.Key
// strategy listens to network traffic and makes decisions about how to
// interact with partners.
......@@ -97,7 +100,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
promise := bs.notifications.Subscribe(ctx, k)
select {
case bs.blockRequests <- k:
case bs.batchRequests <- []u.Key{k}:
case <-parent.Done():
return nil, parent.Err()
......@@ -159,50 +162,31 @@ func (bs *bitswap) run(ctx context.Context) {
// Every so often, we should resend out our current want list
rebroadcastTime := time.Second * 5
var providers <-chan peer.Peer // NB: must be initialized to zero value
broadcastSignal := time.After(bs.strategy.GetRebroadcastDelay())
broadcastSignal := time.NewTicker(bs.strategy.GetRebroadcastDelay())
// Number of unsent keys for the current batch
unsentKeys := 0
for {
select {
case <-broadcastSignal:
unsentKeys = 0
case <-broadcastSignal.C:
wantlist := bs.wantlist.Keys()
if len(wantlist) == 0 {
if providers == nil {
// rely on semi randomness of maps
firstKey := wantlist[0]
providers = bs.routing.FindProvidersAsync(ctx, firstKey, maxProvidersPerRequest)
providers := bs.routing.FindProvidersAsync(ctx, wantlist[0], maxProvidersPerRequest)
err := bs.sendWantListTo(ctx, providers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
providers = nil
broadcastSignal = time.After(bs.strategy.GetRebroadcastDelay())
case k := <-bs.blockRequests:
if unsentKeys == 0 {
providers = bs.routing.FindProvidersAsync(ctx, k, maxProvidersPerRequest)
case ks := <-bs.batchRequests:
if len(ks) == 0 {
log.Warning("Received batch request for zero blocks")
if unsentKeys >= bs.strategy.GetBatchSize() {
// send wantlist to providers
err := bs.sendWantListTo(ctx, providers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
unsentKeys = 0
broadcastSignal = time.After(bs.strategy.GetRebroadcastDelay())
providers = nil
} else {
// set a timeout to wait for more blocks or send current wantlist
providers := bs.routing.FindProvidersAsync(ctx, ks[0], maxProvidersPerRequest)
broadcastSignal = time.After(bs.strategy.GetBatchDelay())
err := bs.sendWantListTo(ctx, providers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
case <-ctx.Done():
......@@ -345,7 +345,7 @@ func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
routing: htc,
sender: adapter,
wantlist: util.NewKeySet(),
blockRequests: make(chan util.Key, 32),
batchRequests: make(chan []util.Key, 32),
......@@ -252,6 +252,7 @@ func (n *dagService) Remove(nd *Node) error {
// FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
var wg sync.WaitGroup
done := make(chan struct{})
......@@ -284,3 +285,22 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{}
return done
// Take advantage of blockservice/bitswap batched requests to fetch all
// child nodes of a given node
// TODO: finish this
func (ds *dagService) BatchFetch(ctx context.Context, root *Node) error {
var keys []u.Key
for _, lnk := range root.Links {
keys = append(keys, u.Key(lnk.Hash))
blocks, err := ds.Blocks.GetBlocks(keys)
if err != nil {
return err
_ = blocks
//what do i do with blocks?
return nil
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment