Commit e5983cbe authored by Brian Tiger Chow's avatar Brian Tiger Chow Committed by Jeromy

some renaming

License: MIT
Signed-off-by: default avatarBrian Tiger Chow <brian@perfmode.com>
parent 6c2a6669
...@@ -155,21 +155,19 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e ...@@ -155,21 +155,19 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
} }
func (bs *bitswap) run(ctx context.Context) { func (bs *bitswap) run(ctx context.Context) {
var sendlist <-chan peer.Peer
// Every so often, we should resend out our current want list const rebroadcastPeriod = time.Second * 5 // Every so often, we should resend out our current want list
const rebroadcastTime = time.Second * 5 const batchDelay = time.Millisecond * 3 // Time to wait before sending out wantlists to better batch up requests
const peersPerSend = 6
// Time to wait before sending out wantlists to better batch up requests
const bufferTime = time.Millisecond * 3
peersPerSend := 6
timeout := time.After(rebroadcastTime)
const threshold = 10 const threshold = 10
var sendlist <-chan peer.Peer // NB: must be initialized to zero value
broadcastSignal := time.After(rebroadcastPeriod)
unsent := 0 unsent := 0
for { for {
select { select {
case <-timeout: case <-broadcastSignal:
wantlist := bs.wantlist.Keys() wantlist := bs.wantlist.Keys()
if len(wantlist) == 0 { if len(wantlist) == 0 {
continue continue
...@@ -184,7 +182,7 @@ func (bs *bitswap) run(ctx context.Context) { ...@@ -184,7 +182,7 @@ func (bs *bitswap) run(ctx context.Context) {
log.Errorf("error sending wantlist: %s", err) log.Errorf("error sending wantlist: %s", err)
} }
sendlist = nil sendlist = nil
timeout = time.After(rebroadcastTime) broadcastSignal = time.After(rebroadcastPeriod)
case k := <-bs.blockRequests: case k := <-bs.blockRequests:
if unsent == 0 { if unsent == 0 {
sendlist = bs.routing.FindProvidersAsync(ctx, k, peersPerSend) sendlist = bs.routing.FindProvidersAsync(ctx, k, peersPerSend)
...@@ -198,12 +196,12 @@ func (bs *bitswap) run(ctx context.Context) { ...@@ -198,12 +196,12 @@ func (bs *bitswap) run(ctx context.Context) {
log.Errorf("error sending wantlist: %s", err) log.Errorf("error sending wantlist: %s", err)
} }
unsent = 0 unsent = 0
timeout = time.After(rebroadcastTime) broadcastSignal = time.After(rebroadcastPeriod)
sendlist = nil sendlist = nil
} else { } else {
// set a timeout to wait for more blocks or send current wantlist // set a timeout to wait for more blocks or send current wantlist
timeout = time.After(bufferTime) broadcastSignal = time.After(batchDelay)
} }
case <-ctx.Done(): case <-ctx.Done():
return return
......
...@@ -294,7 +294,7 @@ func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance { ...@@ -294,7 +294,7 @@ func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
routing: htc, routing: htc,
sender: adapter, sender: adapter,
wantlist: util.NewKeySet(), wantlist: util.NewKeySet(),
blockReq: make(chan util.Key, 32), blockRequests: make(chan util.Key, 32),
} }
adapter.SetDelegate(bs) adapter.SetDelegate(bs)
go bs.run(context.TODO()) go bs.run(context.TODO())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment