Commit 37412e93 authored by Brian Tiger Chow's avatar Brian Tiger Chow

refactor(bitswap) move workers to bottom of file

parent 8ea3a4b9
...@@ -262,73 +262,6 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli ...@@ -262,73 +262,6 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
} }
} }
func (bs *bitswap) taskWorker(ctx context.Context) {
defer log.Info("bitswap task worker shutting down...")
for {
select {
case <-ctx.Done():
return
case nextEnvelope := <-bs.engine.Outbox():
select {
case <-ctx.Done():
return
case envelope, ok := <-nextEnvelope:
if !ok {
continue
}
log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
}
}
}
}
// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")
ctx, cancel := context.WithCancel(parent)
broadcastSignal := time.After(rebroadcastDelay.Get())
defer cancel()
for {
select {
case <-time.Tick(10 * time.Second):
n := bs.wantlist.Len()
if n > 0 {
log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
}
case <-broadcastSignal: // resend unfulfilled wantlist keys
entries := bs.wantlist.Entries()
if len(entries) > 0 {
bs.sendWantlistToProviders(ctx, entries)
}
broadcastSignal = time.After(rebroadcastDelay.Get())
case keys := <-bs.batchRequests:
if len(keys) == 0 {
log.Warning("Received batch request for zero blocks")
continue
}
for i, k := range keys {
bs.wantlist.Add(k, kMaxPriority-i)
}
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
err := bs.sendWantlistToPeers(ctx, providers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
}
case <-parent.Done():
return
}
}
}
// TODO(brian): handle errors // TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) ( func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
peer.ID, bsmsg.BitSwapMessage) { peer.ID, bsmsg.BitSwapMessage) {
...@@ -419,3 +352,70 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) ...@@ -419,3 +352,70 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage)
func (bs *bitswap) Close() error { func (bs *bitswap) Close() error {
return bs.process.Close() return bs.process.Close()
} }
func (bs *bitswap) taskWorker(ctx context.Context) {
defer log.Info("bitswap task worker shutting down...")
for {
select {
case <-ctx.Done():
return
case nextEnvelope := <-bs.engine.Outbox():
select {
case <-ctx.Done():
return
case envelope, ok := <-nextEnvelope:
if !ok {
continue
}
log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
}
}
}
}
// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")
ctx, cancel := context.WithCancel(parent)
broadcastSignal := time.After(rebroadcastDelay.Get())
defer cancel()
for {
select {
case <-time.Tick(10 * time.Second):
n := bs.wantlist.Len()
if n > 0 {
log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
}
case <-broadcastSignal: // resend unfulfilled wantlist keys
entries := bs.wantlist.Entries()
if len(entries) > 0 {
bs.sendWantlistToProviders(ctx, entries)
}
broadcastSignal = time.After(rebroadcastDelay.Get())
case keys := <-bs.batchRequests:
if len(keys) == 0 {
log.Warning("Received batch request for zero blocks")
continue
}
for i, k := range keys {
bs.wantlist.Add(k, kMaxPriority-i)
}
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
err := bs.sendWantlistToPeers(ctx, providers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
}
case <-parent.Done():
return
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment