diff --git a/v2/blockstore/doc.go b/v2/blockstore/doc.go index de95d63bfcfdfe1aeba70dd0ff855cbfed5d7c3e..a82723419cca327a7d18269a912ca15b47912f5d 100644 --- a/v2/blockstore/doc.go +++ b/v2/blockstore/doc.go @@ -11,7 +11,7 @@ // The ReadWrite blockstore allows writing and reading of the blocks concurrently. The user of this // blockstore is responsible for calling ReadWrite.Finalize when finished writing blocks. // Upon finalization, the instance can no longer be used for reading or writing blocks and will -// panic if used. To continue reading the blocks users are encouraged to use ReadOnly blockstore +// error if used. To continue reading the blocks users are encouraged to use ReadOnly blockstore // instantiated from the same file path using OpenReadOnly. // A user may resume reading/writing from files produced by an instance of ReadWrite blockstore. The // resumption is attempted automatically, if the path passed to OpenReadWrite exists. diff --git a/v2/blockstore/export_test.go b/v2/blockstore/export_test.go deleted file mode 100644 index d4998eec8cfaa3677e17379c6b5e59f5511a431f..0000000000000000000000000000000000000000 --- a/v2/blockstore/export_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package blockstore - -// CloseReadWrite allows our external tests to close a read-write blockstore -// without finalizing it. -// The public API doesn't expose such a method. -// In the future, we might consider adding NewReadWrite taking io interfaces, -// meaning that the caller could be fully in control of opening and closing files. -// Another option would be to expose a "Discard" method alongside "Finalize". -func CloseReadWrite(b *ReadWrite) error { - return b.ronly.Close() -} diff --git a/v2/blockstore/readonly.go b/v2/blockstore/readonly.go index 2df9a7946b065b89322926fa970bb87e2f8ca3e1..7e165bd24530f4fbd39b5034b29c22db04937096 100644 --- a/v2/blockstore/readonly.go +++ b/v2/blockstore/readonly.go @@ -27,32 +27,36 @@ var _ blockstore.Blockstore = (*ReadOnly)(nil) var ( errZeroLengthSection = fmt.Errorf("zero-length carv2 section not allowed by default; see WithZeroLengthSectionAsEOF option") errReadOnly = fmt.Errorf("called write method on a read-only carv2 blockstore") + errClosed = fmt.Errorf("cannot use a carv2 blockstore after closing") ) -type ( - // ReadOnly provides a read-only CAR Block Store. - ReadOnly struct { - // mu allows ReadWrite to be safe for concurrent use. - // It's in ReadOnly so that read operations also grab read locks, - // given that ReadWrite embeds ReadOnly for methods like Get and Has. - // - // The main fields guarded by the mutex are the index and the underlying writers. - // For simplicity, the entirety of the blockstore methods grab the mutex. - mu sync.RWMutex - - // The backing containing the data payload in CARv1 format. - backing io.ReaderAt - // The CARv1 content index. - idx index.Index - - // If we called carv2.NewReaderMmap, remember to close it too. - carv2Closer io.Closer - - ropts carv2.ReadOptions - } +// ReadOnly provides a read-only CAR Block Store. +type ReadOnly struct { + // mu allows ReadWrite to be safe for concurrent use. + // It's in ReadOnly so that read operations also grab read locks, + // given that ReadWrite embeds ReadOnly for methods like Get and Has. + // + // The main fields guarded by the mutex are the index and the underlying writers. + // For simplicity, the entirety of the blockstore methods grab the mutex. + mu sync.RWMutex + + // When true, the blockstore has been closed via Close, Discard, or + // Finalize, and must not be used. 
Any further blockstore method calls + // will return errClosed to avoid panics or broken behavior. + closed bool + + // The backing containing the data payload in CARv1 format. + backing io.ReaderAt + // The CARv1 content index. + idx index.Index + + // If we called carv2.NewReaderMmap, remember to close it too. + carv2Closer io.Closer + + ropts carv2.ReadOptions +} - contextKey string -) +type contextKey string const asyncErrHandlerKey contextKey = "asyncErrorHandlerKey" @@ -177,7 +181,7 @@ func (b *ReadOnly) readBlock(idx int64) (cid.Cid, []byte, error) { return bcid, data, err } -// DeleteBlock is unsupported and always panics. +// DeleteBlock is unsupported and always errors. func (b *ReadOnly) DeleteBlock(_ cid.Cid) error { return errReadOnly } @@ -187,6 +191,10 @@ func (b *ReadOnly) Has(key cid.Cid) (bool, error) { b.mu.RLock() defer b.mu.RUnlock() + if b.closed { + return false, errClosed + } + var fnFound bool var fnErr error err := b.idx.GetAll(key, func(offset uint64) bool { @@ -223,6 +231,10 @@ func (b *ReadOnly) Get(key cid.Cid) (blocks.Block, error) { b.mu.RLock() defer b.mu.RUnlock() + if b.closed { + return nil, errClosed + } + var fnData []byte var fnErr error err := b.idx.GetAll(key, func(offset uint64) bool { @@ -263,6 +275,10 @@ func (b *ReadOnly) GetSize(key cid.Cid) (int, error) { b.mu.RLock() defer b.mu.RUnlock() + if b.closed { + return 0, errClosed + } + fnSize := -1 var fnErr error err := b.idx.GetAll(key, func(offset uint64) bool { @@ -329,16 +345,26 @@ func WithAsyncErrorHandler(ctx context.Context, errHandler func(error)) context. // See WithAsyncErrorHandler func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { // We release the lock when the channel-sending goroutine stops. + // Note that we can't use a deferred unlock here, + // because if we return a nil error, + // we only want to unlock once the async goroutine has stopped. b.mu.RLock() + if b.closed { + b.mu.RUnlock() // don't hold the mutex forever + return nil, errClosed + } + // TODO we may use this walk for populating the index, and we need to be able to iterate keys in this way somewhere for index generation. In general though, when it's asked for all keys from a blockstore with an index, we should iterate through the index when possible rather than linear reads through the full car. rdr := internalio.NewOffsetReadSeeker(b.backing, 0) header, err := carv1.ReadHeader(rdr) if err != nil { + b.mu.RUnlock() // don't hold the mutex forever return nil, fmt.Errorf("error reading car header: %w", err) } headerSize, err := carv1.HeaderSize(header) if err != nil { + b.mu.RUnlock() // don't hold the mutex forever return nil, err } @@ -347,6 +373,7 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { // Seek to the end of header. if _, err = rdr.Seek(int64(headerSize), io.SeekStart); err != nil { + b.mu.RUnlock() // don't hold the mutex forever return nil, err } @@ -424,10 +451,10 @@ func (b *ReadOnly) Roots() ([]cid.Cid, error) { } // Close closes the underlying reader if it was opened by OpenReadOnly. +// After this call, the blockstore can no longer be used. // // Note that this call may block if any blockstore operations are currently in -// progress, including an AllKeysChan that hasn't been fully consumed or -// cancelled. +// progress, including an AllKeysChan that hasn't been fully consumed or cancelled. 
func (b *ReadOnly) Close() error { b.mu.Lock() defer b.mu.Unlock() @@ -436,6 +463,7 @@ func (b *ReadOnly) Close() error { } func (b *ReadOnly) closeWithoutMutex() error { + b.closed = true if b.carv2Closer != nil { return b.carv2Closer.Close() } diff --git a/v2/blockstore/readonly_test.go b/v2/blockstore/readonly_test.go index 3fd16c8c649273ba13044f55f2dba116ac865d16..7b87aee2a900f5bc330995930339a2937b985f12 100644 --- a/v2/blockstore/readonly_test.go +++ b/v2/blockstore/readonly_test.go @@ -101,7 +101,7 @@ func TestReadOnly(t *testing.T) { require.NoError(t, err) require.Equal(t, wantBlock, gotBlock) - // Assert write operations panic + // Assert write operations error require.Error(t, subject.Put(wantBlock)) require.Error(t, subject.PutMany([]blocks.Block{wantBlock})) require.Error(t, subject.DeleteBlock(key)) @@ -235,3 +235,38 @@ func newV1Reader(r io.Reader, zeroLenSectionAsEOF bool) (*carv1.CarReader, error } return carv1.NewCarReader(r) } + +func TestReadOnlyErrorAfterClose(t *testing.T) { + bs, err := OpenReadOnly("../testdata/sample-v1.car") + require.NoError(t, err) + + roots, err := bs.Roots() + require.NoError(t, err) + _, err = bs.Has(roots[0]) + require.NoError(t, err) + _, err = bs.Get(roots[0]) + require.NoError(t, err) + _, err = bs.GetSize(roots[0]) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + _, err = bs.AllKeysChan(ctx) + require.NoError(t, err) + cancel() // to stop the AllKeysChan goroutine + + bs.Close() + + _, err = bs.Roots() + require.Error(t, err) + _, err = bs.Has(roots[0]) + require.Error(t, err) + _, err = bs.Get(roots[0]) + require.Error(t, err) + _, err = bs.GetSize(roots[0]) + require.Error(t, err) + _, err = bs.AllKeysChan(ctx) + require.Error(t, err) + + // TODO: test that closing blocks if an AllKeysChan operation is + // in progress. +} diff --git a/v2/blockstore/readwrite.go b/v2/blockstore/readwrite.go index 8986ff3055934f527e31a5fea2426ec323106c3f..ae482a7f9a6cfac00ca684ab19cb70d71da255bc 100644 --- a/v2/blockstore/readwrite.go +++ b/v2/blockstore/readwrite.go @@ -23,8 +23,6 @@ import ( var _ blockstore.Blockstore = (*ReadWrite)(nil) -var errFinalized = fmt.Errorf("cannot use a read-write carv2 blockstore after finalizing") - // ReadWrite implements a blockstore that stores blocks in CARv2 format. // Blocks put into the blockstore can be read back once they are successfully written. // This implementation is preferable for a write-heavy workload. @@ -33,7 +31,7 @@ var errFinalized = fmt.Errorf("cannot use a read-write carv2 blockstore after fi // // The Finalize function must be called once the putting blocks are finished. // Upon calling Finalize header is finalized and index is written out. -// Once finalized, all read and write calls to this blockstore will result in panics. +// Once finalized, all read and write calls to this blockstore will result in errors. type ReadWrite struct { ronly ReadOnly @@ -62,7 +60,7 @@ func AllowDuplicatePuts(allow bool) carv2.WriteOption { // ReadWrite.Finalize must be called once putting and reading blocks are no longer needed. // Upon calling ReadWrite.Finalize the CARv2 header and index are written out onto the file and the // backing file is closed. Once finalized, all read and write calls to this blockstore will result -// in panics. Note, ReadWrite.Finalize must be called on an open instance regardless of whether any +// in errors. Note, ReadWrite.Finalize must be called on an open instance regardless of whether any // blocks were put or not. 
// // If a file at given path does not exist, the instantiation will write car.Pragma and data payload @@ -287,26 +285,22 @@ func (b *ReadWrite) unfinalize() error { return err } -func (b *ReadWrite) finalized() bool { - return b.header.DataSize != 0 -} - // Put puts a given block to the underlying datastore func (b *ReadWrite) Put(blk blocks.Block) error { - // PutMany already checks b.finalized. + // PutMany already checks b.ronly.closed. return b.PutMany([]blocks.Block{blk}) } // PutMany puts a slice of blocks at the same time using batching // capabilities of the underlying datastore whenever possible. func (b *ReadWrite) PutMany(blks []blocks.Block) error { - if b.finalized() { - return errFinalized - } - b.ronly.mu.Lock() defer b.ronly.mu.Unlock() + if b.ronly.closed { + return errClosed + } + for _, bl := range blks { c := bl.Cid() @@ -331,25 +325,37 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error { return nil } +// Discard closes this blockstore without finalizing its header and index. +// After this call, the blockstore can no longer be used. +// +// Note that this call may block if any blockstore operations are currently in +// progress, including an AllKeysChan that hasn't been fully consumed or cancelled. +func (b *ReadWrite) Discard() { + // Same semantics as ReadOnly.Close, including allowing duplicate calls. + // The only difference is that our method is called Discard, + // to further clarify that we're not properly finalizing and writing a + // CARv2 file. + b.ronly.Close() +} + // Finalize finalizes this blockstore by writing the CARv2 header, along with flattened index // for more efficient subsequent read. -// After this call, this blockstore can no longer be used for read or write. +// After this call, the blockstore can no longer be used. func (b *ReadWrite) Finalize() error { - if b.header.DataSize != 0 { + b.ronly.mu.Lock() + defer b.ronly.mu.Unlock() + + if b.ronly.closed { // Allow duplicate Finalize calls, just like Close. // Still error, just like ReadOnly.Close; it should be discarded. - return fmt.Errorf("called Finalize twice") + return fmt.Errorf("called Finalize on a closed blockstore") } - b.ronly.mu.Lock() - defer b.ronly.mu.Unlock() // TODO check if add index option is set and don't write the index then set index offset to zero. b.header = b.header.WithDataSize(uint64(b.dataWriter.Position())) // Note that we can't use b.Close here, as that tries to grab the same // mutex we're holding here. - // TODO: should we check the error here? especially with OpenReadWrite, - // we should care about close errors. defer b.ronly.closeWithoutMutex() // TODO if index not needed don't bother flattening it. 
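Side note on the locking scheme above: every read or write method now takes `b.ronly.mu`, checks the `closed` flag first, and only then touches the index or the data writer, while Close, Discard, and Finalize flip the flag under the exclusive lock. A minimal, self-contained sketch of that idiom follows; the `guardedStore` type and the local `errClosed` value here are illustrative stand-ins, not code from this package.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is a local stand-in for the sentinel introduced in readonly.go.
var errClosed = errors.New("cannot use the store after closing")

// guardedStore sketches the pattern the blockstore now follows: readers take
// the read lock and bail out early if the store is closed; Close sets the
// flag under the write lock so no reader can race past the check.
type guardedStore struct {
	mu     sync.RWMutex
	closed bool
	data   map[string][]byte
}

func (s *guardedStore) Get(key string) ([]byte, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if s.closed {
		return nil, errClosed
	}
	v, ok := s.data[key]
	if !ok {
		return nil, fmt.Errorf("not found: %q", key)
	}
	return v, nil
}

func (s *guardedStore) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Setting the flag twice is harmless, so duplicate Close calls are allowed.
	s.closed = true
	return nil
}

func main() {
	s := &guardedStore{data: map[string][]byte{"k": []byte("v")}}
	if _, err := s.Get("k"); err != nil {
		panic(err)
	}
	_ = s.Close()
	if _, err := s.Get("k"); err != nil {
		fmt.Println("after Close:", err)
	}
}
```

Because the flag only ever transitions once and is always read under the same mutex that Close holds exclusively, readers never observe a half-closed store and duplicate Close calls stay cheap.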
@@ -360,39 +366,29 @@ func (b *ReadWrite) Finalize() error { if err := index.WriteTo(fi, internalio.NewOffsetWriter(b.f, int64(b.header.IndexOffset))); err != nil { return err } - _, err = b.header.WriteTo(internalio.NewOffsetWriter(b.f, carv2.PragmaSize)) - return err -} + if _, err := b.header.WriteTo(internalio.NewOffsetWriter(b.f, carv2.PragmaSize)); err != nil { + return err + } -func (b *ReadWrite) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - if b.finalized() { - return nil, errFinalized + if err := b.ronly.closeWithoutMutex(); err != nil { + return err } + return nil +} +func (b *ReadWrite) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { return b.ronly.AllKeysChan(ctx) } func (b *ReadWrite) Has(key cid.Cid) (bool, error) { - if b.finalized() { - return false, errFinalized - } - return b.ronly.Has(key) } func (b *ReadWrite) Get(key cid.Cid) (blocks.Block, error) { - if b.finalized() { - return nil, errFinalized - } - return b.ronly.Get(key) } func (b *ReadWrite) GetSize(key cid.Cid) (int, error) { - if b.finalized() { - return 0, errFinalized - } - return b.ronly.GetSize(key) } diff --git a/v2/blockstore/readwrite_test.go b/v2/blockstore/readwrite_test.go index 5316f5bf7e997e153f87182ce3ce2d02a8f8a4ea..a30bbb235dacc560cf1d9dc6f7dab531887c79d7 100644 --- a/v2/blockstore/readwrite_test.go +++ b/v2/blockstore/readwrite_test.go @@ -373,7 +373,7 @@ func TestBlockstoreResumption(t *testing.T) { // Close off the open file and re-instantiate a new subject with resumption enabled. // Note, we don't have to close the file for resumption to work. // We do this to avoid resource leak during testing. - require.NoError(t, blockstore.CloseReadWrite(subject)) + subject.Discard() } subject, err = blockstore.OpenReadWrite(path, r.Header.Roots, blockstore.UseWholeCIDs(true)) @@ -405,7 +405,7 @@ func TestBlockstoreResumption(t *testing.T) { require.Equal(t, wantBlockCountSoFar, gotBlockCountSoFar) } } - require.NoError(t, blockstore.CloseReadWrite(subject)) + subject.Discard() // Finalize the blockstore to complete partially written CARv2 file. subject, err = blockstore.OpenReadWrite(path, r.Header.Roots, @@ -619,3 +619,51 @@ func TestReadWriteResumptionFromFileWithDifferentCarV1PaddingIsError(t *testing. 
"Expected padding value of 1413 but got 1314") require.Nil(t, resumingSubject) } + +func TestReadWriteErrorAfterClose(t *testing.T) { + root := blocks.NewBlock([]byte("foo")) + for _, closeMethod := range []func(*blockstore.ReadWrite){ + (*blockstore.ReadWrite).Discard, + func(bs *blockstore.ReadWrite) { bs.Finalize() }, + } { + path := filepath.Join(t.TempDir(), "readwrite.car") + bs, err := blockstore.OpenReadWrite(path, []cid.Cid{root.Cid()}) + require.NoError(t, err) + + err = bs.Put(root) + require.NoError(t, err) + + roots, err := bs.Roots() + require.NoError(t, err) + _, err = bs.Has(roots[0]) + require.NoError(t, err) + _, err = bs.Get(roots[0]) + require.NoError(t, err) + _, err = bs.GetSize(roots[0]) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + _, err = bs.AllKeysChan(ctx) + require.NoError(t, err) + cancel() // to stop the AllKeysChan goroutine + + closeMethod(bs) + + _, err = bs.Roots() + require.Error(t, err) + _, err = bs.Has(roots[0]) + require.Error(t, err) + _, err = bs.Get(roots[0]) + require.Error(t, err) + _, err = bs.GetSize(roots[0]) + require.Error(t, err) + _, err = bs.AllKeysChan(ctx) + require.Error(t, err) + + err = bs.Put(root) + require.Error(t, err) + + // TODO: test that closing blocks if an AllKeysChan operation is + // in progress. + } +}