Commit 7d8f54ff authored by Daniel Martí

unify options and add more blockstore options

We've agreed to have unified options, since many will be shared between
the root and blockstore packages.

Include docs, and update the tests.
parent f3fc5958
......@@ -24,7 +24,7 @@ import (
var _ blockstore.Blockstore = (*ReadOnly)(nil)
// ReadOnly provides a read-only Car Block Store.
// ReadOnly provides a read-only CAR Block Store.
type ReadOnly struct {
// mu allows ReadWrite to be safe for concurrent use.
// It's in ReadOnly so that read operations also grab read locks,
......@@ -41,6 +41,35 @@ type ReadOnly struct {
// If we called carv2.NewReaderMmap, remember to close it too.
carv2Closer io.Closer
ropts carv2.ReadOptions
}
// UseWholeCIDs returns a read option controlling whether a CAR blockstore
// identifies blocks by their whole CIDs rather than only by their multihashes.
// When the option is not enabled, blocks are identified by multihash, which
// matches the current semantics of go-ipfs-blockstore v1.
//
// Enabling the option changes the behavior of several methods, read-only ones
// included:
//
// • Get, Has, and GetSize will return a block only if the entire CID is
// present in the CAR file.
//
// • DeleteBlock will delete a block only when the entire CID matches.
//
// • AllKeysChan will return the original whole CIDs, rather than CIDs whose
// multicodec has been set to "raw" to just provide multihashes.
//
// • Unless AllowDuplicatePuts is set, Put and PutMany will deduplicate by the
// whole CID, allowing different CIDs with equal multihashes.
//
// The option only affects the blockstore; the root go-car/v2 package ignores
// it.
func UseWholeCIDs(enable bool) carv2.ReadOption {
	// TODO: update methods like Get, Has, and AllKeysChan to obey this.
	apply := func(ro *carv2.ReadOptions) {
		ro.BlockstoreUseWholeCIDs = enable
	}
	return apply
}
// NewReadOnly creates a new ReadOnly blockstore from the backing with an optional index as idx.
......@@ -52,7 +81,12 @@ type ReadOnly struct {
// * For a CAR v2 backing an index is only generated if Header.HasIndex returns false.
//
// There is no need to call ReadOnly.Close on instances returned by this function.
func NewReadOnly(backing io.ReaderAt, idx index.Index) (*ReadOnly, error) {
func NewReadOnly(backing io.ReaderAt, idx index.Index, opts ...carv2.ReadOption) (*ReadOnly, error) {
b := &ReadOnly{}
for _, opt := range opts {
opt(&b.ropts)
}
version, err := readVersion(backing)
if err != nil {
return nil, err
......@@ -60,13 +94,15 @@ func NewReadOnly(backing io.ReaderAt, idx index.Index) (*ReadOnly, error) {
switch version {
case 1:
if idx == nil {
if idx, err = generateIndex(backing); err != nil {
if idx, err = generateIndex(backing, opts...); err != nil {
return nil, err
}
}
return &ReadOnly{backing: backing, idx: idx}, nil
b.backing = backing
b.idx = idx
return b, nil
case 2:
v2r, err := carv2.NewReader(backing)
v2r, err := carv2.NewReader(backing, opts...)
if err != nil {
return nil, err
}
......@@ -76,11 +112,13 @@ func NewReadOnly(backing io.ReaderAt, idx index.Index) (*ReadOnly, error) {
if err != nil {
return nil, err
}
} else if idx, err = generateIndex(v2r.CarV1Reader()); err != nil {
} else if idx, err = generateIndex(v2r.CarV1Reader(), opts...); err != nil {
return nil, err
}
}
return &ReadOnly{backing: v2r.CarV1Reader(), idx: idx}, nil
b.backing = v2r.CarV1Reader()
b.idx = idx
return b, nil
default:
return nil, fmt.Errorf("unsupported car version: %v", version)
}
......@@ -97,7 +135,7 @@ func readVersion(at io.ReaderAt) (uint64, error) {
return carv2.ReadVersion(rr)
}
func generateIndex(at io.ReaderAt) (index.Index, error) {
func generateIndex(at io.ReaderAt, opts ...carv2.ReadOption) (index.Index, error) {
var rs io.ReadSeeker
switch r := at.(type) {
case io.ReadSeeker:
......@@ -105,19 +143,19 @@ func generateIndex(at io.ReaderAt) (index.Index, error) {
default:
rs = internalio.NewOffsetReadSeeker(r, 0)
}
return carv2.GenerateIndex(rs)
return carv2.GenerateIndex(rs, opts...)
}
// OpenReadOnly opens a read-only blockstore from a CAR file (either v1 or v2), generating an index if it does not exist.
// Note that an index generated because none existed is ephemeral and only stored in memory.
// See car.GenerateIndex and Index.Attach for persisting index onto a CAR file.
func OpenReadOnly(path string) (*ReadOnly, error) {
func OpenReadOnly(path string, opts ...carv2.ReadOption) (*ReadOnly, error) {
f, err := mmap.Open(path)
if err != nil {
return nil, err
}
robs, err := NewReadOnly(f, nil)
robs, err := NewReadOnly(f, nil, opts...)
if err != nil {
return nil, err
}
......@@ -191,7 +229,7 @@ func (b *ReadOnly) GetSize(key cid.Cid) (int, error) {
return -1, err
}
rdr := internalio.NewOffsetReadSeeker(b.backing, int64(idx))
frameLen, err := varint.ReadUvarint(rdr)
sectionLen, err := varint.ReadUvarint(rdr)
if err != nil {
return -1, blockstore.ErrNotFound
}
......@@ -202,7 +240,7 @@ func (b *ReadOnly) GetSize(key cid.Cid) (int, error) {
if !readCid.Equals(key) {
return -1, blockstore.ErrNotFound
}
return int(frameLen) - cidLen, err
return int(sectionLen) - cidLen, err
}
// Put is not supported and always returns an error.
......@@ -249,9 +287,14 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return // TODO: log this error
}
// Null padding; treat it as EOF.
// Null padding; by default it's an error.
if length == 0 {
break // TODO make this an optional behaviour; by default we should error
if b.ropts.ZeroLegthSectionAsEOF {
break
} else {
return // TODO: log this error
// return fmt.Errorf("carv1 null padding not allowed by default; see ZeroLegthSectionAsEOF")
}
}
thisItemForNxt := rdr.Offset()
......
......@@ -39,33 +39,19 @@ type ReadWrite struct {
idx *insertionIndex
header carv2.Header
dedupCids bool
wopts carv2.WriteOptions
}
// TODO consider exposing interfaces
type Option func(*ReadWrite) // TODO consider unifying with writer options
// WithCarV1Padding sets the padding to be added between CAR v2 header and its data payload on Finalize.
func WithCarV1Padding(p uint64) Option {
return func(b *ReadWrite) {
b.header = b.header.WithCarV1Padding(p)
}
}
// WithIndexPadding sets the padding between data payload and its index on Finalize.
func WithIndexPadding(p uint64) Option {
return func(b *ReadWrite) {
b.header = b.header.WithIndexPadding(p)
}
}
// WithCidDeduplication makes Put calls ignore blocks if the blockstore already
// has the exact same CID.
// This can help avoid redundancy in a CARv1's list of CID-Block pairs.
// AllowDuplicatePuts is a write option which makes a CAR blockstore not
// deduplicate blocks in Put and PutMany. The default is to deduplicate,
// which matches the current semantics of go-ipfs-blockstore v1.
//
// Note that this compares whole CIDs, not just multihashes.
func WithCidDeduplication(b *ReadWrite) { // TODO should this take a bool and return an option to allow disabling dedupliation?
b.dedupCids = true
// Note that this option only affects the blockstore, and is ignored by the root
// go-car/v2 package.
func AllowDuplicatePuts(allow bool) carv2.WriteOption {
	// Record the caller's choice on the write options; PutMany consults this
	// flag before attempting any deduplication.
	set := func(wo *carv2.WriteOptions) {
		wo.BlockstoreAllowDuplicatePuts = allow
	}
	return set
}
// OpenReadWrite creates a new ReadWrite at the given path with a provided set of root CIDs and options.
......@@ -80,7 +66,7 @@ func WithCidDeduplication(b *ReadWrite) { // TODO should this take a bool and re
// header (i.e. the inner CAR v1 header) onto the file before returning.
//
// When the given path already exists, the blockstore will attempt to resume from it.
// On resumption the existing data frames in file are re-indexed, allowing the caller to continue
// On resumption the existing data sections in file are re-indexed, allowing the caller to continue
// putting any remaining blocks without having to re-ingest blocks for which previous ReadWrite.Put
// returned successfully.
//
......@@ -93,7 +79,7 @@ func WithCidDeduplication(b *ReadWrite) { // TODO should this take a bool and re
// 1. start with a complete CAR v2 car.Pragma.
// 2. contain a complete CAR v1 data header with root CIDs matching the CIDs passed to the
// constructor, starting at offset optionally padded by carv2.UseCarV1Padding, followed by zero or
// more complete data frames. If any corrupt data frames are present the resumption will fail.
// more complete data sections. If any corrupt data sections are present the resumption will fail.
// Note, if set previously, the blockstore must use the same carv2.UseCarV1Padding option as before,
// since this option is used to locate the CAR v1 data payload.
//
......@@ -102,7 +88,7 @@ func WithCidDeduplication(b *ReadWrite) { // TODO should this take a bool and re
//
// Resuming from finalized files is allowed. However, resumption will regenerate the index
// regardless by scanning every existing block in file.
func OpenReadWrite(path string, roots []cid.Cid, opts ...Option) (*ReadWrite, error) {
func OpenReadWrite(path string, roots []cid.Cid, opts ...carv2.ReadWriteOption) (*ReadWrite, error) {
// TODO: enable deduplication by default now that resumption is automatically attempted.
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o666) // TODO: Should the user be able to configure FileMode permissions?
if err != nil {
......@@ -129,13 +115,27 @@ func OpenReadWrite(path string, roots []cid.Cid, opts ...Option) (*ReadWrite, er
idx: newInsertionIndex(),
header: carv2.NewHeader(0),
}
for _, opt := range opts {
opt(rwbs)
switch opt := opt.(type) {
case carv2.ReadOption:
opt(&rwbs.ropts)
case carv2.WriteOption:
opt(&rwbs.wopts)
}
}
if p := rwbs.wopts.CarV1Padding; p > 0 {
rwbs.header = rwbs.header.WithCarV1Padding(p)
}
if p := rwbs.wopts.IndexPadding; p > 0 {
rwbs.header = rwbs.header.WithIndexPadding(p)
}
rwbs.carV1Writer = internalio.NewOffsetWriter(rwbs.f, int64(rwbs.header.CarV1Offset))
v1r := internalio.NewOffsetReadSeeker(rwbs.f, int64(rwbs.header.CarV1Offset))
rwbs.ReadOnly = ReadOnly{backing: v1r, idx: rwbs.idx, carv2Closer: rwbs.f}
rwbs.ReadOnly.backing = v1r
rwbs.ReadOnly.idx = rwbs.idx
rwbs.ReadOnly.carv2Closer = rwbs.f
if resume {
if err = rwbs.resumeWithRoots(roots); err != nil {
......@@ -237,13 +237,13 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error {
if err != nil {
return err
}
frameOffset := int64(0)
if frameOffset, err = v1r.Seek(int64(offset), io.SeekStart); err != nil {
sectionOffset := int64(0)
if sectionOffset, err = v1r.Seek(int64(offset), io.SeekStart); err != nil {
return err
}
for {
// Grab the length of the frame.
// Grab the length of the section.
// Note that ReadUvarint wants a ByteReader.
length, err := varint.ReadUvarint(v1r)
if err != nil {
......@@ -253,10 +253,13 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error {
return err
}
// Null padding; treat zero-length frames as an EOF.
// They don't contain a CID nor block, so they're not useful.
// Null padding; by default it's an error.
if length == 0 {
break // TODO This behaviour should be an option, not default. By default we should error. Hook this up to a write option
if b.ropts.ZeroLegthSectionAsEOF {
break
} else {
return fmt.Errorf("carv1 null padding not allowed by default; see WithZeroLegthSectionAsEOF")
}
}
// Grab the CID.
......@@ -264,16 +267,16 @@ func (b *ReadWrite) resumeWithRoots(roots []cid.Cid) error {
if err != nil {
return err
}
b.idx.insertNoReplace(c, uint64(frameOffset))
b.idx.insertNoReplace(c, uint64(sectionOffset))
// Seek to the next frame by skipping the block.
// The frame length includes the CID, so subtract it.
if frameOffset, err = v1r.Seek(int64(length)-int64(n), io.SeekCurrent); err != nil {
// Seek to the next section by skipping the block.
// The section length includes the CID, so subtract it.
if sectionOffset, err = v1r.Seek(int64(length)-int64(n), io.SeekCurrent); err != nil {
return err
}
}
// Seek to the end of last skipped block where the writer should resume writing.
_, err = b.carV1Writer.Seek(frameOffset, io.SeekStart)
_, err = b.carV1Writer.Seek(sectionOffset, io.SeekStart)
return err
}
......@@ -304,8 +307,17 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error {
for _, bl := range blks {
c := bl.Cid()
if b.dedupCids && b.idx.hasExactCID(c) {
continue
if !b.wopts.BlockstoreAllowDuplicatePuts {
if b.ropts.BlockstoreUseWholeCIDs && b.idx.hasExactCID(c) {
continue // deduplicated by CID
}
if !b.ropts.BlockstoreUseWholeCIDs {
_, err := b.idx.Get(c)
if err == nil {
continue // deduplicated by hash
}
}
}
n := uint64(b.carV1Writer.Position())
......
......@@ -122,15 +122,19 @@ func TestBlockstore(t *testing.T) {
func TestBlockstorePutSameHashes(t *testing.T) {
tdir := t.TempDir()
// wbs allows duplicate puts.
wbs, err := blockstore.OpenReadWrite(
filepath.Join(tdir, "readwrite.car"), nil,
blockstore.AllowDuplicatePuts(true),
)
require.NoError(t, err)
t.Cleanup(func() { wbs.Finalize() })
// wbsd deduplicates puts by CID.
wbsd, err := blockstore.OpenReadWrite(
filepath.Join(tdir, "readwrite-dedup.car"), nil,
blockstore.WithCidDeduplication,
blockstore.UseWholeCIDs(true),
)
require.NoError(t, err)
t.Cleanup(func() { wbsd.Finalize() })
......@@ -276,7 +280,7 @@ func TestBlockstoreNullPadding(t *testing.T) {
// A sample null-padded CARv1 file.
paddedV1 = append(paddedV1, make([]byte, 2048)...)
rbs, err := blockstore.NewReadOnly(bufferReaderAt(paddedV1), nil)
rbs, err := blockstore.NewReadOnly(bufferReaderAt(paddedV1), nil, carv2.ZeroLegthSectionAsEOF)
require.NoError(t, err)
roots, err := rbs.Roots()
......@@ -483,8 +487,8 @@ func TestReadWriteWithPaddingWorksAsExpected(t *testing.T) {
subject, err := blockstore.OpenReadWrite(
path,
WantRoots,
blockstore.WithCarV1Padding(wantCarV1Padding),
blockstore.WithIndexPadding(wantIndexPadding))
carv2.UseCarV1Padding(wantCarV1Padding),
carv2.UseIndexPadding(wantIndexPadding))
require.NoError(t, err)
t.Cleanup(func() { subject.Close() })
require.NoError(t, subject.Put(oneTestBlockWithCidV1))
......@@ -544,7 +548,7 @@ func TestReadWriteResumptionFromFileWithDifferentCarV1PaddingIsError(t *testing.
subject, err := blockstore.OpenReadWrite(
path,
WantRoots,
blockstore.WithCarV1Padding(1413))
carv2.UseCarV1Padding(1413))
require.NoError(t, err)
t.Cleanup(func() { subject.Close() })
require.NoError(t, subject.Put(oneTestBlockWithCidV1))
......@@ -553,7 +557,7 @@ func TestReadWriteResumptionFromFileWithDifferentCarV1PaddingIsError(t *testing.
resumingSubject, err := blockstore.OpenReadWrite(
path,
WantRoots,
blockstore.WithCarV1Padding(1314))
carv2.UseCarV1Padding(1314))
require.EqualError(t, err, "cannot resume from file with mismatched CARv1 offset; "+
"`WithCarV1Padding` option must match the padding on file. "+
"Expected padding value of 1413 but got 1314")
......
......@@ -18,7 +18,12 @@ import (
// GenerateIndex generates index for a given car in v1 format.
// The index can be stored using index.Save into a file or serialized using index.WriteTo.
func GenerateIndex(v1r io.Reader) (index.Index, error) {
func GenerateIndex(v1r io.Reader, opts ...ReadOption) (index.Index, error) {
var o ReadOptions
for _, opt := range opts {
opt(&o)
}
reader := internalio.ToByteReadSeeker(v1r)
header, err := carv1.ReadHeader(reader)
if err != nil {
......@@ -35,20 +40,20 @@ func GenerateIndex(v1r io.Reader) (index.Index, error) {
}
records := make([]index.Record, 0)
// Record the start of each frame, with first frame starring from current position in the
// Record the start of each section, with first section starting from current position in the
// reader, i.e. right after the header, since we have only read the header so far.
var frameOffset int64
var sectionOffset int64
// The Seek call below is equivalent to getting the reader.offset directly.
// We get it through Seek to only depend on APIs of a typical io.Seeker.
// This would also reduce refactoring in case the utility reader is moved.
if frameOffset, err = reader.Seek(0, io.SeekCurrent); err != nil {
if sectionOffset, err = reader.Seek(0, io.SeekCurrent); err != nil {
return nil, err
}
for {
// Read the frame's length.
frameLen, err := varint.ReadUvarint(reader)
// Read the section's length.
sectionLen, err := varint.ReadUvarint(reader)
if err != nil {
if err == io.EOF {
break
......@@ -56,11 +61,13 @@ func GenerateIndex(v1r io.Reader) (index.Index, error) {
return nil, err
}
// Null padding; treat zero-length frames as an EOF.
// They don't contain a CID nor block, so they're not useful.
// TODO: Amend the CARv1 spec to explicitly allow this.
if frameLen == 0 {
break
// Null padding; by default it's an error.
if sectionLen == 0 {
if o.ZeroLegthSectionAsEOF {
break
} else {
return nil, fmt.Errorf("carv1 null padding not allowed by default; see ZeroLegthSectionAsEOF")
}
}
// Read the CID.
......@@ -68,12 +75,12 @@ func GenerateIndex(v1r io.Reader) (index.Index, error) {
if err != nil {
return nil, err
}
records = append(records, index.Record{Cid: c, Idx: uint64(frameOffset)})
records = append(records, index.Record{Cid: c, Idx: uint64(sectionOffset)})
// Seek to the next frame by skipping the block.
// The frame length includes the CID, so subtract it.
remainingFrameLen := int64(frameLen) - int64(cidLen)
if frameOffset, err = reader.Seek(remainingFrameLen, io.SeekCurrent); err != nil {
// Seek to the next section by skipping the block.
// The section length includes the CID, so subtract it.
remainingSectionLen := int64(sectionLen) - int64(cidLen)
if sectionOffset, err = reader.Seek(remainingSectionLen, io.SeekCurrent); err != nil {
return nil, err
}
}
......
package car
type (
	// ReadOptions is the accumulated configuration produced by applying a
	// number of ReadOption funcs.
	//
	// End users should not use this type directly; it is exported only as a
	// side effect of ReadOption.
	ReadOptions struct {
		// ZeroLegthSectionAsEOF is switched on by the option of the same name.
		ZeroLegthSectionAsEOF bool

		// BlockstoreUseWholeCIDs is a blockstore-specific setting.
		BlockstoreUseWholeCIDs bool
	}

	// ReadOption is an option which affects behavior when parsing CAR files.
	ReadOption func(*ReadOptions)

	// WriteOptions is the accumulated configuration produced by applying a
	// number of WriteOption funcs.
	//
	// End users should not use this type directly; it is exported only as a
	// side effect of WriteOption.
	WriteOptions struct {
		// CarV1Padding is the value supplied through UseCarV1Padding.
		CarV1Padding uint64

		// IndexPadding is the value supplied through UseIndexPadding.
		IndexPadding uint64

		// BlockstoreAllowDuplicatePuts is a blockstore-specific setting.
		BlockstoreAllowDuplicatePuts bool
	}

	// WriteOption is an option which affects behavior when encoding CAR
	// files.
	WriteOption func(*WriteOptions)

	// ReadWriteOption is either a ReadOption or a WriteOption. Its only
	// method is unexported, so no type outside this package can satisfy it.
	ReadWriteOption interface {
		readWriteOption()
	}
)

// readWriteOption marks ReadOption as a ReadWriteOption.
func (ReadOption) readWriteOption() {}

// readWriteOption marks WriteOption as a ReadWriteOption.
func (WriteOption) readWriteOption() {}

// Compile-time checks that both option kinds satisfy ReadWriteOption.
var (
	_ ReadWriteOption = ReadOption(nil)
	_ ReadWriteOption = WriteOption(nil)
)
// ZeroLegthSectionAsEOF is a read option that lets a CARv1 decoder treat a
// zero-length section as the end of the input CAR file. One use is to accept
// "null padding" after a CARv1 without knowing where the padding begins.
//
// This function has the ReadOption signature itself, so it is passed as a
// value rather than called to obtain an option. The spelling "Legth" is part
// of the exported name.
func ZeroLegthSectionAsEOF(opts *ReadOptions) {
	opts.ZeroLegthSectionAsEOF = true
}
// UseCarV1Padding returns a write option which sets the padding to be added
// between the CAR v2 header and its data payload on Finalize.
//
// The returned option stores p in WriteOptions.CarV1Padding.
func UseCarV1Padding(p uint64) WriteOption {
	setPadding := func(wo *WriteOptions) {
		wo.CarV1Padding = p
	}
	return setPadding
}
// UseIndexPadding returns a write option which sets the padding between the
// data payload and its index on Finalize.
//
// The returned option stores p in WriteOptions.IndexPadding.
func UseIndexPadding(p uint64) WriteOption {
	setPadding := func(wo *WriteOptions) {
		wo.IndexPadding = p
	}
	return setPadding
}
......@@ -20,13 +20,13 @@ type Reader struct {
}
// OpenReader is a wrapper for NewReader which opens the file at path.
func OpenReader(path string) (*Reader, error) {
func OpenReader(path string, opts ...ReadOption) (*Reader, error) {
f, err := mmap.Open(path)
if err != nil {
return nil, err
}
r, err := NewReader(f)
r, err := NewReader(f, opts...)
if err != nil {
return nil, err
}
......@@ -38,7 +38,7 @@ func OpenReader(path string) (*Reader, error) {
// NewReader constructs a new reader that reads CAR v2 from the given r.
// Upon instantiation, the reader inspects the payload by reading the pragma and will return
// an error if the pragma does not represent a CAR v2.
func NewReader(r io.ReaderAt) (*Reader, error) {
func NewReader(r io.ReaderAt, opts ...ReadOption) (*Reader, error) {
cr := &Reader{
r: r,
}
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment