diff --git a/v2/blockstore/example_test.go b/v2/blockstore/example_test.go index 3091e471e7f5b9fc6e4fc9acda39f2eec503afb1..7e826addb0ffec8b3df17d5e81cab4c8f591cb4b 100644 --- a/v2/blockstore/example_test.go +++ b/v2/blockstore/example_test.go @@ -93,7 +93,6 @@ func ExampleOpenReadWrite() { if err != nil { panic(err) } - defer rwbs.Close() // Put all blocks onto the blockstore. blocks := []blocks.Block{thisBlock, thatBlock} @@ -121,7 +120,6 @@ func ExampleOpenReadWrite() { if err != nil { panic(err) } - defer resumedRwbos.Close() // Put another block, appending it to the set of blocks that are written previously. if err := resumedRwbos.Put(andTheOtherBlock); err != nil { diff --git a/v2/blockstore/export_test.go b/v2/blockstore/export_test.go new file mode 100644 index 0000000000000000000000000000000000000000..d4998eec8cfaa3677e17379c6b5e59f5511a431f --- /dev/null +++ b/v2/blockstore/export_test.go @@ -0,0 +1,11 @@ +package blockstore + +// CloseReadWrite allows our external tests to close a read-write blockstore +// without finalizing it. +// The public API doesn't expose such a method. +// In the future, we might consider adding NewReadWrite taking io interfaces, +// meaning that the caller could be fully in control of opening and closing files. +// Another option would be to expose a "Discard" method alongside "Finalize". +func CloseReadWrite(b *ReadWrite) error { + return b.ronly.Close() +} diff --git a/v2/blockstore/readwrite.go b/v2/blockstore/readwrite.go index 1192e749ca3be5be98b45b5f9bde735292d1bcc8..8986ff3055934f527e31a5fea2426ec323106c3f 100644 --- a/v2/blockstore/readwrite.go +++ b/v2/blockstore/readwrite.go @@ -35,11 +35,12 @@ var errFinalized = fmt.Errorf("cannot use a read-write carv2 blockstore after fi // Upon calling Finalize header is finalized and index is written out. // Once finalized, all read and write calls to this blockstore will result in panics. type ReadWrite struct { + ronly ReadOnly + f *os.File dataWriter *internalio.OffsetWriteSeeker - ReadOnly - idx *insertionIndex - header carv2.Header + idx *insertionIndex + header carv2.Header wopts carv2.WriteOptions } @@ -120,7 +121,7 @@ func OpenReadWrite(path string, roots []cid.Cid, opts ...carv2.ReadWriteOption) for _, opt := range opts { switch opt := opt.(type) { case carv2.ReadOption: - opt(&rwbs.ropts) + opt(&rwbs.ronly.ropts) case carv2.WriteOption: opt(&rwbs.wopts) } @@ -134,9 +135,9 @@ func OpenReadWrite(path string, roots []cid.Cid, opts ...carv2.ReadWriteOption) rwbs.dataWriter = internalio.NewOffsetWriter(rwbs.f, int64(rwbs.header.DataOffset)) v1r := internalio.NewOffsetReadSeeker(rwbs.f, int64(rwbs.header.DataOffset)) - rwbs.ReadOnly.backing = v1r - rwbs.ReadOnly.idx = rwbs.idx - rwbs.ReadOnly.carv2Closer = rwbs.f + rwbs.ronly.backing = v1r + rwbs.ronly.idx = rwbs.idx + rwbs.ronly.carv2Closer = rwbs.f if resume { if err = rwbs.resumeWithRoots(roots); err != nil { @@ -216,7 +217,7 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error { } // Use the given CARv1 padding to instantiate the CARv1 reader on file. - v1r := internalio.NewOffsetReadSeeker(b.ReadOnly.backing, 0) + v1r := internalio.NewOffsetReadSeeker(b.ronly.backing, 0) header, err := carv1.ReadHeader(v1r) if err != nil { // Cannot read the CARv1 header; the file is most likely corrupt. @@ -256,7 +257,7 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error { // Null padding; by default it's an error. if length == 0 { - if b.ropts.ZeroLengthSectionAsEOF { + if b.ronly.ropts.ZeroLengthSectionAsEOF { break } else { return fmt.Errorf("carv1 null padding not allowed by default; see WithZeroLegthSectionAsEOF") @@ -303,17 +304,17 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error { return errFinalized } - b.mu.Lock() - defer b.mu.Unlock() + b.ronly.mu.Lock() + defer b.ronly.mu.Unlock() for _, bl := range blks { c := bl.Cid() if !b.wopts.BlockstoreAllowDuplicatePuts { - if b.ropts.BlockstoreUseWholeCIDs && b.idx.hasExactCID(c) { + if b.ronly.ropts.BlockstoreUseWholeCIDs && b.idx.hasExactCID(c) { continue // deduplicated by CID } - if !b.ropts.BlockstoreUseWholeCIDs { + if !b.ronly.ropts.BlockstoreUseWholeCIDs { _, err := b.idx.Get(c) if err == nil { continue // deduplicated by hash @@ -340,8 +341,8 @@ func (b *ReadWrite) Finalize() error { return fmt.Errorf("called Finalize twice") } - b.mu.Lock() - defer b.mu.Unlock() + b.ronly.mu.Lock() + defer b.ronly.mu.Unlock() // TODO check if add index option is set and don't write the index then set index offset to zero. b.header = b.header.WithDataSize(uint64(b.dataWriter.Position())) @@ -349,7 +350,7 @@ func (b *ReadWrite) Finalize() error { // mutex we're holding here. // TODO: should we check the error here? especially with OpenReadWrite, // we should care about close errors. - defer b.closeWithoutMutex() + defer b.ronly.closeWithoutMutex() // TODO if index not needed don't bother flattening it. fi, err := b.idx.flatten() @@ -368,7 +369,7 @@ func (b *ReadWrite) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { return nil, errFinalized } - return b.ReadOnly.AllKeysChan(ctx) + return b.ronly.AllKeysChan(ctx) } func (b *ReadWrite) Has(key cid.Cid) (bool, error) { @@ -376,7 +377,7 @@ func (b *ReadWrite) Has(key cid.Cid) (bool, error) { return false, errFinalized } - return b.ReadOnly.Has(key) + return b.ronly.Has(key) } func (b *ReadWrite) Get(key cid.Cid) (blocks.Block, error) { @@ -384,7 +385,7 @@ func (b *ReadWrite) Get(key cid.Cid) (blocks.Block, error) { return nil, errFinalized } - return b.ReadOnly.Get(key) + return b.ronly.Get(key) } func (b *ReadWrite) GetSize(key cid.Cid) (int, error) { @@ -392,5 +393,17 @@ func (b *ReadWrite) GetSize(key cid.Cid) (int, error) { return 0, errFinalized } - return b.ReadOnly.GetSize(key) + return b.ronly.GetSize(key) +} + +func (b *ReadWrite) DeleteBlock(_ cid.Cid) error { + return fmt.Errorf("ReadWrite blockstore does not support deleting blocks") +} + +func (b *ReadWrite) HashOnRead(enable bool) { + b.ronly.HashOnRead(enable) +} + +func (b *ReadWrite) Roots() ([]cid.Cid, error) { + return b.ronly.Roots() } diff --git a/v2/blockstore/readwrite_test.go b/v2/blockstore/readwrite_test.go index 77442de0dc44396575fbe9a42cf8533c5ed964d9..5316f5bf7e997e153f87182ce3ce2d02a8f8a4ea 100644 --- a/v2/blockstore/readwrite_test.go +++ b/v2/blockstore/readwrite_test.go @@ -38,7 +38,7 @@ var ( func TestReadWriteGetReturnsBlockstoreNotFoundWhenCidDoesNotExist(t *testing.T) { path := filepath.Join(t.TempDir(), "readwrite-err-not-found.car") subject, err := blockstore.OpenReadWrite(path, []cid.Cid{}) - t.Cleanup(func() { subject.Close() }) + t.Cleanup(func() { subject.Finalize() }) require.NoError(t, err) nonExistingKey := merkledag.NewRawNode([]byte("undadasea")).Block.Cid() @@ -373,7 +373,7 @@ func TestBlockstoreResumption(t *testing.T) { // Close off the open file and re-instantiate a new subject with resumption enabled. // Note, we don't have to close the file for resumption to work. // We do this to avoid resource leak during testing. - require.NoError(t, subject.Close()) + require.NoError(t, blockstore.CloseReadWrite(subject)) } subject, err = blockstore.OpenReadWrite(path, r.Header.Roots, blockstore.UseWholeCIDs(true)) @@ -405,7 +405,7 @@ func TestBlockstoreResumption(t *testing.T) { require.Equal(t, wantBlockCountSoFar, gotBlockCountSoFar) } } - require.NoError(t, subject.Close()) + require.NoError(t, blockstore.CloseReadWrite(subject)) // Finalize the blockstore to complete partially written CARv2 file. subject, err = blockstore.OpenReadWrite(path, r.Header.Roots, @@ -460,8 +460,8 @@ func TestBlockstoreResumptionIsSupportedOnFinalizedFile(t *testing.T) { require.NoError(t, err) require.NoError(t, subject.Finalize()) subject, err = blockstore.OpenReadWrite(path, []cid.Cid{}) - t.Cleanup(func() { subject.Close() }) require.NoError(t, err) + t.Cleanup(func() { subject.Finalize() }) } func TestReadWritePanicsOnlyWhenFinalized(t *testing.T) { @@ -472,7 +472,6 @@ func TestReadWritePanicsOnlyWhenFinalized(t *testing.T) { subject, err := blockstore.OpenReadWrite(path, wantRoots) require.NoError(t, err) - t.Cleanup(func() { subject.Close() }) require.NoError(t, subject.Put(oneTestBlockWithCidV1)) require.NoError(t, subject.Put(anotherTestBlockWithCidV0)) @@ -500,6 +499,9 @@ func TestReadWritePanicsOnlyWhenFinalized(t *testing.T) { require.NoError(t, subject.Finalize()) require.Error(t, subject.Finalize()) + _, ok := (interface{})(subject).(io.Closer) + require.False(t, ok) + _, err = subject.Get(oneTestBlockCid) require.Error(t, err) _, err = subject.GetSize(anotherTestBlockCid) @@ -528,7 +530,6 @@ func TestReadWriteWithPaddingWorksAsExpected(t *testing.T) { carv2.UseDataPadding(wantCarV1Padding), carv2.UseIndexPadding(wantIndexPadding)) require.NoError(t, err) - t.Cleanup(func() { subject.Close() }) require.NoError(t, subject.Put(oneTestBlockWithCidV1)) require.NoError(t, subject.Put(anotherTestBlockWithCidV0)) require.NoError(t, subject.Finalize()) @@ -606,7 +607,6 @@ func TestReadWriteResumptionFromFileWithDifferentCarV1PaddingIsError(t *testing. WantRoots, carv2.UseDataPadding(1413)) require.NoError(t, err) - t.Cleanup(func() { subject.Close() }) require.NoError(t, subject.Put(oneTestBlockWithCidV1)) require.NoError(t, subject.Finalize())