diff --git a/v2/blockstore/readonly.go b/v2/blockstore/readonly.go index fcd973456e5e51a0410d3a281d31e35b1647f23d..dbf7bf61b896078f9fa3d62fc839b69d2fed6081 100644 --- a/v2/blockstore/readonly.go +++ b/v2/blockstore/readonly.go @@ -24,26 +24,34 @@ import ( var _ blockstore.Blockstore = (*ReadOnly)(nil) -// ReadOnly provides a read-only CAR Block Store. -type ReadOnly struct { - // mu allows ReadWrite to be safe for concurrent use. - // It's in ReadOnly so that read operations also grab read locks, - // given that ReadWrite embeds ReadOnly for methods like Get and Has. - // - // The main fields guarded by the mutex are the index and the underlying writers. - // For simplicity, the entirety of the blockstore methods grab the mutex. - mu sync.RWMutex - - // The backing containing the data payload in CARv1 format. - backing io.ReaderAt - // The CARv1 content index. - idx index.Index - - // If we called carv2.NewReaderMmap, remember to close it too. - carv2Closer io.Closer - - ropts carv2.ReadOptions -} +var errZeroLengthSection = fmt.Errorf("zero-length section not allowed by default; see WithZeroLengthSectionAsEOF option") + +type ( + // ReadOnly provides a read-only CAR Block Store. + ReadOnly struct { + // mu allows ReadWrite to be safe for concurrent use. + // It's in ReadOnly so that read operations also grab read locks, + // given that ReadWrite embeds ReadOnly for methods like Get and Has. + // + // The main fields guarded by the mutex are the index and the underlying writers. + // For simplicity, the entirety of the blockstore methods grab the mutex. + mu sync.RWMutex + + // The backing containing the data payload in CARv1 format. + backing io.ReaderAt + // The CARv1 content index. + idx index.Index + + // If we called carv2.NewReaderMmap, remember to close it too. 
+ carv2Closer io.Closer + + ropts carv2.ReadOptions + } + + contextKey string +) + +const asyncErrHandlerKey contextKey = "asyncErrorHandlerKey" // UseWholeCIDs is a read option which makes a CAR blockstore identify blocks by // whole CIDs, and not just their multihashes. The default is to use @@ -303,7 +311,19 @@ func (b *ReadOnly) PutMany([]blocks.Block) error { panic("called write method on a read-only blockstore") } -// AllKeysChan returns the list of keys in the CAR. +// WithAsyncErrorHandler returns a context with async error handling set to the given errHandler. +// Any errors that occur during asynchronous operations of AllKeysChan will be passed to the given +// handler. +func WithAsyncErrorHandler(ctx context.Context, errHandler func(error)) context.Context { + return context.WithValue(ctx, asyncErrHandlerKey, errHandler) +} + +// AllKeysChan returns the list of keys in the CAR data payload. +// If the ctx is constructed using WithAsyncErrorHandler any errors that occur during asynchronous +// retrieval of CIDs will be passed to the error handler function set in context. +// Otherwise, errors will terminate the asynchronous operation silently. +// +// See WithAsyncErrorHandler func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { // We release the lock when the channel-sending goroutine stops. b.mu.RLock() @@ -334,7 +354,10 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { for { length, err := varint.ReadUvarint(rdr) if err != nil { - return // TODO: log this error + if err != io.EOF { + maybeReportError(ctx, err) + } + return } // Null padding; by default it's an error. 
@@ -342,18 +365,20 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { if b.ropts.ZeroLengthSectionAsEOF { break } else { - return // TODO: log this error - // return fmt.Errorf("carv1 null padding not allowed by default; see WithZeroLegthSectionAsEOF") + maybeReportError(ctx, errZeroLengthSection) + return } } thisItemForNxt := rdr.Offset() _, c, err := cid.CidFromReader(rdr) if err != nil { - return // TODO: log this error + maybeReportError(ctx, err) + return } if _, err := rdr.Seek(thisItemForNxt+int64(length), io.SeekStart); err != nil { - return // TODO: log this error + maybeReportError(ctx, err) + return } // If we're just using multihashes, flatten to the "raw" codec. @@ -364,7 +389,7 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { select { case ch <- c: case <-ctx.Done(): - // TODO: log ctx error + maybeReportError(ctx, ctx.Err()) return } } @@ -372,6 +397,15 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { return ch, nil } +// maybeReportError checks if an error handler is present in context associated to the key +// asyncErrHandlerKey, and if present it will pass the error to it. +func maybeReportError(ctx context.Context, err error) { + value := ctx.Value(asyncErrHandlerKey) + if eh, _ := value.(func(error)); eh != nil { + eh(err) + } +} + // HashOnRead is currently unimplemented; hashing on reads never happens. func (b *ReadOnly) HashOnRead(bool) { // TODO: implement before the final release? 
diff --git a/v2/blockstore/readonly_test.go b/v2/blockstore/readonly_test.go index 470ace2cc68e7bd27f29e7860bf41fcc86ae9330..ecc30e1d1005d71d7b93140572f5a3931d790e8f 100644 --- a/v2/blockstore/readonly_test.go +++ b/v2/blockstore/readonly_test.go @@ -131,6 +131,84 @@ func TestNewReadOnlyFailsOnUnknownVersion(t *testing.T) { require.Nil(t, subject) } +func TestReadOnlyAllKeysChanErrHandlerCalledOnTimeout(t *testing.T) { + expiredCtx, cancel := context.WithTimeout(context.Background(), -time.Millisecond) + t.Cleanup(cancel) + + subject, err := OpenReadOnly("../testdata/sample-v1.car") + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, subject.Close()) }) + + // Make a channel to be able to select and block on until error handler is called. + errHandlerCalled := make(chan interface{}) + expiredErrHandlingCtx := WithAsyncErrorHandler(expiredCtx, func(err error) { + defer close(errHandlerCalled) + require.EqualError(t, err, "context deadline exceeded") + }) + _, err = subject.AllKeysChan(expiredErrHandlingCtx) + require.NoError(t, err) + + // Assert error handler was called with required condition, waiting at most 3 seconds. 
+ select { + case <-errHandlerCalled: + break + case <-time.After(time.Second * 3): + require.Fail(t, "error handler was not called within expected time window") + } +} + +func TestReadOnlyAllKeysChanErrHandlerNeverCalled(t *testing.T) { + tests := []struct { + name string + path string + errHandler func(err error) + wantCIDs []cid.Cid + }{ + { + "ReadingValidCarV1ReturnsNoErrors", + "../testdata/sample-v1.car", + func(err error) { + require.Fail(t, "unexpected call", "error handler called unexpectedly with err: %v", err) + }, + listCids(t, newV1ReaderFromV1File(t, "../testdata/sample-v1.car", false)), + }, + { + "ReadingValidCarV2ReturnsNoErrors", + "../testdata/sample-wrapped-v2.car", + func(err error) { + require.Fail(t, "unexpected call", "error handler called unexpectedly with err: %v", err) + }, + listCids(t, newV1ReaderFromV2File(t, "../testdata/sample-wrapped-v2.car", false)), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + subject, err := OpenReadOnly(tt.path, UseWholeCIDs(true)) + require.NoError(t, err) + ctx := WithAsyncErrorHandler(context.Background(), tt.errHandler) + keysChan, err := subject.AllKeysChan(ctx) + require.NoError(t, err) + var gotCids []cid.Cid + for k := range keysChan { + gotCids = append(gotCids, k) + } + require.Equal(t, tt.wantCIDs, gotCids) + }) + } +} + +func listCids(t *testing.T, v1r *carv1.CarReader) (cids []cid.Cid) { + for { + block, err := v1r.Next() + if err == io.EOF { + break + } + require.NoError(t, err) + cids = append(cids, block.Cid()) + } + return +} + func newV1ReaderFromV1File(t *testing.T, carv1Path string, zeroLenSectionAsEOF bool) *carv1.CarReader { f, err := os.Open(carv1Path) require.NoError(t, err)