Commit 039ddc7c authored by Daniel Martí's avatar Daniel Martí Committed by Masih H. Derkani

v2/blockstore: add ReadWrite.Discard

This allows closing a read-write blockstore without doing the extra work
to finalize its header and index. Can be useful if an entire piece of
work is cancelled, and also simplifies the tests.

Also make ReadOnly error in a straightforward way if it is used after
being closed. Before, this could lead to panics, as we'd attempt to read
the CARv1 file when it's closed.

Both mechanisms now use a "closed" boolean, which is consistent and
simpler than checking a header field.

Finally, add tests that ensure both ReadOnly and ReadWrite behave as
intended once they have been closed.

The tests also uncovered that AllKeysChan would not release the mutex if
it encountered an error early on. Fix that, too.

While at it, fix some now-obsolete references to panics on unsupported
or after-close method calls.

Fixes #205.
parent 6c87996f
......@@ -11,7 +11,7 @@
// The ReadWrite blockstore allows writing and reading of the blocks concurrently. The user of this
// blockstore is responsible for calling ReadWrite.Finalize when finished writing blocks.
// Upon finalization, the instance can no longer be used for reading or writing blocks and will
// panic if used. To continue reading the blocks users are encouraged to use ReadOnly blockstore
// error if used. To continue reading the blocks users are encouraged to use ReadOnly blockstore
// instantiated from the same file path using OpenReadOnly.
// A user may resume reading/writing from files produced by an instance of ReadWrite blockstore. The
// resumption is attempted automatically, if the path passed to OpenReadWrite exists.
......
package blockstore
// CloseReadWrite allows our external tests to close a read-write blockstore
// without finalizing it.
// The public API doesn't expose such a method.
// In the future, we might consider adding NewReadWrite taking io interfaces,
// meaning that the caller could be fully in control of opening and closing files.
// Another option would be to expose a "Discard" method alongside "Finalize".
// CloseReadWrite allows our external tests to close a read-write blockstore
// without finalizing it; it simply closes the embedded ReadOnly store,
// releasing the underlying file without writing a CARv2 header or index.
func CloseReadWrite(b *ReadWrite) error {
	err := b.ronly.Close()
	return err
}
......@@ -27,32 +27,36 @@ var _ blockstore.Blockstore = (*ReadOnly)(nil)
var (
	// errZeroLengthSection is returned when a zero-length CARv1 section is
	// encountered and the WithZeroLengthSectionAsEOF option is not enabled.
	errZeroLengthSection = fmt.Errorf("zero-length carv2 section not allowed by default; see WithZeroLengthSectionAsEOF option")
	// errReadOnly is returned by mutating methods on a read-only blockstore.
	errReadOnly = fmt.Errorf("called write method on a read-only carv2 blockstore")
	// errClosed is returned by blockstore methods once the store has been
	// closed (via Close, Discard, or Finalize), avoiding panics on use-after-close.
	errClosed = fmt.Errorf("cannot use a carv2 blockstore after closing")
)
type (
// ReadOnly provides a read-only CAR Block Store.
ReadOnly struct {
// mu allows ReadWrite to be safe for concurrent use.
// It's in ReadOnly so that read operations also grab read locks,
// given that ReadWrite embeds ReadOnly for methods like Get and Has.
//
// The main fields guarded by the mutex are the index and the underlying writers.
// For simplicity, the entirety of the blockstore methods grab the mutex.
mu sync.RWMutex
// The backing containing the data payload in CARv1 format.
backing io.ReaderAt
// The CARv1 content index.
idx index.Index
// If we called carv2.NewReaderMmap, remember to close it too.
carv2Closer io.Closer
ropts carv2.ReadOptions
}
// ReadOnly provides a read-only CAR Block Store.
type ReadOnly struct {
// mu allows ReadWrite to be safe for concurrent use.
// It's in ReadOnly so that read operations also grab read locks,
// given that ReadWrite embeds ReadOnly for methods like Get and Has.
//
// The main fields guarded by the mutex are the index and the underlying writers.
// For simplicity, the entirety of the blockstore methods grab the mutex.
mu sync.RWMutex
// When true, the blockstore has been closed via Close, Discard, or
// Finalize, and must not be used. Any further blockstore method calls
// will return errClosed to avoid panics or broken behavior.
closed bool
// The backing containing the data payload in CARv1 format.
backing io.ReaderAt
// The CARv1 content index.
idx index.Index
// If we called carv2.NewReaderMmap, remember to close it too.
carv2Closer io.Closer
ropts carv2.ReadOptions
}
contextKey string
)
type contextKey string
const asyncErrHandlerKey contextKey = "asyncErrorHandlerKey"
......@@ -177,7 +181,7 @@ func (b *ReadOnly) readBlock(idx int64) (cid.Cid, []byte, error) {
return bcid, data, err
}
// DeleteBlock is unsupported and always panics.
// DeleteBlock is unsupported and always errors.
// Deleting from a read-only CAR is never possible, so errReadOnly is
// returned unconditionally, without taking the mutex or checking b.closed.
func (b *ReadOnly) DeleteBlock(_ cid.Cid) error {
	return errReadOnly
}
......@@ -187,6 +191,10 @@ func (b *ReadOnly) Has(key cid.Cid) (bool, error) {
b.mu.RLock()
defer b.mu.RUnlock()
if b.closed {
return false, errClosed
}
var fnFound bool
var fnErr error
err := b.idx.GetAll(key, func(offset uint64) bool {
......@@ -223,6 +231,10 @@ func (b *ReadOnly) Get(key cid.Cid) (blocks.Block, error) {
b.mu.RLock()
defer b.mu.RUnlock()
if b.closed {
return nil, errClosed
}
var fnData []byte
var fnErr error
err := b.idx.GetAll(key, func(offset uint64) bool {
......@@ -263,6 +275,10 @@ func (b *ReadOnly) GetSize(key cid.Cid) (int, error) {
b.mu.RLock()
defer b.mu.RUnlock()
if b.closed {
return 0, errClosed
}
fnSize := -1
var fnErr error
err := b.idx.GetAll(key, func(offset uint64) bool {
......@@ -329,16 +345,26 @@ func WithAsyncErrorHandler(ctx context.Context, errHandler func(error)) context.
// See WithAsyncErrorHandler
func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
// We release the lock when the channel-sending goroutine stops.
// Note that we can't use a deferred unlock here,
// because if we return a nil error,
// we only want to unlock once the async goroutine has stopped.
b.mu.RLock()
if b.closed {
b.mu.RUnlock() // don't hold the mutex forever
return nil, errClosed
}
// TODO we may use this walk for populating the index, and we need to be able to iterate keys in this way somewhere for index generation. In general though, when it's asked for all keys from a blockstore with an index, we should iterate through the index when possible rather than linear reads through the full car.
rdr := internalio.NewOffsetReadSeeker(b.backing, 0)
header, err := carv1.ReadHeader(rdr)
if err != nil {
b.mu.RUnlock() // don't hold the mutex forever
return nil, fmt.Errorf("error reading car header: %w", err)
}
headerSize, err := carv1.HeaderSize(header)
if err != nil {
b.mu.RUnlock() // don't hold the mutex forever
return nil, err
}
......@@ -347,6 +373,7 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
// Seek to the end of header.
if _, err = rdr.Seek(int64(headerSize), io.SeekStart); err != nil {
b.mu.RUnlock() // don't hold the mutex forever
return nil, err
}
......@@ -424,10 +451,10 @@ func (b *ReadOnly) Roots() ([]cid.Cid, error) {
}
// Close closes the underlying reader if it was opened by OpenReadOnly.
// After this call, the blockstore can no longer be used.
//
// Note that this call may block if any blockstore operations are currently in
// progress, including an AllKeysChan that hasn't been fully consumed or
// cancelled.
// progress, including an AllKeysChan that hasn't been fully consumed or cancelled.
func (b *ReadOnly) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
......@@ -436,6 +463,7 @@ func (b *ReadOnly) Close() error {
}
func (b *ReadOnly) closeWithoutMutex() error {
b.closed = true
if b.carv2Closer != nil {
return b.carv2Closer.Close()
}
......
......@@ -101,7 +101,7 @@ func TestReadOnly(t *testing.T) {
require.NoError(t, err)
require.Equal(t, wantBlock, gotBlock)
// Assert write operations panic
// Assert write operations error
require.Error(t, subject.Put(wantBlock))
require.Error(t, subject.PutMany([]blocks.Block{wantBlock}))
require.Error(t, subject.DeleteBlock(key))
......@@ -235,3 +235,38 @@ func newV1Reader(r io.Reader, zeroLenSectionAsEOF bool) (*carv1.CarReader, error
}
return carv1.NewCarReader(r)
}
// TestReadOnlyErrorAfterClose asserts that a closed ReadOnly blockstore
// returns errors from every method rather than panicking or reading the
// closed CARv1 file.
func TestReadOnlyErrorAfterClose(t *testing.T) {
	bs, err := OpenReadOnly("../testdata/sample-v1.car")
	require.NoError(t, err)

	// All read methods should succeed while the store is open.
	roots, err := bs.Roots()
	require.NoError(t, err)
	_, err = bs.Has(roots[0])
	require.NoError(t, err)
	_, err = bs.Get(roots[0])
	require.NoError(t, err)
	_, err = bs.GetSize(roots[0])
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	_, err = bs.AllKeysChan(ctx)
	require.NoError(t, err)
	cancel() // to stop the AllKeysChan goroutine

	// Close must succeed; checking its error (previously ignored) also
	// surfaces file-handle problems instead of hiding them.
	require.NoError(t, bs.Close())

	// After Close, every method must error rather than panic.
	_, err = bs.Roots()
	require.Error(t, err)
	_, err = bs.Has(roots[0])
	require.Error(t, err)
	_, err = bs.Get(roots[0])
	require.Error(t, err)
	_, err = bs.GetSize(roots[0])
	require.Error(t, err)
	_, err = bs.AllKeysChan(ctx)
	require.Error(t, err)

	// TODO: test that closing blocks if an AllKeysChan operation is
	// in progress.
}
......@@ -23,8 +23,6 @@ import (
var _ blockstore.Blockstore = (*ReadWrite)(nil)
var errFinalized = fmt.Errorf("cannot use a read-write carv2 blockstore after finalizing")
// ReadWrite implements a blockstore that stores blocks in CARv2 format.
// Blocks put into the blockstore can be read back once they are successfully written.
// This implementation is preferable for a write-heavy workload.
......@@ -33,7 +31,7 @@ var errFinalized = fmt.Errorf("cannot use a read-write carv2 blockstore after fi
//
// The Finalize function must be called once the putting blocks are finished.
// Upon calling Finalize header is finalized and index is written out.
// Once finalized, all read and write calls to this blockstore will result in panics.
// Once finalized, all read and write calls to this blockstore will result in errors.
type ReadWrite struct {
ronly ReadOnly
......@@ -62,7 +60,7 @@ func AllowDuplicatePuts(allow bool) carv2.WriteOption {
// ReadWrite.Finalize must be called once putting and reading blocks are no longer needed.
// Upon calling ReadWrite.Finalize the CARv2 header and index are written out onto the file and the
// backing file is closed. Once finalized, all read and write calls to this blockstore will result
// in panics. Note, ReadWrite.Finalize must be called on an open instance regardless of whether any
// in errors. Note, ReadWrite.Finalize must be called on an open instance regardless of whether any
// blocks were put or not.
//
// If a file at given path does not exist, the instantiation will write car.Pragma and data payload
......@@ -287,26 +285,22 @@ func (b *ReadWrite) unfinalize() error {
return err
}
func (b *ReadWrite) finalized() bool {
return b.header.DataSize != 0
}
// Put puts a given block to the underlying datastore.
// It returns errClosed once the blockstore has been closed, discarded, or
// finalized.
func (b *ReadWrite) Put(blk blocks.Block) error {
	// PutMany already checks b.ronly.closed under the write lock,
	// so no separate check is needed here.
	return b.PutMany([]blocks.Block{blk})
}
// PutMany puts a slice of blocks at the same time using batching
// capabilities of the underlying datastore whenever possible.
func (b *ReadWrite) PutMany(blks []blocks.Block) error {
if b.finalized() {
return errFinalized
}
b.ronly.mu.Lock()
defer b.ronly.mu.Unlock()
if b.ronly.closed {
return errClosed
}
for _, bl := range blks {
c := bl.Cid()
......@@ -331,25 +325,37 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error {
return nil
}
// Discard closes this blockstore without finalizing its header and index.
// After this call, the blockstore can no longer be used.
//
// Note that this call may block if any blockstore operations are currently in
// progress, including an AllKeysChan that hasn't been fully consumed or cancelled.
func (b *ReadWrite) Discard() {
	// Same semantics as ReadOnly.Close, including allowing duplicate calls.
	// The only difference is that our method is called Discard,
	// to further clarify that we're not properly finalizing and writing a
	// CARv2 file.
	//
	// The close error is deliberately dropped: Discard has no error result,
	// and a failed close of a discarded store leaves nothing actionable.
	_ = b.ronly.Close()
}
// Finalize finalizes this blockstore by writing the CARv2 header, along with flattened index
// for more efficient subsequent read.
// After this call, this blockstore can no longer be used for read or write.
// After this call, the blockstore can no longer be used.
func (b *ReadWrite) Finalize() error {
if b.header.DataSize != 0 {
b.ronly.mu.Lock()
defer b.ronly.mu.Unlock()
if b.ronly.closed {
// Allow duplicate Finalize calls, just like Close.
// Still error, just like ReadOnly.Close; it should be discarded.
return fmt.Errorf("called Finalize twice")
return fmt.Errorf("called Finalize on a closed blockstore")
}
b.ronly.mu.Lock()
defer b.ronly.mu.Unlock()
// TODO check if add index option is set and don't write the index then set index offset to zero.
b.header = b.header.WithDataSize(uint64(b.dataWriter.Position()))
// Note that we can't use b.Close here, as that tries to grab the same
// mutex we're holding here.
// TODO: should we check the error here? especially with OpenReadWrite,
// we should care about close errors.
defer b.ronly.closeWithoutMutex()
// TODO if index not needed don't bother flattening it.
......@@ -360,39 +366,29 @@ func (b *ReadWrite) Finalize() error {
if err := index.WriteTo(fi, internalio.NewOffsetWriter(b.f, int64(b.header.IndexOffset))); err != nil {
return err
}
_, err = b.header.WriteTo(internalio.NewOffsetWriter(b.f, carv2.PragmaSize))
return err
}
if _, err := b.header.WriteTo(internalio.NewOffsetWriter(b.f, carv2.PragmaSize)); err != nil {
return err
}
func (b *ReadWrite) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if b.finalized() {
return nil, errFinalized
if err := b.ronly.closeWithoutMutex(); err != nil {
return err
}
return nil
}
// AllKeysChan returns a channel from which all block CIDs in the blockstore
// can be read. It delegates to the embedded ReadOnly blockstore, which
// errors with errClosed once the store has been closed.
func (b *ReadWrite) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	return b.ronly.AllKeysChan(ctx)
}
// Has reports whether a block for the given key exists in the blockstore.
// The stale b.finalized() guard is dropped: this commit removes finalized()
// and errFinalized, and the embedded ReadOnly now performs the closed check
// under its read lock, returning errClosed after Close/Discard/Finalize.
func (b *ReadWrite) Has(key cid.Cid) (bool, error) {
	return b.ronly.Has(key)
}
// Get retrieves the block with the given key.
// The stale b.finalized() guard is dropped: the embedded ReadOnly performs
// the closed check and returns errClosed after Close/Discard/Finalize.
func (b *ReadWrite) Get(key cid.Cid) (blocks.Block, error) {
	return b.ronly.Get(key)
}
// GetSize returns the size of the block with the given key.
// The stale b.finalized() guard is dropped: the embedded ReadOnly performs
// the closed check and returns errClosed after Close/Discard/Finalize.
func (b *ReadWrite) GetSize(key cid.Cid) (int, error) {
	return b.ronly.GetSize(key)
}
......
......@@ -373,7 +373,7 @@ func TestBlockstoreResumption(t *testing.T) {
// Close off the open file and re-instantiate a new subject with resumption enabled.
// Note, we don't have to close the file for resumption to work.
// We do this to avoid resource leak during testing.
require.NoError(t, blockstore.CloseReadWrite(subject))
subject.Discard()
}
subject, err = blockstore.OpenReadWrite(path, r.Header.Roots,
blockstore.UseWholeCIDs(true))
......@@ -405,7 +405,7 @@ func TestBlockstoreResumption(t *testing.T) {
require.Equal(t, wantBlockCountSoFar, gotBlockCountSoFar)
}
}
require.NoError(t, blockstore.CloseReadWrite(subject))
subject.Discard()
// Finalize the blockstore to complete partially written CARv2 file.
subject, err = blockstore.OpenReadWrite(path, r.Header.Roots,
......@@ -619,3 +619,51 @@ func TestReadWriteResumptionFromFileWithDifferentCarV1PaddingIsError(t *testing.
"Expected padding value of 1413 but got 1314")
require.Nil(t, resumingSubject)
}
// TestReadWriteErrorAfterClose asserts that both ways of closing a
// ReadWrite blockstore — Discard and Finalize — leave it in a state where
// every read and write method returns an error rather than panicking.
func TestReadWriteErrorAfterClose(t *testing.T) {
	root := blocks.NewBlock([]byte("foo"))
	// Run the same scenario once per close method.
	for _, closeMethod := range []func(*blockstore.ReadWrite){
		(*blockstore.ReadWrite).Discard,
		// Finalize's error is deliberately dropped here to fit the
		// func(*ReadWrite) signature shared with Discard.
		func(bs *blockstore.ReadWrite) { bs.Finalize() },
	} {
		path := filepath.Join(t.TempDir(), "readwrite.car")
		bs, err := blockstore.OpenReadWrite(path, []cid.Cid{root.Cid()})
		require.NoError(t, err)

		err = bs.Put(root)
		require.NoError(t, err)

		// All methods should succeed while the store is open.
		roots, err := bs.Roots()
		require.NoError(t, err)
		_, err = bs.Has(roots[0])
		require.NoError(t, err)
		_, err = bs.Get(roots[0])
		require.NoError(t, err)
		_, err = bs.GetSize(roots[0])
		require.NoError(t, err)
		ctx, cancel := context.WithCancel(context.Background())
		_, err = bs.AllKeysChan(ctx)
		require.NoError(t, err)
		cancel() // to stop the AllKeysChan goroutine

		closeMethod(bs)

		// After closing, every method — including the write path via
		// Put — must return an error.
		_, err = bs.Roots()
		require.Error(t, err)
		_, err = bs.Has(roots[0])
		require.Error(t, err)
		_, err = bs.Get(roots[0])
		require.Error(t, err)
		_, err = bs.GetSize(roots[0])
		require.Error(t, err)
		_, err = bs.AllKeysChan(ctx)
		require.Error(t, err)
		err = bs.Put(root)
		require.Error(t, err)

		// TODO: test that closing blocks if an AllKeysChan operation is
		// in progress.
	}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment