From d0e44a62f0a3b31f0c6280653381fcdff19f65e5 Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Tue, 3 Aug 2021 12:48:08 +0100 Subject: [PATCH] Implement version agnostic streaming CAR block iterator Implement an iterator over CAR blocks that accepts both v1 and v2. The reader only requires io.Reader interface to operate and exposes the version and roots of the underlying CAR file as fields. Remove `Next` from CARv2 reader now that the iteration functionality is provided using `BlockReader`. Update the benchmarks to reflect the changes and add example usage for `NewBlockReader`. Add tests that assert `BlockReader` behaviour in erroneous and successful cases for both underlying CARv1 and CARv2 payload. --- v2/bench_test.go | 17 +++-- v2/block_reader.go | 142 ++++++++++++++++++++++++++++++++++++ v2/block_reader_test.go | 112 ++++++++++++++++++++++++++++ v2/blockstore/bench_test.go | 24 ++++-- v2/example_test.go | 59 +++++++++++++++ v2/reader.go | 28 +------ v2/reader_test.go | 20 ----- 7 files changed, 339 insertions(+), 63 deletions(-) create mode 100644 v2/block_reader.go create mode 100644 v2/block_reader_test.go diff --git a/v2/bench_test.go b/v2/bench_test.go index c1fe5b6..83adc34 100644 --- a/v2/bench_test.go +++ b/v2/bench_test.go @@ -8,10 +8,9 @@ import ( carv2 "github.com/ipld/go-car/v2" ) -// Open a reader, get the roots, and iterate over all blocks. -// Essentially looking at the contents of any CARv1 or CARv2 file. -// Note that this also uses ReadVersion underneath. - +// BenchmarkReadBlocks instantiates a BlockReader, and iterates over all blocks. +// It essentially looks at the contents of any CARv1 or CARv2 file. +// Note that this also uses internal carv1.ReadHeader underneath. 
func BenchmarkReadBlocks(b *testing.B) { path := "testdata/sample-wrapped-v2.car" @@ -24,16 +23,16 @@ func BenchmarkReadBlocks(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - cr, err := carv2.OpenReader(path) + r, err := os.Open("testdata/sample-wrapped-v2.car") if err != nil { b.Fatal(err) } - _, err = cr.Roots() + br, err := carv2.NewBlockReader(r) if err != nil { b.Fatal(err) } for { - _, err := cr.Next() + _, err := br.Next() if err == io.EOF { break } @@ -42,7 +41,9 @@ func BenchmarkReadBlocks(b *testing.B) { } } - cr.Close() + if err := r.Close(); err != nil { + b.Fatal(err) + } } }) } diff --git a/v2/block_reader.go b/v2/block_reader.go new file mode 100644 index 0000000..5c33502 --- /dev/null +++ b/v2/block_reader.go @@ -0,0 +1,142 @@ +package car + +import ( + "fmt" + "io" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipld/go-car/v2/internal/carv1" + "github.com/ipld/go-car/v2/internal/carv1/util" + internalio "github.com/ipld/go-car/v2/internal/io" +) + +// BlockReader facilitates iteration over CAR blocks for both CARv1 and CARv2. +// See NewBlockReader +type BlockReader struct { + // The detected version of the CAR payload. + Version uint64 + // The roots of the CAR payload. May be empty. + Roots []cid.Cid + + // Used internally only, by BlockReader.Next during iteration over blocks. + r io.Reader + ropts ReadOptions +} + +// NewBlockReader instantiates a new BlockReader facilitating iteration over blocks in CARv1 or +// CARv2 payload. Upon instantiation, the version is automatically detected and exposed via +// BlockReader.Version. The root CIDs of the CAR payload are exposed via BlockReader.Roots +// +// See BlockReader.Next +func NewBlockReader(r io.Reader, opts ...ReadOption) (*BlockReader, error) { + // Read CARv1 header or CARv2 pragma. + // Both are a valid CARv1 header, therefore are read as such. 
+ pragmaOrV1Header, err := carv1.ReadHeader(r) + if err != nil { + return nil, err + } + + // Populate the block reader version. + br := &BlockReader{ + Version: pragmaOrV1Header.Version, + } + + // Populate read options + for _, o := range opts { + o(&br.ropts) + } + + // Expect either version 1 or 2. + switch br.Version { + case 1: + // If version is 1, r represents a CARv1. + // Simply populate br.Roots and br.r without modifying r. + br.Roots = pragmaOrV1Header.Roots + br.r = r + case 2: + // If the version is 2: + // 1. Read CARv2 specific header to locate the inner CARv1 data payload offset and size. + // 2. Skip to the beginning of the inner CARv1 data payload. + // 3. Re-initialize r as a LimitReader, limited to the size of the inner CARv1 payload. + // 4. Read the header of inner CARv1 data payload via r to populate br.Roots. + + // Read CARv2-specific header. + v2h := Header{} + if _, err := v2h.ReadFrom(r); err != nil { + return nil, err + } + // Assert the data payload offset validity. + // It must be at least 51 (PragmaSize + HeaderSize). + dataOffset := int64(v2h.DataOffset) + if dataOffset < PragmaSize+HeaderSize { + return nil, fmt.Errorf("invalid data payload offset: %v", dataOffset) + } + // Assert the data size validity. + // It must be larger than zero. + // Technically, it should be at least 11 bytes (i.e. a valid CARv1 header with no roots) but + // we let further parsing of the header signal an invalid data payload header. + dataSize := int64(v2h.DataSize) + if dataSize <= 0 { + return nil, fmt.Errorf("invalid data payload size: %v", dataSize) + } + + // Skip to the beginning of inner CARv1 data payload. + // Note, at this point the pragma and CARv2 header have been read. + // An io.ReadSeeker is opportunistically constructed from the given io.Reader r. + // The constructor does not take an initial offset, so we use Seek in io.SeekCurrent to + // fast forward to the beginning of data payload by subtracting pragma and header size from + // dataOffset. 
+ rs := internalio.ToByteReadSeeker(r) + if _, err := rs.Seek(dataOffset-PragmaSize-HeaderSize, io.SeekCurrent); err != nil { + return nil, err + } + + // Set br.r to a LimitReader reading from r limited to dataSize. + br.r = io.LimitReader(r, dataSize) + + // Populate br.Roots by reading the inner CARv1 data payload header. + header, err := carv1.ReadHeader(br.r) + if err != nil { + return nil, err + } + // Assert that the data payload header version is exactly 1, i.e. the header represents a CARv1. + if header.Version != 1 { + return nil, fmt.Errorf("invalid data payload header version; expected 1, got %v", header.Version) + } + br.Roots = header.Roots + default: + // Otherwise, error out with invalid version since only versions 1 or 2 are expected. + return nil, fmt.Errorf("invalid car version: %d", br.Version) + } + return br, nil +} + +// Next iterates over blocks in the underlying CAR payload with an io.EOF error indicating the end +// is reached. Note, this function is forward-only; once the end has been reached it will always +// return io.EOF. +// +// When the payload represents a CARv1 the BlockReader.Next simply iterates over blocks until it +// reaches the end of the underlying io.Reader stream. +// +// As for CARv2 payload, the underlying io.Reader is read only up to the end of the last block. +// Note, in a case where ReadOption.ZeroLengthSectionAsEOF is enabled, io.EOF is returned +// immediately upon encountering a zero-length section without reading any further bytes from the +// underlying io.Reader. 
+func (br *BlockReader) Next() (blocks.Block, error) { + c, data, err := util.ReadNode(br.r, br.ropts.ZeroLengthSectionAsEOF) + if err != nil { + return nil, err + } + + hashed, err := c.Prefix().Sum(data) + if err != nil { + return nil, err + } + + if !hashed.Equals(c) { + return nil, fmt.Errorf("mismatch in content integrity, name: %s, data: %s", c, hashed) + } + + return blocks.NewBlockWithCid(data, c) +} diff --git a/v2/block_reader_test.go b/v2/block_reader_test.go new file mode 100644 index 0000000..afffc80 --- /dev/null +++ b/v2/block_reader_test.go @@ -0,0 +1,112 @@ +package car_test + +import ( + "io" + "os" + "testing" + + carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/internal/carv1" + "github.com/stretchr/testify/require" +) + +func TestBlockReaderFailsOnUnknownVersion(t *testing.T) { + r := requireReaderFromPath(t, "testdata/sample-rootless-v42.car") + _, err := carv2.NewBlockReader(r) + require.EqualError(t, err, "invalid car version: 42") +} + +func TestBlockReaderFailsOnCorruptPragma(t *testing.T) { + r := requireReaderFromPath(t, "testdata/sample-corrupt-pragma.car") + _, err := carv2.NewBlockReader(r) + require.EqualError(t, err, "unexpected EOF") +} + +func TestBlockReader_WithCarV1Consistency(t *testing.T) { + tests := []struct { + name string + path string + zerLenAsEOF bool + wantVersion uint64 + }{ + { + name: "CarV1WithoutZeroLengthSection", + path: "testdata/sample-v1.car", + wantVersion: 1, + }, + { + name: "CarV1WithZeroLenSection", + path: "testdata/sample-v1-with-zero-len-section.car", + zerLenAsEOF: true, + wantVersion: 1, + }, + { + name: "AnotherCarV1WithZeroLenSection", + path: "testdata/sample-v1-with-zero-len-section2.car", + zerLenAsEOF: true, + wantVersion: 1, + }, + { + name: "CarV1WithZeroLenSectionWithoutOption", + path: "testdata/sample-v1-with-zero-len-section.car", + wantVersion: 1, + }, + { + name: "AnotherCarV1WithZeroLenSectionWithoutOption", + path: "testdata/sample-v1-with-zero-len-section2.car", + 
wantVersion: 1, + }, + { + name: "CorruptCarV1", + path: "testdata/sample-v1-tailing-corrupt-section.car", + wantVersion: 1, + }, + { + name: "CarV2WrappingV1", + path: "testdata/sample-wrapped-v2.car", + wantVersion: 2, + }, + { + name: "CarV2ProducedByBlockstore", + path: "testdata/sample-rw-bs-v2.car", + wantVersion: 2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := requireReaderFromPath(t, tt.path) + subject, err := carv2.NewBlockReader(r, carv2.ZeroLengthSectionAsEOF(tt.zerLenAsEOF)) + require.NoError(t, err) + + require.Equal(t, tt.wantVersion, subject.Version) + + var wantReader *carv1.CarReader + switch tt.wantVersion { + case 1: + wantReader = requireNewCarV1ReaderFromV1File(t, tt.path, tt.zerLenAsEOF) + case 2: + wantReader = requireNewCarV1ReaderFromV2File(t, tt.path, tt.zerLenAsEOF) + default: + require.Failf(t, "invalid test-case", "unknown wantVersion %v", tt.wantVersion) + } + require.Equal(t, wantReader.Header.Roots, subject.Roots) + + for { + gotBlock, gotErr := subject.Next() + wantBlock, wantErr := wantReader.Next() + require.Equal(t, wantBlock, gotBlock) + require.Equal(t, wantErr, gotErr) + if gotErr == io.EOF { + break + } + } + }) + } +} + +func requireReaderFromPath(t *testing.T, path string) io.Reader { + f, err := os.Open(path) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, f.Close()) }) + return f +} diff --git a/v2/blockstore/bench_test.go b/v2/blockstore/bench_test.go index c97dc35..644acbe 100644 --- a/v2/blockstore/bench_test.go +++ b/v2/blockstore/bench_test.go @@ -11,14 +11,21 @@ import ( "github.com/ipld/go-car/v2/blockstore" ) -// Open a read-only blockstore, -// and retrieve all blocks in a shuffled order. +// BenchmarkOpenReadOnlyV1 opens a read-only blockstore, +// and retrieves all blocks in a shuffled order. // Note that this benchmark includes generating an index, // since the input file is a CARv1. 
- func BenchmarkOpenReadOnlyV1(b *testing.B) { path := "../testdata/sample-v1.car" - + f, err := os.Open("../testdata/sample-v1.car") + if err != nil { + b.Fatal(err) + } + defer func() { + if err := f.Close(); err != nil { + b.Fatal(err) + } + }() info, err := os.Stat(path) if err != nil { b.Fatal(err) @@ -27,12 +34,12 @@ func BenchmarkOpenReadOnlyV1(b *testing.B) { b.ReportAllocs() var shuffledCIDs []cid.Cid - cr, err := carv2.OpenReader(path) + br, err := carv2.NewBlockReader(f) if err != nil { b.Fatal(err) } for { - block, err := cr.Next() + block, err := br.Next() if err == io.EOF { break } @@ -41,7 +48,6 @@ func BenchmarkOpenReadOnlyV1(b *testing.B) { } shuffledCIDs = append(shuffledCIDs, block.Cid()) } - cr.Close() // The shuffling needs to be deterministic, // for the sake of stable benchmark results. @@ -65,7 +71,9 @@ func BenchmarkOpenReadOnlyV1(b *testing.B) { } } - bs.Close() + if err := bs.Close(); err != nil { + b.Fatal(err) + } } }) } diff --git a/v2/example_test.go b/v2/example_test.go index 37bcc22..24223c6 100644 --- a/v2/example_test.go +++ b/v2/example_test.go @@ -3,6 +3,7 @@ package car_test import ( "bytes" "fmt" + "io" "io/ioutil" "os" "path/filepath" @@ -68,3 +69,61 @@ func ExampleWrapV1File() { // Inner CARv1 is exactly the same: true // [Block bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy] } + +// ExampleNewBlockReader instantiates a new BlockReader for a CARv1 file and its wrapped CARv2 +// version. For each file, it prints the version, the root CIDs and the first five block CIDs. +// Note, the roots and first five block CIDs are identical in both files since both represent the +// same root CIDs and data blocks. 
+func ExampleNewBlockReader() { + for _, path := range []string{ + "testdata/sample-v1.car", + "testdata/sample-wrapped-v2.car", + } { + fmt.Println("File:", path) + f, err := os.Open(path) + if err != nil { + panic(err) + } + br, err := carv2.NewBlockReader(f) + if err != nil { + panic(err) + } + defer func() { + if err := f.Close(); err != nil { + panic(err) + } + }() + fmt.Println("Version:", br.Version) + fmt.Println("Roots:", br.Roots) + fmt.Println("First 5 block CIDs:") + for i := 0; i < 5; i++ { + bl, err := br.Next() + if err == io.EOF { + break + } + if err != nil { + panic(err) + } + fmt.Printf("\t%v\n", bl.Cid()) + } + } + // Output: + // File: testdata/sample-v1.car + // Version: 1 + // Roots: [bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy] + // First 5 block CIDs: + // bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy + // bafy2bzaceaycv7jhaegckatnncu5yugzkrnzeqsppzegufr35lroxxnsnpspu + // bafy2bzaceb62wdepofqu34afqhbcn4a7jziwblt2ih5hhqqm6zitd3qpzhdp4 + // bafy2bzaceb3utcspm5jqcdqpih3ztbaztv7yunzkiyfq7up7xmokpxemwgu5u + // bafy2bzacedjwekyjresrwjqj4n2r5bnuuu3klncgjo2r3slsp6wgqb37sz4ck + // File: testdata/sample-wrapped-v2.car + // Version: 2 + // Roots: [bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy] + // First 5 block CIDs: + // bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy + // bafy2bzaceaycv7jhaegckatnncu5yugzkrnzeqsppzegufr35lroxxnsnpspu + // bafy2bzaceb62wdepofqu34afqhbcn4a7jziwblt2ih5hhqqm6zitd3qpzhdp4 + // bafy2bzaceb3utcspm5jqcdqpih3ztbaztv7yunzkiyfq7up7xmokpxemwgu5u + // bafy2bzacedjwekyjresrwjqj4n2r5bnuuu3klncgjo2r3slsp6wgqb37sz4ck +} diff --git a/v2/reader.go b/v2/reader.go index b4b4d0d..db8f821 100644 --- a/v2/reader.go +++ b/v2/reader.go @@ -4,8 +4,6 @@ import ( "fmt" "io" - blocks "github.com/ipfs/go-block-format" - internalio "github.com/ipld/go-car/v2/internal/io" "github.com/ipfs/go-cid" @@ -20,11 +18,7 @@ type Reader struct { r io.ReaderAt roots []cid.Cid ropts 
ReadOptions - // carV1Reader is lazily created, is not reusable, and exclusively used by Reader.Next. - // Note, this reader is forward-only and cannot be rewound. Once it reaches the end of the data - // payload, it will always return io.EOF. - carV1Reader *carv1.CarReader - closer io.Closer + closer io.Closer } // OpenReader is a wrapper for NewReader which opens the file at path. @@ -133,26 +127,6 @@ func (r *Reader) Close() error { return nil } -// Next reads the next block in the data payload with an io.EOF error indicating the end is reached. -// Note, this function is forward-only; once the end has been reached it will always return io.EOF. -func (r *Reader) Next() (blocks.Block, error) { - if r.carV1Reader == nil { - var err error - if r.carV1Reader, err = r.newCarV1Reader(); err != nil { - return nil, err - } - } - return r.carV1Reader.Next() -} - -func (r *Reader) newCarV1Reader() (*carv1.CarReader, error) { - dr := r.DataReader() - if r.ropts.ZeroLengthSectionAsEOF { - return carv1.NewCarReaderWithZeroLengthSectionAsEOF(dr) - } - return carv1.NewCarReader(dr) -} - // ReadVersion reads the version from the pragma. // This function accepts both CARv1 and CARv2 payloads. 
func ReadVersion(r io.Reader) (uint64, error) { diff --git a/v2/reader_test.go b/v2/reader_test.go index 8b5ec22..79d6f85 100644 --- a/v2/reader_test.go +++ b/v2/reader_test.go @@ -136,16 +136,6 @@ func TestReader_WithCarV1Consistency(t *testing.T) { require.NoError(t, err) require.Equal(t, wantReader.Header.Roots, gotRoots) require.Nil(t, subject.IndexReader()) - - for { - gotBlock, gotErr := subject.Next() - wantBlock, wantErr := wantReader.Next() - require.Equal(t, wantBlock, gotBlock) - require.Equal(t, wantErr, gotErr) - if gotErr == io.EOF { - break - } - } }) } } @@ -184,16 +174,6 @@ func TestReader_WithCarV2Consistency(t *testing.T) { wantIndex, err := carv2.GenerateIndex(subject.DataReader()) require.NoError(t, err) require.Equal(t, wantIndex, gotIndex) - - for { - gotBlock, gotErr := subject.Next() - wantBlock, wantErr := wantReader.Next() - require.Equal(t, wantBlock, gotBlock) - require.Equal(t, wantErr, gotErr) - if gotErr == io.EOF { - break - } - } }) } } -- GitLab