Commit de2711c0, authored and committed by Masih H. Derkani

Propagate async `blockstore.AllKeysChan` errors via context

Implement a mechanism that allows the user to attach an error-handling
function to the context passed to `AllKeysChan`. The function is then
notified when an error occurs during asynchronous traversal of data.

Add tests that assert the success and failure cases when an error handler
is set.

Fixes #177
parent 703b88c2
......@@ -24,26 +24,34 @@ import (
var _ blockstore.Blockstore = (*ReadOnly)(nil)
// ReadOnly provides a read-only CAR Block Store.
type ReadOnly struct {
// mu allows ReadWrite to be safe for concurrent use.
// It's in ReadOnly so that read operations also grab read locks,
// given that ReadWrite embeds ReadOnly for methods like Get and Has.
// The main fields guarded by the mutex are the index and the underlying writers.
// For simplicity, the entirety of the blockstore methods grab the mutex.
mu sync.RWMutex
// The backing containing the data payload in CARv1 format.
backing io.ReaderAt
// The CARv1 content index.
idx index.Index
// If we called carv2.NewReaderMmap, remember to close it too.
carv2Closer io.Closer
ropts carv2.ReadOptions
var errZeroLengthSection = fmt.Errorf("zero-length section not allowed by default; see WithZeroLengthSectionAsEOF option")
type (
// ReadOnly provides a read-only CAR Block Store.
ReadOnly struct {
// mu allows ReadWrite to be safe for concurrent use.
// It's in ReadOnly so that read operations also grab read locks,
// given that ReadWrite embeds ReadOnly for methods like Get and Has.
// The main fields guarded by the mutex are the index and the underlying writers.
// For simplicity, the entirety of the blockstore methods grab the mutex.
mu sync.RWMutex
// The backing containing the data payload in CARv1 format.
backing io.ReaderAt
// The CARv1 content index.
idx index.Index
// If we called carv2.NewReaderMmap, remember to close it too.
carv2Closer io.Closer
ropts carv2.ReadOptions
contextKey string
const asyncErrHandlerKey contextKey = "asyncErrorHandlerKey"
// UseWholeCIDs is a read option which makes a CAR blockstore identify blocks by
// whole CIDs, and not just their multihashes. The default is to use
......@@ -303,7 +311,19 @@ func (b *ReadOnly) PutMany([]blocks.Block) error {
panic("called write method on a read-only blockstore")
// AllKeysChan returns the list of keys in the CAR.
// WithAsyncErrorHandler returns a context with async error handling set to the given errHandler.
// Any errors that occur during asynchronous operations of AllKeysChan will be passed to the given
// handler.
func WithAsyncErrorHandler(ctx context.Context, errHandler func(error)) context.Context {
return context.WithValue(ctx, asyncErrHandlerKey, errHandler)
// AllKeysChan returns the list of keys in the CAR data payload.
// If ctx is constructed using WithAsyncErrorHandler, any errors that occur during asynchronous
// retrieval of CIDs are passed to the error handler function set in the context.
// Otherwise, errors terminate the asynchronous operation silently.
// See WithAsyncErrorHandler.
func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
// We release the lock when the channel-sending goroutine stops.
......@@ -334,7 +354,10 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
for {
length, err := varint.ReadUvarint(rdr)
if err != nil {
return // TODO: log this error
if err != io.EOF {
maybeReportError(ctx, err)
// Null padding; by default it's an error.
......@@ -342,18 +365,20 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if b.ropts.ZeroLengthSectionAsEOF {
} else {
return // TODO: log this error
// return fmt.Errorf("carv1 null padding not allowed by default; see WithZeroLegthSectionAsEOF")
maybeReportError(ctx, errZeroLengthSection)
thisItemForNxt := rdr.Offset()
_, c, err := cid.CidFromReader(rdr)
if err != nil {
return // TODO: log this error
maybeReportError(ctx, err)
if _, err := rdr.Seek(thisItemForNxt+int64(length), io.SeekStart); err != nil {
return // TODO: log this error
maybeReportError(ctx, err)
// If we're just using multihashes, flatten to the "raw" codec.
......@@ -364,7 +389,7 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
select {
case ch <- c:
case <-ctx.Done():
// TODO: log ctx error
maybeReportError(ctx, ctx.Err())
......@@ -372,6 +397,15 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return ch, nil
// maybeReportError checks whether an error handler is present in the context under the key
// asyncErrHandlerKey and, if present, passes the error to it.
func maybeReportError(ctx context.Context, err error) {
value := ctx.Value(asyncErrHandlerKey)
if eh, _ := value.(func(error)); eh != nil {
// HashOnRead is currently unimplemented; hashing on reads never happens.
func (b *ReadOnly) HashOnRead(bool) {
// TODO: implement before the final release?
......@@ -131,6 +131,84 @@ func TestNewReadOnlyFailsOnUnknownVersion(t *testing.T) {
require.Nil(t, subject)
func TestReadOnlyAllKeysChanErrHandlerCalledOnTimeout(t *testing.T) {
expiredCtx, cancel := context.WithTimeout(context.Background(), -time.Millisecond)
subject, err := OpenReadOnly("../testdata/")
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, subject.Close()) })
// Make a channel to be able to select and block on until error handler is called.
errHandlerCalled := make(chan interface{})
expiredErrHandlingCtx := WithAsyncErrorHandler(expiredCtx, func(err error) {
defer close(errHandlerCalled)
require.EqualError(t, err, "context deadline exceeded")
_, err = subject.AllKeysChan(expiredErrHandlingCtx)
require.NoError(t, err)
// Assert error handler was called with required condition, waiting at most 3 seconds.
select {
case <-errHandlerCalled:
case <-time.After(time.Second * 3):
require.Fail(t, "error handler was not called within expected time window")
func TestReadOnlyAllKeysChanErrHandlerNeverCalled(t *testing.T) {
tests := []struct {
name string
path string
errHandler func(err error)
wantCIDs []cid.Cid
func(err error) {
require.Fail(t, "unexpected call", "error handler called unexpectedly with err: %v", err)
listCids(t, newV1ReaderFromV1File(t, "../testdata/", false)),
func(err error) {
require.Fail(t, "unexpected call", "error handler called unexpectedly with err: %v", err)
listCids(t, newV1ReaderFromV2File(t, "../testdata/", false)),
for _, tt := range tests {
t.Run(, func(t *testing.T) {
subject, err := OpenReadOnly(tt.path, UseWholeCIDs(true))
require.NoError(t, err)
ctx := WithAsyncErrorHandler(context.Background(), tt.errHandler)
keysChan, err := subject.AllKeysChan(ctx)
require.NoError(t, err)
var gotCids []cid.Cid
for k := range keysChan {
gotCids = append(gotCids, k)
require.Equal(t, tt.wantCIDs, gotCids)
func listCids(t *testing.T, v1r *carv1.CarReader) (cids []cid.Cid) {
for {
block, err := v1r.Next()
if err == io.EOF {
require.NoError(t, err)
cids = append(cids, block.Cid())
func newV1ReaderFromV1File(t *testing.T, carv1Path string, zeroLenSectionAsEOF bool) *carv1.CarReader {
f, err := os.Open(carv1Path)
require.NoError(t, err)
