Commit 7643ad2d authored by hannahhoward

refactor(GetBlocks): Merge session/non-session

Make Bitswap's GetBlocks simply create a temporary session and use the session code path

fix #52 fix #49
parent bb897895
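
The change itself is small: the hand-rolled GetBlocks implementation removed below (notification subscription, want bookkeeping, provider-finding timer) goes away, and the top-level call now creates a short-lived session and delegates to it. A minimal, self-contained sketch of that delegation shape, with plain strings standing in for cid.Cid and blocks.Block so it runs on its own; the real types differ, but the control flow after this commit is exactly the two delegating lines shown in the diff.

// Sketch of the delegation pattern this commit introduces. The names mirror
// the real Bitswap/SessionManager/Session, but keys and blocks are plain
// strings so the example stands alone.
package main

import (
	"context"
	"fmt"
)

type session struct{ id uint64 }

// GetBlocks asynchronously fetches the given keys and streams results back.
// The real session wires wants into the want manager; this stub just echoes.
func (s *session) GetBlocks(ctx context.Context, keys []string) (<-chan string, error) {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, k := range keys {
			select {
			case out <- "block-for-" + k:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out, nil
}

type sessionManager struct{ nextID uint64 }

// NewSession creates a short-lived session scoped to ctx.
func (sm *sessionManager) NewSession(ctx context.Context) *session {
	sm.nextID++
	return &session{id: sm.nextID}
}

type bitswap struct{ sm *sessionManager }

// GetBlocks is now just a thin wrapper: create a temporary session and let it
// do the work, exactly as in the GetBlocks hunk below.
func (bs *bitswap) GetBlocks(ctx context.Context, keys []string) (<-chan string, error) {
	session := bs.sm.NewSession(ctx)
	return session.GetBlocks(ctx, keys)
}

func main() {
	bs := &bitswap{sm: &sessionManager{}}
	blks, _ := bs.GetBlocks(context.Background(), []string{"QmA", "QmB"})
	for b := range blks {
		fmt.Println(b)
	}
}
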
@@ -16,7 +16,6 @@ import (
 	bsmsg "github.com/ipfs/go-bitswap/message"
 	bsmq "github.com/ipfs/go-bitswap/messagequeue"
 	bsnet "github.com/ipfs/go-bitswap/network"
-	notifications "github.com/ipfs/go-bitswap/notifications"
 	bspm "github.com/ipfs/go-bitswap/peermanager"
 	bspqm "github.com/ipfs/go-bitswap/providerquerymanager"
 	bssession "github.com/ipfs/go-bitswap/session"
@@ -95,9 +94,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
 	sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
 		" this bitswap").Histogram(metricsBuckets)
-	notif := notifications.New()
 	px := process.WithTeardown(func() error {
-		notif.Shutdown()
 		return nil
 	})
@@ -120,10 +117,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
 	bs := &Bitswap{
 		blockstore: bstore,
-		notifications: notif,
 		engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
 		network: network,
-		findKeys: make(chan *blockRequest, sizeBatchRequestChan),
 		process: px,
 		newBlocks: make(chan cid.Cid, HasBlockBufferSize),
 		provideKeys: make(chan cid.Cid, provideKeysBufferSize),
@@ -179,12 +174,6 @@ type Bitswap struct {
 	// NB: ensure threadsafety
 	blockstore blockstore.Blockstore
-	// notifications engine for receiving new blocks and routing them to the
-	// appropriate user requests
-	notifications notifications.PubSub
-	// findKeys sends keys to a worker to find and connect to providers for them
-	findKeys chan *blockRequest
 	// newBlocks is a channel for newly added blocks to be provided to the
 	// network. blocks pushed down this channel get buffered and fed to the
 	// provideKeys channel later on to avoid too much network activity
@@ -248,86 +237,8 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
 // resources, provide a context with a reasonably short deadline (ie. not one
 // that lasts throughout the lifetime of the server)
 func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
-	if len(keys) == 0 {
-		out := make(chan blocks.Block)
-		close(out)
-		return out, nil
-	}
-	select {
-	case <-bs.process.Closing():
-		return nil, errors.New("bitswap is closed")
-	default:
-	}
-	promise := bs.notifications.Subscribe(ctx, keys...)
-	for _, k := range keys {
-		log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
-	}
-	mses := bs.sm.GetNextSessionID()
-	bs.wm.WantBlocks(ctx, keys, nil, mses)
-	remaining := cid.NewSet()
-	for _, k := range keys {
-		remaining.Add(k)
-	}
-	out := make(chan blocks.Block)
-	go func() {
-		ctx, cancel := context.WithCancel(ctx)
-		defer cancel()
-		defer close(out)
-		defer func() {
-			// can't just defer this call on its own, arguments are resolved *when* the defer is created
-			bs.CancelWants(remaining.Keys(), mses)
-		}()
-		findProvsDelay := time.NewTimer(findProviderDelay)
-		defer findProvsDelay.Stop()
-		findProvsDelayCh := findProvsDelay.C
-		req := &blockRequest{
-			Cid: keys[0],
-			Ctx: ctx,
-		}
-		var findProvsReqCh chan<- *blockRequest
-		for {
-			select {
-			case <-findProvsDelayCh:
-				// NB: Optimization. Assumes that providers of key[0] are likely to
-				// be able to provide for all keys. This currently holds true in most
-				// every situation. Later, this assumption may not hold as true.
-				findProvsReqCh = bs.findKeys
-				findProvsDelayCh = nil
-			case findProvsReqCh <- req:
-				findProvsReqCh = nil
-			case blk, ok := <-promise:
-				if !ok {
-					return
-				}
-				// No need to find providers now.
-				findProvsDelay.Stop()
-				findProvsDelayCh = nil
-				findProvsReqCh = nil
-				bs.CancelWants([]cid.Cid{blk.Cid()}, mses)
-				remaining.Remove(blk.Cid())
-				select {
-				case out <- blk:
-				case <-ctx.Done():
-					return
-				}
-			case <-ctx.Done():
-				return
-			}
-		}
-	}()
-	return out, nil
+	session := bs.sm.NewSession(ctx)
+	return session.GetBlocks(ctx, keys)
 }
 
 // CancelWants removes a given key from the wantlist.
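
One subtlety the deleted code documented inline is why its cleanup call was wrapped in a closure: arguments to a deferred call are evaluated when the defer statement executes, not when the function returns. A tiny standalone illustration of that rule, unrelated to the bitswap types:

// Demonstrates why the removed GetBlocks wrapped its CancelWants cleanup in a
// closure: deferred call arguments are captured at defer time.
package main

import "fmt"

func main() {
	remaining := []string{"QmA", "QmB", "QmC"}

	// Arguments are evaluated now, so this prints all three keys even though
	// the slice variable is reassigned before the function returns.
	defer fmt.Println("evaluated at defer time:", remaining)

	// The closure reads the variable at exit and sees the shrunken slice.
	defer func() { fmt.Println("evaluated at function exit:", remaining) }()

	remaining = remaining[:1] // pretend two of the blocks arrived
}
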
@@ -366,7 +277,6 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
 	// is waiting on a GetBlock for that object, they will receive a reference
 	// to the same node. We should address this soon, but i'm not going to do
 	// it now as it requires more thought and isnt causing immediate problems.
-	bs.notifications.Publish(blk)
 	bs.sm.ReceiveBlockFrom(from, blk)
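
With the top-level pub/sub gone, receiveBlockFrom hands incoming blocks only to the session manager (last line of the hunk above), which is responsible for getting them to whichever sessions asked for them. That routing is not part of this diff; a rough sketch of what such fan-out might look like, using placeholder string keys and blocks rather than the real peer.ID/blocks.Block signature:

// Hypothetical fan-out on the receive path (not taken from this commit):
// a received block is delivered to every session still wanting its key.
package main

import (
	"fmt"
	"sync"
)

type managedSession struct {
	wants    map[string]bool // keys this session is still waiting on
	incoming chan string     // delivered blocks; string stands in for blocks.Block
}

type sessionManager struct {
	mu       sync.Mutex
	sessions []*managedSession
}

// receiveBlockFrom mirrors the single bs.sm.ReceiveBlockFrom call left in the
// diff: look up interested sessions and forward the block to each of them.
func (sm *sessionManager) receiveBlockFrom(key, blk string) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	for _, s := range sm.sessions {
		if s.wants[key] {
			delete(s.wants, key)
			// Buffered channel in this toy; a real implementation needs to
			// handle slow or cancelled receivers instead of blocking here.
			s.incoming <- blk
		}
	}
}

func main() {
	s := &managedSession{wants: map[string]bool{"QmA": true}, incoming: make(chan string, 1)}
	sm := &sessionManager{sessions: []*managedSession{s}}
	sm.receiveBlockFrom("QmA", "block-for-QmA")
	fmt.Println(<-s.incoming) // block-for-QmA
}
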
@@ -533,8 +533,8 @@ func TestWantlistCleanup(t *testing.T) {
 	}
 	time.Sleep(time.Millisecond * 50)
-	if len(bswap.GetWantlist()) != 11 {
-		t.Fatal("should have 11 keys in wantlist")
+	if len(bswap.GetWantlist()) != 5 {
+		t.Fatal("should have 5 keys in wantlist")
 	}
 	cancel()
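
The only behavioural assertion that changes is the expected wantlist size after some requests are cancelled: 5 surviving keys instead of 11, reflecting how wants are now tracked per request and cleaned up by the session path. A small standalone sketch of tying want lifetime to a request context, with placeholder types rather than the real wantmanager API:

// Sketch: keys registered for a request are dropped from the shared wantlist
// once that request's context is cancelled, which is what the adjusted
// assertion above checks at a larger scale.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type wantlist struct {
	mu   sync.Mutex
	refs map[string]int // key -> number of live requests wanting it
}

func (w *wantlist) add(keys []string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	for _, k := range keys {
		w.refs[k]++
	}
}

func (w *wantlist) remove(keys []string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	for _, k := range keys {
		if w.refs[k]--; w.refs[k] <= 0 {
			delete(w.refs, k)
		}
	}
}

// want registers keys and unregisters them as soon as ctx is done.
func (w *wantlist) want(ctx context.Context, keys []string) {
	w.add(keys)
	go func() {
		<-ctx.Done()
		w.remove(keys)
	}()
}

func (w *wantlist) len() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return len(w.refs)
}

func main() {
	w := &wantlist{refs: map[string]int{}}

	w.want(context.Background(), []string{"QmA", "QmB", "QmC", "QmD", "QmE"})

	ctx, cancel := context.WithCancel(context.Background())
	w.want(ctx, []string{"QmF", "QmG", "QmH", "QmI", "QmJ", "QmK"})

	cancel()
	time.Sleep(50 * time.Millisecond)              // give the async cleanup a moment, as the test does
	fmt.Println("keys left in wantlist:", w.len()) // 5
}
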
@@ -2,9 +2,6 @@ package bitswap
 import (
 	"context"
-	"math/rand"
-	"sync"
-	"time"
 
 	engine "github.com/ipfs/go-bitswap/decision"
 	bsmsg "github.com/ipfs/go-bitswap/message"
@@ -12,16 +9,11 @@ import (
 	logging "github.com/ipfs/go-log"
 	process "github.com/jbenet/goprocess"
 	procctx "github.com/jbenet/goprocess/context"
-	peer "github.com/libp2p/go-libp2p-peer"
 )
 
 var TaskWorkerCount = 8
 
 func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
-	// Start up a worker to handle block requests this node is making
-	px.Go(func(px process.Process) {
-		bs.providerQueryManager(ctx)
-	})
 
 	// Start up workers to handle requests from other nodes for the data on this node
 	for i := 0; i < TaskWorkerCount; i++ {
@@ -31,11 +23,6 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
 		})
 	}
 
-	// Start up a worker to manage periodically resending our wantlist out to peers
-	px.Go(func(px process.Process) {
-		bs.rebroadcastWorker(ctx)
-	})
-
 	// Start up a worker to manage sending out provides messages
 	px.Go(func(px process.Process) {
 		bs.provideCollector(ctx)
@@ -188,93 +175,3 @@ func (bs *Bitswap) provideCollector(ctx context.Context) {
 		}
 	}
 }
-
-func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
-	ctx, cancel := context.WithCancel(parent)
-	defer cancel()
-
-	broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
-	defer broadcastSignal.Stop()
-
-	tick := time.NewTicker(10 * time.Second)
-	defer tick.Stop()
-
-	for {
-		log.Event(ctx, "Bitswap.Rebroadcast.idle")
-		select {
-		case <-tick.C:
-			n := bs.wm.WantCount()
-			if n > 0 {
-				log.Debugf("%d keys in bitswap wantlist", n)
-			}
-		case <-broadcastSignal.C: // resend unfulfilled wantlist keys
-			log.Event(ctx, "Bitswap.Rebroadcast.active")
-			entries := bs.wm.CurrentWants()
-			if len(entries) == 0 {
-				continue
-			}
-
-			// TODO: come up with a better strategy for determining when to search
-			// for new providers for blocks.
-			i := rand.Intn(len(entries))
-			select {
-			case bs.findKeys <- &blockRequest{
-				Cid: entries[i].Cid,
-				Ctx: ctx,
-			}:
-			case <-ctx.Done():
-				return
-			}
-		case <-ctx.Done():
-			return
-		}
-	}
-}
-
-func (bs *Bitswap) providerQueryManager(ctx context.Context) {
-	var activeLk sync.Mutex
-	kset := cid.NewSet()
-
-	for {
-		select {
-		case e := <-bs.findKeys:
-			select { // make sure its not already cancelled
-			case <-e.Ctx.Done():
-				continue
-			default:
-			}
-
-			activeLk.Lock()
-			if kset.Has(e.Cid) {
-				activeLk.Unlock()
-				continue
-			}
-			kset.Add(e.Cid)
-			activeLk.Unlock()
-
-			go func(e *blockRequest) {
-				child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
-				defer cancel()
-				providers := bs.network.FindProvidersAsync(child, e.Cid, maxProvidersPerRequest)
-				wg := &sync.WaitGroup{}
-				for p := range providers {
-					wg.Add(1)
-
-					go func(p peer.ID) {
-						defer wg.Done()
-						err := bs.network.ConnectTo(child, p)
-						if err != nil {
-							log.Debugf("failed to connect to provider %s: %s", p, err)
-						}
-					}(p)
-				}
-				wg.Wait()
-				activeLk.Lock()
-				kset.Remove(e.Cid)
-				activeLk.Unlock()
-			}(e)
-
-		case <-ctx.Done():
-			return
-		}
-	}
-}
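
The two workers deleted above are not replaced inside workers.go: provider discovery for unfulfilled wants is now handled by the session path and the dedicated providerquerymanager component (imported as bspqm near the top of the diff). For reference, the core pattern the deleted providerQueryManager implemented was "at most one in-flight provider lookup per key, bounded by a timeout"; a rough standalone sketch of that pattern with placeholder types, not the real network or cid APIs:

// Sketch of the dedup-then-lookup pattern from the deleted worker: a key with
// a query already in flight is not queried again, and each query is bounded.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type providerFinder struct {
	mu     sync.Mutex
	active map[string]bool // keys with an in-flight provider query
}

// findProviders starts a lookup for key unless one is already running,
// mirroring the cid.Set bookkeeping in the deleted worker, and bounds each
// query with a timeout the way providerRequestTimeout did.
func (f *providerFinder) findProviders(ctx context.Context, key string, lookup func(context.Context, string)) {
	f.mu.Lock()
	if f.active[key] {
		f.mu.Unlock()
		return
	}
	f.active[key] = true
	f.mu.Unlock()

	go func() {
		child, cancel := context.WithTimeout(ctx, 10*time.Second)
		defer cancel()
		lookup(child, key)

		f.mu.Lock()
		delete(f.active, key)
		f.mu.Unlock()
	}()
}

func main() {
	f := &providerFinder{active: map[string]bool{}}
	done := make(chan struct{})

	f.findProviders(context.Background(), "QmA", func(ctx context.Context, k string) {
		time.Sleep(50 * time.Millisecond) // pretend the DHT query takes a while
		fmt.Println("looked up providers for", k)
		close(done)
	})
	// A second request for the same key is coalesced while the first is in flight.
	f.findProviders(context.Background(), "QmA", func(ctx context.Context, k string) {
		fmt.Println("duplicate lookup (never runs)")
	})
	<-done
}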