Unverified commit dcfe40e4 authored by dirkmc, committed by GitHub

Merge pull request #240 from ipfs/fix/abort-ctx

fix: abort when the context is canceled while getting blocks
parents 84f8ab6c 0bc3d5a4
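
Before the diff: the fix makes addJob fail fast, returning ctx.Err() when the request context is canceled and an error when the manager's process is closing, rather than silently dropping the job; getBlockSizes, getBlocks and jobPerKey then propagate that error up to the decision engine. A minimal standalone sketch of the submission pattern, assuming nothing from go-bitswap (the manager type and shutdown channel are illustrative stand-ins, not code from this patch):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// manager is an illustrative stand-in for blockstoreManager: jobs are handed
// to a worker pool over a channel, and submission must not block forever.
type manager struct {
	jobs     chan func()
	shutdown chan struct{} // stands in for the goprocess Closing() channel
}

// addJob follows the pattern this patch introduces: queue the job, or fail
// fast when the caller's context is done or the manager is shutting down.
func (m *manager) addJob(ctx context.Context, job func()) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-m.shutdown:
		return errors.New("shutting down")
	case m.jobs <- job:
		return nil
	}
}

func main() {
	m := &manager{jobs: make(chan func()), shutdown: make(chan struct{})}
	// No worker drains m.jobs here, so addJob can only return via the context.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	fmt.Println(m.addJob(ctx, func() {})) // context deadline exceeded
}
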
@@ -2,6 +2,7 @@ package decision
import (
"context"
"fmt"
"sync"
blocks "github.com/ipfs/go-block-format"
@@ -50,25 +51,29 @@ func (bsm *blockstoreManager) worker() {
}
}
- func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) {
+ func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) error {
select {
case <-ctx.Done():
+ return ctx.Err()
case <-bsm.px.Closing():
+ return fmt.Errorf("shutting down")
case bsm.jobs <- job:
+ return nil
}
}
- func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) map[cid.Cid]int {
+ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) (map[cid.Cid]int, error) {
res := make(map[cid.Cid]int)
if len(ks) == 0 {
- return res
+ return res, nil
}
var lk sync.Mutex
- bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
+ return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
size, err := bsm.bs.GetSize(c)
if err != nil {
if err != bstore.ErrNotFound {
// Note: this isn't a fatal error. We shouldn't abort the request
log.Errorf("blockstore.GetSize(%s) error: %s", c, err)
}
} else {
@@ -77,21 +82,20 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) map[cid.Cid]int {
lk.Unlock()
}
})
- return res
}
- func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) map[cid.Cid]blocks.Block {
+ func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) {
res := make(map[cid.Cid]blocks.Block)
if len(ks) == 0 {
- return res
+ return res, nil
}
var lk sync.Mutex
- bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
+ return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
blk, err := bsm.bs.Get(c)
if err != nil {
if err != bstore.ErrNotFound {
// Note: this isn't a fatal error. We shouldn't abort the request
log.Errorf("blockstore.Get(%s) error: %s", c, err)
}
} else {
@@ -100,19 +104,23 @@ func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) map[cid.Cid]blocks.Block {
lk.Unlock()
}
})
- return res
}
- func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) {
+ func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) error {
+ var err error
wg := sync.WaitGroup{}
for _, k := range ks {
c := k
wg.Add(1)
- bsm.addJob(ctx, func() {
+ err = bsm.addJob(ctx, func() {
jobFn(c)
wg.Done()
})
+ if err != nil {
+ wg.Done()
+ break
+ }
}
wg.Wait()
+ return err
}
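
One subtlety in jobPerKey above: wg.Add(1) runs before addJob, so when addJob fails the extra count has to be released with wg.Done() before breaking, and wg.Wait() then only waits for jobs that were actually queued. A self-contained sketch of that accounting, assuming an illustrative submit function in place of addJob:

package main

import (
	"context"
	"fmt"
	"sync"
)

// fanOut mirrors jobPerKey's error handling: stop scheduling on the first
// failed submission, balance the WaitGroup for the job that never ran, and
// still wait for any jobs that were accepted.
func fanOut(ctx context.Context, keys []string, submit func(context.Context, func()) error, jobFn func(string)) error {
	var err error
	wg := sync.WaitGroup{}
	for _, k := range keys {
		k := k // capture the loop variable, as the patch does with c := k
		wg.Add(1)
		err = submit(ctx, func() {
			jobFn(k)
			wg.Done()
		})
		if err != nil {
			wg.Done() // balance the Add for the job that was never queued
			break
		}
	}
	wg.Wait()
	return err
}

func main() {
	// An illustrative submit that runs jobs inline and fails once ctx is done.
	submit := func(ctx context.Context, job func()) error {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		job()
		return nil
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel up front: fanOut should return context.Canceled at once
	err := fanOut(ctx, []string{"a", "b"}, submit, func(k string) { fmt.Println("ran", k) })
	fmt.Println("err:", err)
}
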
@@ -3,7 +3,6 @@ package decision
import (
"context"
"crypto/rand"
"errors"
"sync"
"testing"
"time"
@@ -30,7 +29,10 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) {
bsm.start(process.WithTeardown(func() error { return nil }))
cids := testutil.GenerateCids(4)
- sizes := bsm.getBlockSizes(ctx, cids)
+ sizes, err := bsm.getBlockSizes(ctx, cids)
+ if err != nil {
+ t.Fatal(err)
+ }
if len(sizes) != 0 {
t.Fatal("Wrong response length")
}
@@ -41,7 +43,10 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) {
}
}
- blks := bsm.getBlocks(ctx, cids)
+ blks, err := bsm.getBlocks(ctx, cids)
+ if err != nil {
+ t.Fatal(err)
+ }
if len(blks) != 0 {
t.Fatal("Wrong response length")
}
@@ -82,7 +87,10 @@ func TestBlockstoreManager(t *testing.T) {
cids = append(cids, b.Cid())
}
- sizes := bsm.getBlockSizes(ctx, cids)
+ sizes, err := bsm.getBlockSizes(ctx, cids)
+ if err != nil {
+ t.Fatal(err)
+ }
if len(sizes) != len(blks)-1 {
t.Fatal("Wrong response length")
}
@@ -106,7 +114,10 @@ func TestBlockstoreManager(t *testing.T) {
}
}
- fetched := bsm.getBlocks(ctx, cids)
+ fetched, err := bsm.getBlocks(ctx, cids)
+ if err != nil {
+ t.Fatal(err)
+ }
if len(fetched) != len(blks)-1 {
t.Fatal("Wrong response length")
}
@@ -160,17 +171,16 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
go func(t *testing.T) {
defer wg.Done()
- sizes := bsm.getBlockSizes(ctx, ks)
+ sizes, err := bsm.getBlockSizes(ctx, ks)
+ if err != nil {
+ t.Error(err)
+ }
if len(sizes) != len(blks) {
- err = errors.New("Wrong response length")
+ t.Error("Wrong response length")
}
}(t)
}
wg.Wait()
- if err != nil {
- t.Fatal(err)
- }
}
func TestBlockstoreManagerClose(t *testing.T) {
@@ -184,7 +194,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
px := process.WithTeardown(func() error { return nil })
bsm.start(px)
- blks := testutil.GenerateBlocksOfSize(3, 1024)
+ blks := testutil.GenerateBlocksOfSize(10, 1024)
var ks []cid.Cid
for _, b := range blks {
ks = append(ks, b.Cid())
@@ -199,34 +209,29 @@ func TestBlockstoreManagerClose(t *testing.T) {
time.Sleep(5 * time.Millisecond)
- fnCallDone := make(chan struct{})
- go func() {
- bsm.getBlockSizes(ctx, ks)
- fnCallDone <- struct{}{}
- }()
- select {
- case <-fnCallDone:
- t.Fatal("call to BlockstoreManager should be cancelled")
- case <-px.Closed():
+ before := time.Now()
+ _, err = bsm.getBlockSizes(ctx, ks)
+ if err == nil {
+ t.Error("expected an error")
+ }
+ // would expect to wait delayTime*10 if we didn't cancel.
+ if time.Since(before) > delayTime*2 {
+ t.Error("expected a fast timeout")
+ }
}
}
func TestBlockstoreManagerCtxDone(t *testing.T) {
delayTime := 20 * time.Millisecond
- ctx := context.Background()
+ ctx, cancel := context.WithTimeout(context.Background(), delayTime/2)
+ defer cancel()
bsdelay := delay.Fixed(delayTime)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
- bsm := newBlockstoreManager(ctx, bstore, 3)
+ bsm := newBlockstoreManager(context.Background(), bstore, 3)
proc := process.WithTeardown(func() error { return nil })
bsm.start(proc)
- blks := testutil.GenerateBlocksOfSize(3, 1024)
+ blks := testutil.GenerateBlocksOfSize(10, 1024)
var ks []cid.Cid
for _, b := range blks {
ks = append(ks, b.Cid())
@@ -237,15 +242,17 @@ func TestBlockstoreManagerCtxDone(t *testing.T) {
t.Fatal(err)
}
- fnCallDone := make(chan struct{})
- go func() {
- bsm.getBlockSizes(ctx, ks)
- fnCallDone <- struct{}{}
- }()
- ctx, cancel := context.WithTimeout(context.Background(), delayTime/2)
- defer cancel()
+ before := time.Now()
+ _, err = bsm.getBlockSizes(ctx, ks)
+ if err == nil {
+ t.Error("expected an error")
+ }
- select {
- case <-fnCallDone:
- t.Fatal("call to BlockstoreManager should be cancelled")
- case <-ctx.Done():
+ // would expect to wait delayTime*10 if we didn't cancel.
+ if time.Since(before) > delayTime*2 {
+ t.Error("expected a fast timeout")
}
}
@@ -367,7 +367,11 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
for _, t := range nextTask.Tasks {
blockCids.Add(t.Identifier.(cid.Cid))
}
- blks := e.bsm.getBlocks(ctx, blockCids.Keys())
+ blks, err := e.bsm.getBlocks(ctx, blockCids.Keys())
+ if err != nil {
+ // we're dropping the envelope but that's not an issue in practice.
+ return nil, err
+ }
msg := bsmsg.New(true)
for _, b := range blks {
@@ -437,7 +441,11 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) {
wantKs.Add(entry.Cid)
}
}
- blockSizes := e.bsm.getBlockSizes(ctx, wantKs.Keys())
+ blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
+ if err != nil {
+ log.Info("aborting message processing", err)
+ return
+ }
l := e.findOrCreate(p)
l.lk.Lock()
......
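
A last sketch of the behavior the timing assertions in the tests check: with one worker and an unbuffered job queue, submitting the second job blocks until the worker is free, and a canceled context now bounds that wait, so the caller returns after roughly delayTime/2 instead of the delayTime*10 a serial scan would need. All names below are illustrative stand-ins, not go-bitswap internals:

package main

import (
	"context"
	"fmt"
	"time"
)

// slowGetSize stands in for a blockstore read that takes delayTime, like the
// delayed datastore used in the tests above.
func slowGetSize(key string, delayTime time.Duration) int {
	time.Sleep(delayTime)
	return 1024
}

func main() {
	delayTime := 20 * time.Millisecond
	jobs := make(chan func())
	// A single worker, so ten keys would take delayTime*10 to process serially.
	go func() {
		for job := range jobs {
			job()
		}
	}()

	ctx, cancel := context.WithTimeout(context.Background(), delayTime/2)
	defer cancel()

	// Post-patch behavior: job submission aborts once ctx is done, so the
	// caller sees an error after ~delayTime/2 instead of blocking for the
	// full scan.
	before := time.Now()
	var err error
	for i := 0; i < 10 && err == nil; i++ {
		select {
		case jobs <- func() { slowGetSize("key", delayTime) }:
		case <-ctx.Done():
			err = ctx.Err()
		}
	}
	fmt.Printf("err=%v elapsed=%s\n", err, time.Since(before).Round(time.Millisecond))
}
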