diff --git a/v2/blockstore/readonly.go b/v2/blockstore/readonly.go index cf799fe7a6902afd501fe2035efbe200a1781d62..5b9c8535ddfd17c0db3318791b9fff43cb854852 100644 --- a/v2/blockstore/readonly.go +++ b/v2/blockstore/readonly.go @@ -24,7 +24,7 @@ import ( var _ blockstore.Blockstore = (*ReadOnly)(nil) -// ReadOnly provides a read-only Car Block Store. +// ReadOnly provides a read-only CAR Block Store. type ReadOnly struct { // mu allows ReadWrite to be safe for concurrent use. // It's in ReadOnly so that read operations also grab read locks, @@ -41,6 +41,35 @@ type ReadOnly struct { // If we called carv2.NewReaderMmap, remember to close it too. carv2Closer io.Closer + + ropts carv2.ReadOptions +} + +// UseWholeCIDs is a read option which makes a CAR blockstore identify blocks by +// whole CIDs, and not just their multihashes. The default is to use +// multihashes, which matches the current semantics of go-ipfs-blockstore v1. +// +// Enabling this option affects a number of methods, including read-only ones: +// +// • Get, Has, and HasSize will only return a block +// only if the entire CID is present in the CAR file. +// +// • DeleteBlock will delete a block only when the entire CID matches. +// +// • AllKeysChan will return the original whole CIDs, instead of with their +// multicodec set to "raw" to just provide multihashes. +// +// • If AllowDuplicatePuts isn't set, +// Put and PutMany will deduplicate by the whole CID, +// allowing different CIDs with equal multihashes. +// +// Note that this option only affects the blockstore, and is ignored by the root +// go-car/v2 package. +func UseWholeCIDs(enable bool) carv2.ReadOption { + return func(o *carv2.ReadOptions) { + // TODO: update methods like Get, Has, and AllKeysChan to obey this. + o.BlockstoreUseWholeCIDs = enable + } } // NewReadOnly creates a new ReadOnly blockstore from the backing with a optional index as idx. @@ -52,7 +81,12 @@ type ReadOnly struct { // * For a CAR v2 backing an index is only generated if Header.HasIndex returns false. // // There is no need to call ReadOnly.Close on instances returned by this function. -func NewReadOnly(backing io.ReaderAt, idx index.Index) (*ReadOnly, error) { +func NewReadOnly(backing io.ReaderAt, idx index.Index, opts ...carv2.ReadOption) (*ReadOnly, error) { + b := &ReadOnly{} + for _, opt := range opts { + opt(&b.ropts) + } + version, err := readVersion(backing) if err != nil { return nil, err @@ -60,13 +94,15 @@ func NewReadOnly(backing io.ReaderAt, idx index.Index) (*ReadOnly, error) { switch version { case 1: if idx == nil { - if idx, err = generateIndex(backing); err != nil { + if idx, err = generateIndex(backing, opts...); err != nil { return nil, err } } - return &ReadOnly{backing: backing, idx: idx}, nil + b.backing = backing + b.idx = idx + return b, nil case 2: - v2r, err := carv2.NewReader(backing) + v2r, err := carv2.NewReader(backing, opts...) if err != nil { return nil, err } @@ -76,11 +112,13 @@ func NewReadOnly(backing io.ReaderAt, idx index.Index) (*ReadOnly, error) { if err != nil { return nil, err } - } else if idx, err = generateIndex(v2r.CarV1Reader()); err != nil { + } else if idx, err = generateIndex(v2r.CarV1Reader(), opts...); err != nil { return nil, err } } - return &ReadOnly{backing: v2r.CarV1Reader(), idx: idx}, nil + b.backing = v2r.CarV1Reader() + b.idx = idx + return b, nil default: return nil, fmt.Errorf("unsupported car version: %v", version) } @@ -97,7 +135,7 @@ func readVersion(at io.ReaderAt) (uint64, error) { return carv2.ReadVersion(rr) } -func generateIndex(at io.ReaderAt) (index.Index, error) { +func generateIndex(at io.ReaderAt, opts ...carv2.ReadOption) (index.Index, error) { var rs io.ReadSeeker switch r := at.(type) { case io.ReadSeeker: @@ -105,19 +143,19 @@ func generateIndex(at io.ReaderAt) (index.Index, error) { default: rs = internalio.NewOffsetReadSeeker(r, 0) } - return carv2.GenerateIndex(rs) + return carv2.GenerateIndex(rs, opts...) } // OpenReadOnly opens a read-only blockstore from a CAR file (either v1 or v2), generating an index if it does not exist. // Note, the generated index if the index does not exist is ephemeral and only stored in memory. // See car.GenerateIndex and Index.Attach for persisting index onto a CAR file. -func OpenReadOnly(path string) (*ReadOnly, error) { +func OpenReadOnly(path string, opts ...carv2.ReadOption) (*ReadOnly, error) { f, err := mmap.Open(path) if err != nil { return nil, err } - robs, err := NewReadOnly(f, nil) + robs, err := NewReadOnly(f, nil, opts...) if err != nil { return nil, err } @@ -191,7 +229,7 @@ func (b *ReadOnly) GetSize(key cid.Cid) (int, error) { return -1, err } rdr := internalio.NewOffsetReadSeeker(b.backing, int64(idx)) - frameLen, err := varint.ReadUvarint(rdr) + sectionLen, err := varint.ReadUvarint(rdr) if err != nil { return -1, blockstore.ErrNotFound } @@ -202,7 +240,7 @@ func (b *ReadOnly) GetSize(key cid.Cid) (int, error) { if !readCid.Equals(key) { return -1, blockstore.ErrNotFound } - return int(frameLen) - cidLen, err + return int(sectionLen) - cidLen, err } // Put is not supported and always returns an error. @@ -249,9 +287,14 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { return // TODO: log this error } - // Null padding; treat it as EOF. + // Null padding; by default it's an error. if length == 0 { - break // TODO make this an optional behaviour; by default we should error + if b.ropts.ZeroLegthSectionAsEOF { + break + } else { + return // TODO: log this error + // return fmt.Errorf("carv1 null padding not allowed by default; see WithZeroLegthSectionAsEOF") + } } thisItemForNxt := rdr.Offset() diff --git a/v2/blockstore/readwrite.go b/v2/blockstore/readwrite.go index 0829b2c7d9733a28a979aeca6c03c8c4cfcb0b12..a1c7fd84f798b544d4454588838b2ac63ab28b2b 100644 --- a/v2/blockstore/readwrite.go +++ b/v2/blockstore/readwrite.go @@ -39,33 +39,19 @@ type ReadWrite struct { idx *insertionIndex header carv2.Header - dedupCids bool + wopts carv2.WriteOptions } -// TODO consider exposing interfaces -type Option func(*ReadWrite) // TODO consider unifying with writer options - -// WithCarV1Padding sets the padding to be added between CAR v2 header and its data payload on Finalize. -func WithCarV1Padding(p uint64) Option { - return func(b *ReadWrite) { - b.header = b.header.WithCarV1Padding(p) - } -} - -// WithIndexPadding sets the padding between data payload and its index on Finalize. -func WithIndexPadding(p uint64) Option { - return func(b *ReadWrite) { - b.header = b.header.WithIndexPadding(p) - } -} - -// WithCidDeduplication makes Put calls ignore blocks if the blockstore already -// has the exact same CID. -// This can help avoid redundancy in a CARv1's list of CID-Block pairs. +// AllowDuplicatePuts is a write option which makes a CAR blockstore not +// deduplicate blocks in Put and PutMany. The default is to deduplicate, +// which matches the current semantics of go-ipfs-blockstore v1. // -// Note that this compares whole CIDs, not just multihashes. -func WithCidDeduplication(b *ReadWrite) { // TODO should this take a bool and return an option to allow disabling dedupliation? - b.dedupCids = true +// Note that this option only affects the blockstore, and is ignored by the root +// go-car/v2 package. +func AllowDuplicatePuts(allow bool) carv2.WriteOption { + return func(o *carv2.WriteOptions) { + o.BlockstoreAllowDuplicatePuts = allow + } } // OpenReadWrite creates a new ReadWrite at the given path with a provided set of root CIDs and options. @@ -80,7 +66,7 @@ func WithCidDeduplication(b *ReadWrite) { // TODO should this take a bool and re // header (i.e. the inner CAR v1 header) onto the file before returning. // // When the given path already exists, the blockstore will attempt to resume from it. -// On resumption the existing data frames in file are re-indexed, allowing the caller to continue +// On resumption the existing data sections in file are re-indexed, allowing the caller to continue // putting any remaining blocks without having to re-ingest blocks for which previous ReadWrite.Put // returned successfully. // @@ -93,7 +79,7 @@ func WithCidDeduplication(b *ReadWrite) { // TODO should this take a bool and re // 1. start with a complete CAR v2 car.Pragma. // 2. contain a complete CAR v1 data header with root CIDs matching the CIDs passed to the // constructor, starting at offset optionally padded by WithCarV1Padding, followed by zero or -// more complete data frames. If any corrupt data frames are present the resumption will fail. +// more complete data sections. If any corrupt data sections are present the resumption will fail. // Note, if set previously, the blockstore must use the same WithCarV1Padding option as before, // since this option is used to locate the CAR v1 data payload. // @@ -102,7 +88,7 @@ func WithCidDeduplication(b *ReadWrite) { // TODO should this take a bool and re // // Resuming from finalized files is allowed. However, resumption will regenerate the index // regardless by scanning every existing block in file. -func OpenReadWrite(path string, roots []cid.Cid, opts ...Option) (*ReadWrite, error) { +func OpenReadWrite(path string, roots []cid.Cid, opts ...carv2.ReadWriteOption) (*ReadWrite, error) { // TODO: enable deduplication by default now that resumption is automatically attempted. f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o666) // TODO: Should the user be able to configure FileMode permissions? if err != nil { @@ -129,13 +115,27 @@ func OpenReadWrite(path string, roots []cid.Cid, opts ...Option) (*ReadWrite, er idx: newInsertionIndex(), header: carv2.NewHeader(0), } + for _, opt := range opts { - opt(rwbs) + switch opt := opt.(type) { + case carv2.ReadOption: + opt(&rwbs.ropts) + case carv2.WriteOption: + opt(&rwbs.wopts) + } + } + if p := rwbs.wopts.CarV1Padding; p > 0 { + rwbs.header = rwbs.header.WithCarV1Padding(p) + } + if p := rwbs.wopts.IndexPadding; p > 0 { + rwbs.header = rwbs.header.WithIndexPadding(p) } rwbs.carV1Writer = internalio.NewOffsetWriter(rwbs.f, int64(rwbs.header.CarV1Offset)) v1r := internalio.NewOffsetReadSeeker(rwbs.f, int64(rwbs.header.CarV1Offset)) - rwbs.ReadOnly = ReadOnly{backing: v1r, idx: rwbs.idx, carv2Closer: rwbs.f} + rwbs.ReadOnly.backing = v1r + rwbs.ReadOnly.idx = rwbs.idx + rwbs.ReadOnly.carv2Closer = rwbs.f if resume { if err = rwbs.resumeWithRoots(roots); err != nil { @@ -237,13 +237,13 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error { if err != nil { return err } - frameOffset := int64(0) - if frameOffset, err = v1r.Seek(int64(offset), io.SeekStart); err != nil { + sectionOffset := int64(0) + if sectionOffset, err = v1r.Seek(int64(offset), io.SeekStart); err != nil { return err } for { - // Grab the length of the frame. + // Grab the length of the section. // Note that ReadUvarint wants a ByteReader. length, err := varint.ReadUvarint(v1r) if err != nil { @@ -253,10 +253,13 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error { return err } - // Null padding; treat zero-length frames as an EOF. - // They don't contain a CID nor block, so they're not useful. + // Null padding; by default it's an error. if length == 0 { - break // TODO This behaviour should be an option, not default. By default we should error. Hook this up to a write option + if b.ropts.ZeroLegthSectionAsEOF { + break + } else { + return fmt.Errorf("carv1 null padding not allowed by default; see WithZeroLegthSectionAsEOF") + } } // Grab the CID. @@ -264,16 +267,16 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error { if err != nil { return err } - b.idx.insertNoReplace(c, uint64(frameOffset)) + b.idx.insertNoReplace(c, uint64(sectionOffset)) - // Seek to the next frame by skipping the block. - // The frame length includes the CID, so subtract it. - if frameOffset, err = v1r.Seek(int64(length)-int64(n), io.SeekCurrent); err != nil { + // Seek to the next section by skipping the block. + // The section length includes the CID, so subtract it. + if sectionOffset, err = v1r.Seek(int64(length)-int64(n), io.SeekCurrent); err != nil { return err } } // Seek to the end of last skipped block where the writer should resume writing. - _, err = b.carV1Writer.Seek(frameOffset, io.SeekStart) + _, err = b.carV1Writer.Seek(sectionOffset, io.SeekStart) return err } @@ -304,8 +307,17 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error { for _, bl := range blks { c := bl.Cid() - if b.dedupCids && b.idx.hasExactCID(c) { - continue + + if !b.wopts.BlockstoreAllowDuplicatePuts { + if b.ropts.BlockstoreUseWholeCIDs && b.idx.hasExactCID(c) { + continue // deduplicated by CID + } + if !b.ropts.BlockstoreUseWholeCIDs { + _, err := b.idx.Get(c) + if err == nil { + continue // deduplicated by hash + } + } } n := uint64(b.carV1Writer.Position()) diff --git a/v2/blockstore/readwrite_test.go b/v2/blockstore/readwrite_test.go index 884edca754b7b3bdbaa7df8480b50511082de430..3a8099fbd2991f0f80277aabe710a36b0fd13a34 100644 --- a/v2/blockstore/readwrite_test.go +++ b/v2/blockstore/readwrite_test.go @@ -122,15 +122,19 @@ func TestBlockstore(t *testing.T) { func TestBlockstorePutSameHashes(t *testing.T) { tdir := t.TempDir() + + // wbs allows duplicate puts. wbs, err := blockstore.OpenReadWrite( filepath.Join(tdir, "readwrite.car"), nil, + blockstore.AllowDuplicatePuts(true), ) require.NoError(t, err) t.Cleanup(func() { wbs.Finalize() }) + // wbs deduplicates puts by CID. wbsd, err := blockstore.OpenReadWrite( filepath.Join(tdir, "readwrite-dedup.car"), nil, - blockstore.WithCidDeduplication, + blockstore.UseWholeCIDs(true), ) require.NoError(t, err) t.Cleanup(func() { wbsd.Finalize() }) @@ -276,7 +280,7 @@ func TestBlockstoreNullPadding(t *testing.T) { // A sample null-padded CARv1 file. paddedV1 = append(paddedV1, make([]byte, 2048)...) - rbs, err := blockstore.NewReadOnly(bufferReaderAt(paddedV1), nil) + rbs, err := blockstore.NewReadOnly(bufferReaderAt(paddedV1), nil, carv2.ZeroLegthSectionAsEOF) require.NoError(t, err) roots, err := rbs.Roots() @@ -483,8 +487,8 @@ func TestReadWriteWithPaddingWorksAsExpected(t *testing.T) { subject, err := blockstore.OpenReadWrite( path, WantRoots, - blockstore.WithCarV1Padding(wantCarV1Padding), - blockstore.WithIndexPadding(wantIndexPadding)) + carv2.UseCarV1Padding(wantCarV1Padding), + carv2.UseIndexPadding(wantIndexPadding)) require.NoError(t, err) t.Cleanup(func() { subject.Close() }) require.NoError(t, subject.Put(oneTestBlockWithCidV1)) @@ -544,7 +548,7 @@ func TestReadWriteResumptionFromFileWithDifferentCarV1PaddingIsError(t *testing. subject, err := blockstore.OpenReadWrite( path, WantRoots, - blockstore.WithCarV1Padding(1413)) + carv2.UseCarV1Padding(1413)) require.NoError(t, err) t.Cleanup(func() { subject.Close() }) require.NoError(t, subject.Put(oneTestBlockWithCidV1)) @@ -553,7 +557,7 @@ func TestReadWriteResumptionFromFileWithDifferentCarV1PaddingIsError(t *testing. resumingSubject, err := blockstore.OpenReadWrite( path, WantRoots, - blockstore.WithCarV1Padding(1314)) + carv2.UseCarV1Padding(1314)) require.EqualError(t, err, "cannot resume from file with mismatched CARv1 offset; "+ "`WithCarV1Padding` option must match the padding on file. "+ "Expected padding value of 1413 but got 1314") diff --git a/v2/index_gen.go b/v2/index_gen.go index 859909cae5d30dc9ecbfbcded368959115830e83..4fa9ac1df203b992604b1773f9ae31b0418c6f2e 100644 --- a/v2/index_gen.go +++ b/v2/index_gen.go @@ -18,7 +18,12 @@ import ( // GenerateIndex generates index for a given car in v1 format. // The index can be stored using index.Save into a file or serialized using index.WriteTo. -func GenerateIndex(v1r io.Reader) (index.Index, error) { +func GenerateIndex(v1r io.Reader, opts ...ReadOption) (index.Index, error) { + var o ReadOptions + for _, opt := range opts { + opt(&o) + } + reader := internalio.ToByteReadSeeker(v1r) header, err := carv1.ReadHeader(reader) if err != nil { @@ -35,20 +40,20 @@ func GenerateIndex(v1r io.Reader) (index.Index, error) { } records := make([]index.Record, 0) - // Record the start of each frame, with first frame starring from current position in the + // Record the start of each section, with first section starring from current position in the // reader, i.e. right after the header, since we have only read the header so far. - var frameOffset int64 + var sectionOffset int64 // The Seek call below is equivalent to getting the reader.offset directly. // We get it through Seek to only depend on APIs of a typical io.Seeker. // This would also reduce refactoring in case the utility reader is moved. - if frameOffset, err = reader.Seek(0, io.SeekCurrent); err != nil { + if sectionOffset, err = reader.Seek(0, io.SeekCurrent); err != nil { return nil, err } for { - // Read the frame's length. - frameLen, err := varint.ReadUvarint(reader) + // Read the section's length. + sectionLen, err := varint.ReadUvarint(reader) if err != nil { if err == io.EOF { break @@ -56,11 +61,13 @@ func GenerateIndex(v1r io.Reader) (index.Index, error) { return nil, err } - // Null padding; treat zero-length frames as an EOF. - // They don't contain a CID nor block, so they're not useful. - // TODO: Amend the CARv1 spec to explicitly allow this. - if frameLen == 0 { - break + // Null padding; by default it's an error. + if sectionLen == 0 { + if o.ZeroLegthSectionAsEOF { + break + } else { + return nil, fmt.Errorf("carv1 null padding not allowed by default; see ZeroLegthSectionAsEOF") + } } // Read the CID. @@ -68,12 +75,12 @@ func GenerateIndex(v1r io.Reader) (index.Index, error) { if err != nil { return nil, err } - records = append(records, index.Record{Cid: c, Idx: uint64(frameOffset)}) + records = append(records, index.Record{Cid: c, Idx: uint64(sectionOffset)}) - // Seek to the next frame by skipping the block. - // The frame length includes the CID, so subtract it. - remainingFrameLen := int64(frameLen) - int64(cidLen) - if frameOffset, err = reader.Seek(remainingFrameLen, io.SeekCurrent); err != nil { + // Seek to the next section by skipping the block. + // The section length includes the CID, so subtract it. + remainingSectionLen := int64(sectionLen) - int64(cidLen) + if sectionOffset, err = reader.Seek(remainingSectionLen, io.SeekCurrent); err != nil { return nil, err } } diff --git a/v2/options.go b/v2/options.go new file mode 100644 index 0000000000000000000000000000000000000000..ad859d1a0a5a941f63ad3e7a175024c93d0a7965 --- /dev/null +++ b/v2/options.go @@ -0,0 +1,67 @@ +package car + +// ReadOptions holds the configured options after applying a number of +// ReadOption funcs. +// +// This type should not be used directly by end users; it's only exposed as a +// side effect of ReadOption. +type ReadOptions struct { + ZeroLegthSectionAsEOF bool + + BlockstoreUseWholeCIDs bool +} + +// ReadOption describes an option which affects behavior when parsing CAR files. +type ReadOption func(*ReadOptions) + +func (ReadOption) readWriteOption() {} + +var _ ReadWriteOption = ReadOption(nil) + +// WriteOptions holds the configured options after applying a number of +// WriteOption funcs. +// +// This type should not be used directly by end users; it's only exposed as a +// side effect of WriteOption. +type WriteOptions struct { + CarV1Padding uint64 + IndexPadding uint64 + + BlockstoreAllowDuplicatePuts bool +} + +// WriteOption describes an option which affects behavior when encoding CAR files. +type WriteOption func(*WriteOptions) + +func (WriteOption) readWriteOption() {} + +var _ ReadWriteOption = WriteOption(nil) + +// ReadWriteOption is either a ReadOption or a WriteOption. +type ReadWriteOption interface { + readWriteOption() +} + +// ZeroLegthSectionAsEOF is a read option which allows a CARv1 decoder to treat +// a zero-length section as the end of the input CAR file. For example, this can +// be useful to allow "null padding" after a CARv1 without knowing where the +// padding begins. +func ZeroLegthSectionAsEOF(o *ReadOptions) { + o.ZeroLegthSectionAsEOF = true +} + +// UseCarV1Padding is a write option which sets the padding to be added between +// CAR v2 header and its data payload on Finalize. +func UseCarV1Padding(p uint64) WriteOption { + return func(o *WriteOptions) { + o.CarV1Padding = p + } +} + +// UseIndexPadding is a write option which sets the padding between data payload +// and its index on Finalize. +func UseIndexPadding(p uint64) WriteOption { + return func(o *WriteOptions) { + o.IndexPadding = p + } +} diff --git a/v2/reader.go b/v2/reader.go index ad231c0f1d5b9905518bcb54b4926d77f1e0f44e..f845a81ce7d1c76b635749faf7e1f51c1ca706ac 100644 --- a/v2/reader.go +++ b/v2/reader.go @@ -20,13 +20,13 @@ type Reader struct { } // OpenReader is a wrapper for NewReader which opens the file at path. -func OpenReader(path string) (*Reader, error) { +func OpenReader(path string, opts ...ReadOption) (*Reader, error) { f, err := mmap.Open(path) if err != nil { return nil, err } - r, err := NewReader(f) + r, err := NewReader(f, opts...) if err != nil { return nil, err } @@ -38,7 +38,7 @@ func OpenReader(path string) (*Reader, error) { // NewReader constructs a new reader that reads CAR v2 from the given r. // Upon instantiation, the reader inspects the payload by reading the pragma and will return // an error if the pragma does not represent a CAR v2. -func NewReader(r io.ReaderAt) (*Reader, error) { +func NewReader(r io.ReaderAt, opts ...ReadOption) (*Reader, error) { cr := &Reader{ r: r, }