Commit 13f98cb1 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

dont spawn so many goroutines when rebroadcasting wantlist

parent 3818c938
...@@ -201,21 +201,57 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e ...@@ -201,21 +201,57 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
} }
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wantlist) { func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wantlist) {
provset := make(map[u.Key]peer.Peer)
provcollect := make(chan peer.Peer)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
// Get providers for all entries in wantlist (could take a while)
for _, e := range wantlist.Entries() { for _, e := range wantlist.Entries() {
wg.Add(1) wg.Add(1)
go func(k u.Key) { go func(k u.Key) {
child, _ := context.WithTimeout(ctx, providerRequestTimeout) child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest) providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest)
err := bs.sendWantListTo(ctx, providers) for prov := range providers {
if err != nil { provcollect <- prov
log.Errorf("error sending wantlist: %s", err)
} }
wg.Done() wg.Done()
}(e.Value) }(e.Value)
} }
wg.Wait()
// When all workers finish, close the providers channel
go func() {
wg.Wait()
close(provcollect)
}()
// Filter out duplicates,
// no need to send our wantlists out twice in a given time period
for {
select {
case p, ok := <-provcollect:
if !ok {
break
}
provset[p.Key()] = p
case <-ctx.Done():
log.Error("Context cancelled before we got all the providers!")
return
}
}
message := bsmsg.New()
message.SetFull(true)
for _, e := range bs.wantlist.Entries() {
message.AddEntry(e.Value, e.Priority, false)
}
for _, prov := range provset {
bs.send(ctx, prov, message)
}
} }
func (bs *bitswap) roundWorker(ctx context.Context) { func (bs *bitswap) roundWorker(ctx context.Context) {
...@@ -229,22 +265,25 @@ func (bs *bitswap) roundWorker(ctx context.Context) { ...@@ -229,22 +265,25 @@ func (bs *bitswap) roundWorker(ctx context.Context) {
if err != nil { if err != nil {
log.Critical("%s", err) log.Critical("%s", err)
} }
log.Error(alloc) err = bs.processStrategyAllocation(ctx, alloc)
bs.processStrategyAllocation(ctx, alloc) if err != nil {
log.Critical("Error processing strategy allocation: %s", err)
}
} }
} }
} }
func (bs *bitswap) processStrategyAllocation(ctx context.Context, alloc []*strategy.Task) { func (bs *bitswap) processStrategyAllocation(ctx context.Context, alloc []*strategy.Task) error {
for _, t := range alloc { for _, t := range alloc {
for _, block := range t.Blocks { for _, block := range t.Blocks {
message := bsmsg.New() message := bsmsg.New()
message.AddBlock(block) message.AddBlock(block)
if err := bs.send(ctx, t.Peer, message); err != nil { if err := bs.send(ctx, t.Peer, message); err != nil {
log.Errorf("Message Send Failed: %s", err) return err
} }
} }
} }
return nil
} }
// TODO ensure only one active request per key // TODO ensure only one active request per key
...@@ -252,22 +291,16 @@ func (bs *bitswap) clientWorker(parent context.Context) { ...@@ -252,22 +291,16 @@ func (bs *bitswap) clientWorker(parent context.Context) {
ctx, cancel := context.WithCancel(parent) ctx, cancel := context.WithCancel(parent)
broadcastSignal := time.NewTicker(rebroadcastDelay) broadcastSignal := time.After(rebroadcastDelay)
defer func() { defer cancel()
cancel() // signal to derived async functions
broadcastSignal.Stop()
}()
for { for {
select { select {
case <-broadcastSignal.C: case <-broadcastSignal:
// Resend unfulfilled wantlist keys // Resend unfulfilled wantlist keys
bs.sendWantlistToProviders(ctx, bs.wantlist) bs.sendWantlistToProviders(ctx, bs.wantlist)
broadcastSignal = time.After(rebroadcastDelay)
case ks := <-bs.batchRequests: case ks := <-bs.batchRequests:
// TODO: implement batching on len(ks) > X for some X
// i.e. if given 20 keys, fetch first five, then next
// five, and so on, so we are more likely to be able to
// effectively stream the data
if len(ks) == 0 { if len(ks) == 0 {
log.Warning("Received batch request for zero blocks") log.Warning("Received batch request for zero blocks")
continue continue
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment