Commit e6a504fd authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

rewrite sendWantlistToProviders

parent 061f0d39
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
eventlog "github.com/jbenet/go-ipfs/util/eventlog" eventlog "github.com/jbenet/go-ipfs/util/eventlog"
pset "github.com/jbenet/go-ipfs/util/peerset"
) )
var log = eventlog.Logger("bitswap") var log = eventlog.Logger("bitswap")
...@@ -204,57 +205,34 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e ...@@ -204,57 +205,34 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
} }
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wantlist) { func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wantlist) {
provset := make(map[u.Key]peer.Peer)
provcollect := make(chan peer.Peer)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
wg := sync.WaitGroup{} message := bsmsg.New()
message.SetFull(true)
for _, e := range bs.wantlist.Entries() {
message.AddEntry(e.Value, e.Priority, false)
}
ps := pset.NewPeerSet()
// Get providers for all entries in wantlist (could take a while) // Get providers for all entries in wantlist (could take a while)
wg := sync.WaitGroup{}
for _, e := range wantlist.Entries() { for _, e := range wantlist.Entries() {
wg.Add(1) wg.Add(1)
go func(k u.Key) { go func(k u.Key) {
defer wg.Done()
child, _ := context.WithTimeout(ctx, providerRequestTimeout) child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest) providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers { for prov := range providers {
provcollect <- prov if ps.AddIfSmallerThan(prov, -1) { //Do once per peer
bs.send(ctx, prov, message)
}
} }
wg.Done()
}(e.Value) }(e.Value)
} }
wg.Wait()
// When all workers finish, close the providers channel
go func() {
wg.Wait()
close(provcollect)
}()
// Filter out duplicates,
// no need to send our wantlists out twice in a given time period
for {
select {
case p, ok := <-provcollect:
if !ok {
break
}
provset[p.Key()] = p
case <-ctx.Done():
log.Error("Context cancelled before we got all the providers!")
return
}
}
message := bsmsg.New()
message.SetFull(true)
for _, e := range bs.wantlist.Entries() {
message.AddEntry(e.Value, e.Priority, false)
}
for _, prov := range provset {
bs.send(ctx, prov, message)
}
} }
func (bs *bitswap) roundWorker(ctx context.Context) { func (bs *bitswap) roundWorker(ctx context.Context) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment