Commit 3752fdb3 authored by Daniel Martí's avatar Daniel Martí Committed by Masih H. Derkani

blockstore: stop embedding ReadOnly in ReadWrite

It has unintended side effects, as we leak the inner ReadOnly value as
well as its methods to the parent ReadWrite API.

One unintended consequence is that it was possible to "close" a
ReadWrite, when we only wanted to support finalizing it.

The resumption tests do want to close without finalization,
for the sake of emulating partially-written CARv2 files.
Those can hook into the unexported API via export_test.go.

If a downstream really wants to support closing a ReadWrite blockstore
without finalizing it, two alternatives are outlined in export_test.go.
parent d0e44a62
...@@ -93,7 +93,6 @@ func ExampleOpenReadWrite() { ...@@ -93,7 +93,6 @@ func ExampleOpenReadWrite() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer rwbs.Close()
// Put all blocks onto the blockstore. // Put all blocks onto the blockstore.
blocks := []blocks.Block{thisBlock, thatBlock} blocks := []blocks.Block{thisBlock, thatBlock}
...@@ -121,7 +120,6 @@ func ExampleOpenReadWrite() { ...@@ -121,7 +120,6 @@ func ExampleOpenReadWrite() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer resumedRwbos.Close()
// Put another block, appending it to the set of blocks that are written previously. // Put another block, appending it to the set of blocks that are written previously.
if err := resumedRwbos.Put(andTheOtherBlock); err != nil { if err := resumedRwbos.Put(andTheOtherBlock); err != nil {
......
package blockstore
// CloseReadWrite allows our external tests to close a read-write blockstore
// without finalizing it.
// The public API doesn't expose such a method.
// In the future, we might consider adding NewReadWrite taking io interfaces,
// meaning that the caller could be fully in control of opening and closing files.
// Another option would be to expose a "Discard" method alongside "Finalize".
func CloseReadWrite(b *ReadWrite) error {
return b.ronly.Close()
}
...@@ -35,11 +35,12 @@ var errFinalized = fmt.Errorf("cannot use a read-write carv2 blockstore after fi ...@@ -35,11 +35,12 @@ var errFinalized = fmt.Errorf("cannot use a read-write carv2 blockstore after fi
// Upon calling Finalize header is finalized and index is written out. // Upon calling Finalize header is finalized and index is written out.
// Once finalized, all read and write calls to this blockstore will result in panics. // Once finalized, all read and write calls to this blockstore will result in panics.
type ReadWrite struct { type ReadWrite struct {
ronly ReadOnly
f *os.File f *os.File
dataWriter *internalio.OffsetWriteSeeker dataWriter *internalio.OffsetWriteSeeker
ReadOnly idx *insertionIndex
idx *insertionIndex header carv2.Header
header carv2.Header
wopts carv2.WriteOptions wopts carv2.WriteOptions
} }
...@@ -120,7 +121,7 @@ func OpenReadWrite(path string, roots []cid.Cid, opts ...carv2.ReadWriteOption) ...@@ -120,7 +121,7 @@ func OpenReadWrite(path string, roots []cid.Cid, opts ...carv2.ReadWriteOption)
for _, opt := range opts { for _, opt := range opts {
switch opt := opt.(type) { switch opt := opt.(type) {
case carv2.ReadOption: case carv2.ReadOption:
opt(&rwbs.ropts) opt(&rwbs.ronly.ropts)
case carv2.WriteOption: case carv2.WriteOption:
opt(&rwbs.wopts) opt(&rwbs.wopts)
} }
...@@ -134,9 +135,9 @@ func OpenReadWrite(path string, roots []cid.Cid, opts ...carv2.ReadWriteOption) ...@@ -134,9 +135,9 @@ func OpenReadWrite(path string, roots []cid.Cid, opts ...carv2.ReadWriteOption)
rwbs.dataWriter = internalio.NewOffsetWriter(rwbs.f, int64(rwbs.header.DataOffset)) rwbs.dataWriter = internalio.NewOffsetWriter(rwbs.f, int64(rwbs.header.DataOffset))
v1r := internalio.NewOffsetReadSeeker(rwbs.f, int64(rwbs.header.DataOffset)) v1r := internalio.NewOffsetReadSeeker(rwbs.f, int64(rwbs.header.DataOffset))
rwbs.ReadOnly.backing = v1r rwbs.ronly.backing = v1r
rwbs.ReadOnly.idx = rwbs.idx rwbs.ronly.idx = rwbs.idx
rwbs.ReadOnly.carv2Closer = rwbs.f rwbs.ronly.carv2Closer = rwbs.f
if resume { if resume {
if err = rwbs.resumeWithRoots(roots); err != nil { if err = rwbs.resumeWithRoots(roots); err != nil {
...@@ -216,7 +217,7 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error { ...@@ -216,7 +217,7 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error {
} }
// Use the given CARv1 padding to instantiate the CARv1 reader on file. // Use the given CARv1 padding to instantiate the CARv1 reader on file.
v1r := internalio.NewOffsetReadSeeker(b.ReadOnly.backing, 0) v1r := internalio.NewOffsetReadSeeker(b.ronly.backing, 0)
header, err := carv1.ReadHeader(v1r) header, err := carv1.ReadHeader(v1r)
if err != nil { if err != nil {
// Cannot read the CARv1 header; the file is most likely corrupt. // Cannot read the CARv1 header; the file is most likely corrupt.
...@@ -256,7 +257,7 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error { ...@@ -256,7 +257,7 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error {
// Null padding; by default it's an error. // Null padding; by default it's an error.
if length == 0 { if length == 0 {
if b.ropts.ZeroLengthSectionAsEOF { if b.ronly.ropts.ZeroLengthSectionAsEOF {
break break
} else { } else {
return fmt.Errorf("carv1 null padding not allowed by default; see WithZeroLegthSectionAsEOF") return fmt.Errorf("carv1 null padding not allowed by default; see WithZeroLegthSectionAsEOF")
...@@ -303,17 +304,17 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error { ...@@ -303,17 +304,17 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error {
return errFinalized return errFinalized
} }
b.mu.Lock() b.ronly.mu.Lock()
defer b.mu.Unlock() defer b.ronly.mu.Unlock()
for _, bl := range blks { for _, bl := range blks {
c := bl.Cid() c := bl.Cid()
if !b.wopts.BlockstoreAllowDuplicatePuts { if !b.wopts.BlockstoreAllowDuplicatePuts {
if b.ropts.BlockstoreUseWholeCIDs && b.idx.hasExactCID(c) { if b.ronly.ropts.BlockstoreUseWholeCIDs && b.idx.hasExactCID(c) {
continue // deduplicated by CID continue // deduplicated by CID
} }
if !b.ropts.BlockstoreUseWholeCIDs { if !b.ronly.ropts.BlockstoreUseWholeCIDs {
_, err := b.idx.Get(c) _, err := b.idx.Get(c)
if err == nil { if err == nil {
continue // deduplicated by hash continue // deduplicated by hash
...@@ -340,8 +341,8 @@ func (b *ReadWrite) Finalize() error { ...@@ -340,8 +341,8 @@ func (b *ReadWrite) Finalize() error {
return fmt.Errorf("called Finalize twice") return fmt.Errorf("called Finalize twice")
} }
b.mu.Lock() b.ronly.mu.Lock()
defer b.mu.Unlock() defer b.ronly.mu.Unlock()
// TODO check if add index option is set and don't write the index then set index offset to zero. // TODO check if add index option is set and don't write the index then set index offset to zero.
b.header = b.header.WithDataSize(uint64(b.dataWriter.Position())) b.header = b.header.WithDataSize(uint64(b.dataWriter.Position()))
...@@ -349,7 +350,7 @@ func (b *ReadWrite) Finalize() error { ...@@ -349,7 +350,7 @@ func (b *ReadWrite) Finalize() error {
// mutex we're holding here. // mutex we're holding here.
// TODO: should we check the error here? especially with OpenReadWrite, // TODO: should we check the error here? especially with OpenReadWrite,
// we should care about close errors. // we should care about close errors.
defer b.closeWithoutMutex() defer b.ronly.closeWithoutMutex()
// TODO if index not needed don't bother flattening it. // TODO if index not needed don't bother flattening it.
fi, err := b.idx.flatten() fi, err := b.idx.flatten()
...@@ -368,7 +369,7 @@ func (b *ReadWrite) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { ...@@ -368,7 +369,7 @@ func (b *ReadWrite) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errFinalized return nil, errFinalized
} }
return b.ReadOnly.AllKeysChan(ctx) return b.ronly.AllKeysChan(ctx)
} }
func (b *ReadWrite) Has(key cid.Cid) (bool, error) { func (b *ReadWrite) Has(key cid.Cid) (bool, error) {
...@@ -376,7 +377,7 @@ func (b *ReadWrite) Has(key cid.Cid) (bool, error) { ...@@ -376,7 +377,7 @@ func (b *ReadWrite) Has(key cid.Cid) (bool, error) {
return false, errFinalized return false, errFinalized
} }
return b.ReadOnly.Has(key) return b.ronly.Has(key)
} }
func (b *ReadWrite) Get(key cid.Cid) (blocks.Block, error) { func (b *ReadWrite) Get(key cid.Cid) (blocks.Block, error) {
...@@ -384,7 +385,7 @@ func (b *ReadWrite) Get(key cid.Cid) (blocks.Block, error) { ...@@ -384,7 +385,7 @@ func (b *ReadWrite) Get(key cid.Cid) (blocks.Block, error) {
return nil, errFinalized return nil, errFinalized
} }
return b.ReadOnly.Get(key) return b.ronly.Get(key)
} }
func (b *ReadWrite) GetSize(key cid.Cid) (int, error) { func (b *ReadWrite) GetSize(key cid.Cid) (int, error) {
...@@ -392,5 +393,17 @@ func (b *ReadWrite) GetSize(key cid.Cid) (int, error) { ...@@ -392,5 +393,17 @@ func (b *ReadWrite) GetSize(key cid.Cid) (int, error) {
return 0, errFinalized return 0, errFinalized
} }
return b.ReadOnly.GetSize(key) return b.ronly.GetSize(key)
}
func (b *ReadWrite) DeleteBlock(_ cid.Cid) error {
return fmt.Errorf("ReadWrite blockstore does not support deleting blocks")
}
func (b *ReadWrite) HashOnRead(enable bool) {
b.ronly.HashOnRead(enable)
}
func (b *ReadWrite) Roots() ([]cid.Cid, error) {
return b.ronly.Roots()
} }
...@@ -38,7 +38,7 @@ var ( ...@@ -38,7 +38,7 @@ var (
func TestReadWriteGetReturnsBlockstoreNotFoundWhenCidDoesNotExist(t *testing.T) { func TestReadWriteGetReturnsBlockstoreNotFoundWhenCidDoesNotExist(t *testing.T) {
path := filepath.Join(t.TempDir(), "readwrite-err-not-found.car") path := filepath.Join(t.TempDir(), "readwrite-err-not-found.car")
subject, err := blockstore.OpenReadWrite(path, []cid.Cid{}) subject, err := blockstore.OpenReadWrite(path, []cid.Cid{})
t.Cleanup(func() { subject.Close() }) t.Cleanup(func() { subject.Finalize() })
require.NoError(t, err) require.NoError(t, err)
nonExistingKey := merkledag.NewRawNode([]byte("undadasea")).Block.Cid() nonExistingKey := merkledag.NewRawNode([]byte("undadasea")).Block.Cid()
...@@ -373,7 +373,7 @@ func TestBlockstoreResumption(t *testing.T) { ...@@ -373,7 +373,7 @@ func TestBlockstoreResumption(t *testing.T) {
// Close off the open file and re-instantiate a new subject with resumption enabled. // Close off the open file and re-instantiate a new subject with resumption enabled.
// Note, we don't have to close the file for resumption to work. // Note, we don't have to close the file for resumption to work.
// We do this to avoid resource leak during testing. // We do this to avoid resource leak during testing.
require.NoError(t, subject.Close()) require.NoError(t, blockstore.CloseReadWrite(subject))
} }
subject, err = blockstore.OpenReadWrite(path, r.Header.Roots, subject, err = blockstore.OpenReadWrite(path, r.Header.Roots,
blockstore.UseWholeCIDs(true)) blockstore.UseWholeCIDs(true))
...@@ -405,7 +405,7 @@ func TestBlockstoreResumption(t *testing.T) { ...@@ -405,7 +405,7 @@ func TestBlockstoreResumption(t *testing.T) {
require.Equal(t, wantBlockCountSoFar, gotBlockCountSoFar) require.Equal(t, wantBlockCountSoFar, gotBlockCountSoFar)
} }
} }
require.NoError(t, subject.Close()) require.NoError(t, blockstore.CloseReadWrite(subject))
// Finalize the blockstore to complete partially written CARv2 file. // Finalize the blockstore to complete partially written CARv2 file.
subject, err = blockstore.OpenReadWrite(path, r.Header.Roots, subject, err = blockstore.OpenReadWrite(path, r.Header.Roots,
...@@ -460,8 +460,8 @@ func TestBlockstoreResumptionIsSupportedOnFinalizedFile(t *testing.T) { ...@@ -460,8 +460,8 @@ func TestBlockstoreResumptionIsSupportedOnFinalizedFile(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, subject.Finalize()) require.NoError(t, subject.Finalize())
subject, err = blockstore.OpenReadWrite(path, []cid.Cid{}) subject, err = blockstore.OpenReadWrite(path, []cid.Cid{})
t.Cleanup(func() { subject.Close() })
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { subject.Finalize() })
} }
func TestReadWritePanicsOnlyWhenFinalized(t *testing.T) { func TestReadWritePanicsOnlyWhenFinalized(t *testing.T) {
...@@ -472,7 +472,6 @@ func TestReadWritePanicsOnlyWhenFinalized(t *testing.T) { ...@@ -472,7 +472,6 @@ func TestReadWritePanicsOnlyWhenFinalized(t *testing.T) {
subject, err := blockstore.OpenReadWrite(path, wantRoots) subject, err := blockstore.OpenReadWrite(path, wantRoots)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { subject.Close() })
require.NoError(t, subject.Put(oneTestBlockWithCidV1)) require.NoError(t, subject.Put(oneTestBlockWithCidV1))
require.NoError(t, subject.Put(anotherTestBlockWithCidV0)) require.NoError(t, subject.Put(anotherTestBlockWithCidV0))
...@@ -500,6 +499,9 @@ func TestReadWritePanicsOnlyWhenFinalized(t *testing.T) { ...@@ -500,6 +499,9 @@ func TestReadWritePanicsOnlyWhenFinalized(t *testing.T) {
require.NoError(t, subject.Finalize()) require.NoError(t, subject.Finalize())
require.Error(t, subject.Finalize()) require.Error(t, subject.Finalize())
_, ok := (interface{})(subject).(io.Closer)
require.False(t, ok)
_, err = subject.Get(oneTestBlockCid) _, err = subject.Get(oneTestBlockCid)
require.Error(t, err) require.Error(t, err)
_, err = subject.GetSize(anotherTestBlockCid) _, err = subject.GetSize(anotherTestBlockCid)
...@@ -528,7 +530,6 @@ func TestReadWriteWithPaddingWorksAsExpected(t *testing.T) { ...@@ -528,7 +530,6 @@ func TestReadWriteWithPaddingWorksAsExpected(t *testing.T) {
carv2.UseDataPadding(wantCarV1Padding), carv2.UseDataPadding(wantCarV1Padding),
carv2.UseIndexPadding(wantIndexPadding)) carv2.UseIndexPadding(wantIndexPadding))
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { subject.Close() })
require.NoError(t, subject.Put(oneTestBlockWithCidV1)) require.NoError(t, subject.Put(oneTestBlockWithCidV1))
require.NoError(t, subject.Put(anotherTestBlockWithCidV0)) require.NoError(t, subject.Put(anotherTestBlockWithCidV0))
require.NoError(t, subject.Finalize()) require.NoError(t, subject.Finalize())
...@@ -606,7 +607,6 @@ func TestReadWriteResumptionFromFileWithDifferentCarV1PaddingIsError(t *testing. ...@@ -606,7 +607,6 @@ func TestReadWriteResumptionFromFileWithDifferentCarV1PaddingIsError(t *testing.
WantRoots, WantRoots,
carv2.UseDataPadding(1413)) carv2.UseDataPadding(1413))
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { subject.Close() })
require.NoError(t, subject.Put(oneTestBlockWithCidV1)) require.NoError(t, subject.Put(oneTestBlockWithCidV1))
require.NoError(t, subject.Finalize()) require.NoError(t, subject.Finalize())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment