Commit 6a6dc56a authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

bitswap: send wantlist code reuse + debug logs

parent 83d12200
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
package bitswap package bitswap
import ( import (
"fmt"
"math" "math"
"sync" "sync"
"time" "time"
...@@ -170,58 +171,96 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { ...@@ -170,58 +171,96 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return bs.network.Provide(ctx, blk.Key()) return bs.network.Provide(ctx, blk.Key())
} }
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.ID) error { func (bs *bitswap) sendWantlistMsgToPeer(ctx context.Context, m bsmsg.BitSwapMessage, p peer.ID) error {
logd := fmt.Sprintf("%s bitswap.sendWantlistMsgToPeer(%d, %s)", bs.self, len(m.Wantlist()), p)
log.Debugf("%s sending wantlist", logd)
if err := bs.send(ctx, p, m); err != nil {
log.Errorf("%s send wantlist error: %s", logd, err)
return err
}
log.Debugf("%s send wantlist success", logd)
return nil
}
func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
if peers == nil { if peers == nil {
panic("Cant send wantlist to nil peerchan") panic("Cant send wantlist to nil peerchan")
} }
message := bsmsg.New()
for _, wanted := range bs.wantlist.Entries() { logd := fmt.Sprintf("%s bitswap.sendWantlistMsgTo(%d)", bs.self, len(m.Wantlist()))
message.AddEntry(wanted.Key, wanted.Priority) log.Debugf("%s begin", logd)
} defer log.Debugf("%s end", logd)
set := pset.New()
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for peerToQuery := range peers { for peerToQuery := range peers {
log.Event(ctx, "PeerToQuery", peerToQuery) log.Event(ctx, "PeerToQuery", peerToQuery)
logd := fmt.Sprintf("%sto(%s)", logd, peerToQuery)
if !set.TryAdd(peerToQuery) { //Do once per peer
log.Debugf("%s skipped (already sent)", logd)
continue
}
wg.Add(1) wg.Add(1)
go func(p peer.ID) { go func(p peer.ID) {
defer wg.Done() defer wg.Done()
if err := bs.send(ctx, p, message); err != nil { bs.sendWantlistMsgToPeer(ctx, m, p)
log.Error(err)
return
}
}(peerToQuery) }(peerToQuery)
} }
wg.Wait() wg.Wait()
return nil return nil
} }
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantlist.ThreadSafe) { func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
message := bsmsg.New() message := bsmsg.New()
message.SetFull(true) message.SetFull(true)
for _, e := range bs.wantlist.Entries() { for _, wanted := range bs.wantlist.Entries() {
message.AddEntry(e.Key, e.Priority) message.AddEntry(wanted.Key, wanted.Priority)
} }
return bs.sendWantlistMsgToPeers(ctx, message, peers)
}
set := pset.New() func (bs *bitswap) sendWantlistToProviders(ctx context.Context) {
logd := fmt.Sprintf("%s bitswap.sendWantlistToProviders", bs.self)
log.Debugf("%s begin", logd)
defer log.Debugf("%s end", logd)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// prepare a channel to hand off to sendWantlistToPeers
sendToPeers := make(chan peer.ID)
// Get providers for all entries in wantlist (could take a while) // Get providers for all entries in wantlist (could take a while)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for _, e := range wantlist.Entries() { for _, e := range bs.wantlist.Entries() {
wg.Add(1) wg.Add(1)
go func(k u.Key) { go func(k u.Key) {
defer wg.Done() defer wg.Done()
logd := fmt.Sprintf("%s(entry: %s)", logd, k)
log.Debugf("%s asking dht for providers", logd)
child, _ := context.WithTimeout(ctx, providerRequestTimeout) child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest) providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers { for prov := range providers {
if set.TryAdd(prov) { //Do once per peer log.Debugf("%s dht returned provider %s. send wantlist", logd, prov)
bs.send(ctx, prov, message) sendToPeers <- prov
}
} }
}(e.Key) }(e.Key)
} }
wg.Wait()
go func() {
wg.Wait() // make sure all our children do finish.
close(sendToPeers)
}()
err := bs.sendWantlistToPeers(ctx, sendToPeers)
if err != nil {
log.Errorf("%s sendWantlistToPeers error: %s", logd, err)
}
} }
func (bs *bitswap) taskWorker(ctx context.Context) { func (bs *bitswap) taskWorker(ctx context.Context) {
...@@ -247,7 +286,7 @@ func (bs *bitswap) clientWorker(parent context.Context) { ...@@ -247,7 +286,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
select { select {
case <-broadcastSignal: case <-broadcastSignal:
// Resend unfulfilled wantlist keys // Resend unfulfilled wantlist keys
bs.sendWantlistToProviders(ctx, bs.wantlist) bs.sendWantlistToProviders(ctx)
broadcastSignal = time.After(rebroadcastDelay.Get()) broadcastSignal = time.After(rebroadcastDelay.Get())
case ks := <-bs.batchRequests: case ks := <-bs.batchRequests:
if len(ks) == 0 { if len(ks) == 0 {
...@@ -266,7 +305,7 @@ func (bs *bitswap) clientWorker(parent context.Context) { ...@@ -266,7 +305,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
// newer bitswap strategies. // newer bitswap strategies.
child, _ := context.WithTimeout(ctx, providerRequestTimeout) child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, ks[0], maxProvidersPerRequest) providers := bs.network.FindProvidersAsync(child, ks[0], maxProvidersPerRequest)
err := bs.sendWantListTo(ctx, providers) err := bs.sendWantlistToPeers(ctx, providers)
if err != nil { if err != nil {
log.Errorf("error sending wantlist: %s", err) log.Errorf("error sending wantlist: %s", err)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment