diff --git a/v2/bench_test.go b/v2/bench_test.go index c1fe5b624270a171cc08df375e70c8b629453fc9..83adc346a90ecb72982d03ae3f509c9676e10276 100644 --- a/v2/bench_test.go +++ b/v2/bench_test.go @@ -8,10 +8,9 @@ import ( carv2 "github.com/ipld/go-car/v2" ) -// Open a reader, get the roots, and iterate over all blocks. -// Essentially looking at the contents of any CARv1 or CARv2 file. -// Note that this also uses ReadVersion underneath. - +// BenchmarkReadBlocks instantiates a BlockReader, and iterates over all blocks. +// It essentially looks at the contents of any CARv1 or CARv2 file. +// Note that this also uses internal carv1.ReadHeader underneath. func BenchmarkReadBlocks(b *testing.B) { path := "testdata/sample-wrapped-v2.car" @@ -24,16 +23,16 @@ func BenchmarkReadBlocks(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - cr, err := carv2.OpenReader(path) + r, err := os.Open("testdata/sample-wrapped-v2.car") if err != nil { b.Fatal(err) } - _, err = cr.Roots() + br, err := carv2.NewBlockReader(r) if err != nil { b.Fatal(err) } for { - _, err := cr.Next() + _, err := br.Next() if err == io.EOF { break } @@ -42,7 +41,9 @@ func BenchmarkReadBlocks(b *testing.B) { } } - cr.Close() + if err := r.Close(); err != nil { + b.Fatal(err) + } } }) } diff --git a/v2/block_reader.go b/v2/block_reader.go new file mode 100644 index 0000000000000000000000000000000000000000..5c33502945a1e6d514d22674f19dc5f33f323377 --- /dev/null +++ b/v2/block_reader.go @@ -0,0 +1,142 @@ +package car + +import ( + "fmt" + "io" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipld/go-car/v2/internal/carv1" + "github.com/ipld/go-car/v2/internal/carv1/util" + internalio "github.com/ipld/go-car/v2/internal/io" +) + +// BlockReader facilitates iteration over CAR blocks for both CARv1 and CARv2. +// See NewBlockReader +type BlockReader struct { + // The detected version of the CAR payload. + Version uint64 + // The roots of the CAR payload. 
May be empty. + Roots []cid.Cid + + // Used internally only, by BlockReader.Next during iteration over blocks. + r io.Reader + ropts ReadOptions +} + +// NewBlockReader instantiates a new BlockReader facilitating iteration over blocks in CARv1 or +// CARv2 payload. Upon instantiation, the version is automatically detected and exposed via +// BlockReader.Version. The root CIDs of the CAR payload are exposed via BlockReader.Roots. +// +// See BlockReader.Next +func NewBlockReader(r io.Reader, opts ...ReadOption) (*BlockReader, error) { + // Read CARv1 header or CARv2 pragma. + // Both are a valid CARv1 header, therefore are read as such. + pragmaOrV1Header, err := carv1.ReadHeader(r) + if err != nil { + return nil, err + } + + // Populate the block reader version. + br := &BlockReader{ + Version: pragmaOrV1Header.Version, + } + + // Populate read options + for _, o := range opts { + o(&br.ropts) + } + + // Expect either version 1 or 2. + switch br.Version { + case 1: + // If version is 1, r represents a CARv1. + // Simply populate br.Roots and br.r without modifying r. + br.Roots = pragmaOrV1Header.Roots + br.r = r + case 2: + // If the version is 2: + // 1. Read CARv2 specific header to locate the inner CARv1 data payload offset and size. + // 2. Skip to the beginning of the inner CARv1 data payload. + // 3. Re-initialize r as a LimitReader, limited to the size of the inner CARv1 payload. + // 4. Read the header of inner CARv1 data payload via r to populate br.Roots. + + // Read CARv2-specific header. + v2h := Header{} + if _, err := v2h.ReadFrom(r); err != nil { + return nil, err + } + // Assert the data payload offset validity. + // It must be at least 51 (PragmaSize + HeaderSize). + dataOffset := int64(v2h.DataOffset) + if dataOffset < PragmaSize+HeaderSize { + return nil, fmt.Errorf("invalid data payload offset: %v", dataOffset) + } + // Assert the data size validity. + // It must be larger than zero. + // Technically, it should be at least 11 bytes (i.e. 
a valid CARv1 header with no roots) but + // we leave it to further parsing of the header to signal an invalid data payload header. + dataSize := int64(v2h.DataSize) + if dataSize <= 0 { + return nil, fmt.Errorf("invalid data payload size: %v", dataSize) + } + + // Skip to the beginning of inner CARv1 data payload. + // Note, at this point the pragma and CARv2 header have been read. + // An io.ReadSeeker is opportunistically constructed from the given io.Reader r. + // The constructor does not take an initial offset, so we use Seek in io.SeekCurrent to + // fast forward to the beginning of data payload by subtracting pragma and header size from + // dataOffset. + rs := internalio.ToByteReadSeeker(r) + if _, err := rs.Seek(dataOffset-PragmaSize-HeaderSize, io.SeekCurrent); err != nil { + return nil, err + } + + // Set br.r to a LimitReader reading from r limited to dataSize. + br.r = io.LimitReader(r, dataSize) + + // Populate br.Roots by reading the inner CARv1 data payload header. + header, err := carv1.ReadHeader(br.r) + if err != nil { + return nil, err + } + // Assert that the data payload header version is exactly 1, i.e. the header represents a CARv1. + if header.Version != 1 { + return nil, fmt.Errorf("invalid data payload header version; expected 1, got %v", header.Version) + } + br.Roots = header.Roots + default: + // Otherwise, error out with invalid version since only versions 1 or 2 are expected. + return nil, fmt.Errorf("invalid car version: %d", br.Version) + } + return br, nil +} + +// Next iterates over blocks in the underlying CAR payload with an io.EOF error indicating the end +// is reached. Note, this function is forward-only; once the end has been reached it will always +// return io.EOF. +// +// When the payload represents a CARv1 the BlockReader.Next simply iterates over blocks until it +// reaches the end of the underlying io.Reader stream. +// +// As for CARv2 payload, the underlying io.Reader is read only up to the end of the last block. 
+// Note, in a case where ReadOption.ZeroLengthSectionAsEOF is enabled, io.EOF is returned +// immediately upon encountering a zero-length section without reading any further bytes from the +// underlying io.Reader. +func (br *BlockReader) Next() (blocks.Block, error) { + c, data, err := util.ReadNode(br.r, br.ropts.ZeroLengthSectionAsEOF) + if err != nil { + return nil, err + } + + hashed, err := c.Prefix().Sum(data) + if err != nil { + return nil, err + } + + if !hashed.Equals(c) { + return nil, fmt.Errorf("mismatch in content integrity, name: %s, data: %s", c, hashed) + } + + return blocks.NewBlockWithCid(data, c) +} diff --git a/v2/block_reader_test.go b/v2/block_reader_test.go new file mode 100644 index 0000000000000000000000000000000000000000..afffc806cd3a8683df9e13e636264056a29ace02 --- /dev/null +++ b/v2/block_reader_test.go @@ -0,0 +1,112 @@ +package car_test + +import ( + "io" + "os" + "testing" + + carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/internal/carv1" + "github.com/stretchr/testify/require" +) + +func TestBlockReaderFailsOnUnknownVersion(t *testing.T) { + r := requireReaderFromPath(t, "testdata/sample-rootless-v42.car") + _, err := carv2.NewBlockReader(r) + require.EqualError(t, err, "invalid car version: 42") +} + +func TestBlockReaderFailsOnCorruptPragma(t *testing.T) { + r := requireReaderFromPath(t, "testdata/sample-corrupt-pragma.car") + _, err := carv2.NewBlockReader(r) + require.EqualError(t, err, "unexpected EOF") +} + +func TestBlockReader_WithCarV1Consistency(t *testing.T) { + tests := []struct { + name string + path string + zerLenAsEOF bool + wantVersion uint64 + }{ + { + name: "CarV1WithoutZeroLengthSection", + path: "testdata/sample-v1.car", + wantVersion: 1, + }, + { + name: "CarV1WithZeroLenSection", + path: "testdata/sample-v1-with-zero-len-section.car", + zerLenAsEOF: true, + wantVersion: 1, + }, + { + name: "AnotherCarV1WithZeroLenSection", + path: "testdata/sample-v1-with-zero-len-section2.car", + zerLenAsEOF: 
true, + wantVersion: 1, + }, + { + name: "CarV1WithZeroLenSectionWithoutOption", + path: "testdata/sample-v1-with-zero-len-section.car", + wantVersion: 1, + }, + { + name: "AnotherCarV1WithZeroLenSectionWithoutOption", + path: "testdata/sample-v1-with-zero-len-section2.car", + wantVersion: 1, + }, + { + name: "CorruptCarV1", + path: "testdata/sample-v1-tailing-corrupt-section.car", + wantVersion: 1, + }, + { + name: "CarV2WrappingV1", + path: "testdata/sample-wrapped-v2.car", + wantVersion: 2, + }, + { + name: "CarV2ProducedByBlockstore", + path: "testdata/sample-rw-bs-v2.car", + wantVersion: 2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := requireReaderFromPath(t, tt.path) + subject, err := carv2.NewBlockReader(r, carv2.ZeroLengthSectionAsEOF(tt.zerLenAsEOF)) + require.NoError(t, err) + + require.Equal(t, tt.wantVersion, subject.Version) + + var wantReader *carv1.CarReader + switch tt.wantVersion { + case 1: + wantReader = requireNewCarV1ReaderFromV1File(t, tt.path, tt.zerLenAsEOF) + case 2: + wantReader = requireNewCarV1ReaderFromV2File(t, tt.path, tt.zerLenAsEOF) + default: + require.Failf(t, "invalid test-case", "unknown wantVersion %v", tt.wantVersion) + } + require.Equal(t, wantReader.Header.Roots, subject.Roots) + + for { + gotBlock, gotErr := subject.Next() + wantBlock, wantErr := wantReader.Next() + require.Equal(t, wantBlock, gotBlock) + require.Equal(t, wantErr, gotErr) + if gotErr == io.EOF { + break + } + } + }) + } +} + +func requireReaderFromPath(t *testing.T, path string) io.Reader { + f, err := os.Open(path) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, f.Close()) }) + return f +} diff --git a/v2/blockstore/bench_test.go b/v2/blockstore/bench_test.go index c97dc3526767e59339bda26f306ce0bc4c29c897..644acbe95c4f60a632a8327a4d98f63f347144c7 100644 --- a/v2/blockstore/bench_test.go +++ b/v2/blockstore/bench_test.go @@ -11,14 +11,21 @@ import ( "github.com/ipld/go-car/v2/blockstore" ) -// Open a 
read-only blockstore, -// and retrieve all blocks in a shuffled order. +// BenchmarkOpenReadOnlyV1 opens a read-only blockstore, +// and retrieves all blocks in a shuffled order. // Note that this benchmark includes generating an index, // since the input file is a CARv1. - func BenchmarkOpenReadOnlyV1(b *testing.B) { path := "../testdata/sample-v1.car" - + f, err := os.Open("../testdata/sample-v1.car") + if err != nil { + b.Fatal(err) + } + defer func() { + if err := f.Close(); err != nil { + b.Fatal(err) + } + }() info, err := os.Stat(path) if err != nil { b.Fatal(err) @@ -27,12 +34,12 @@ func BenchmarkOpenReadOnlyV1(b *testing.B) { b.ReportAllocs() var shuffledCIDs []cid.Cid - cr, err := carv2.OpenReader(path) + br, err := carv2.NewBlockReader(f) if err != nil { b.Fatal(err) } for { - block, err := cr.Next() + block, err := br.Next() if err == io.EOF { break } @@ -41,7 +48,6 @@ func BenchmarkOpenReadOnlyV1(b *testing.B) { } shuffledCIDs = append(shuffledCIDs, block.Cid()) } - cr.Close() // The shuffling needs to be deterministic, // for the sake of stable benchmark results. @@ -65,7 +71,9 @@ func BenchmarkOpenReadOnlyV1(b *testing.B) { } } - bs.Close() + if err := bs.Close(); err != nil { + b.Fatal(err) + } } }) } diff --git a/v2/example_test.go b/v2/example_test.go index 37bcc22b534fca7c472018fa45f01435b2703322..24223c634db3bfdaf90b7917ce09f1cf97716152 100644 --- a/v2/example_test.go +++ b/v2/example_test.go @@ -3,6 +3,7 @@ package car_test import ( "bytes" "fmt" + "io" "io/ioutil" "os" "path/filepath" @@ -68,3 +69,61 @@ func ExampleWrapV1File() { // Inner CARv1 is exactly the same: true // [Block bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy] } + +// ExampleNewBlockReader instantiates a new BlockReader for a CARv1 file and its wrapped CARv2 +// version. For each file, it prints the version, the root CIDs and the first five block CIDs. 
+// Note, the roots and first five block CIDs are identical in both files since both represent the +// same root CIDs and data blocks. +func ExampleNewBlockReader() { + for _, path := range []string{ + "testdata/sample-v1.car", + "testdata/sample-wrapped-v2.car", + } { + fmt.Println("File:", path) + f, err := os.Open(path) + if err != nil { + panic(err) + } + br, err := carv2.NewBlockReader(f) + if err != nil { + panic(err) + } + defer func() { + if err := f.Close(); err != nil { + panic(err) + } + }() + fmt.Println("Version:", br.Version) + fmt.Println("Roots:", br.Roots) + fmt.Println("First 5 block CIDs:") + for i := 0; i < 5; i++ { + bl, err := br.Next() + if err == io.EOF { + break + } + if err != nil { + panic(err) + } + fmt.Printf("\t%v\n", bl.Cid()) + } + } + // Output: + // File: testdata/sample-v1.car + // Version: 1 + // Roots: [bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy] + // First 5 block CIDs: + // bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy + // bafy2bzaceaycv7jhaegckatnncu5yugzkrnzeqsppzegufr35lroxxnsnpspu + // bafy2bzaceb62wdepofqu34afqhbcn4a7jziwblt2ih5hhqqm6zitd3qpzhdp4 + // bafy2bzaceb3utcspm5jqcdqpih3ztbaztv7yunzkiyfq7up7xmokpxemwgu5u + // bafy2bzacedjwekyjresrwjqj4n2r5bnuuu3klncgjo2r3slsp6wgqb37sz4ck + // File: testdata/sample-wrapped-v2.car + // Version: 2 + // Roots: [bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy] + // First 5 block CIDs: + // bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy + // bafy2bzaceaycv7jhaegckatnncu5yugzkrnzeqsppzegufr35lroxxnsnpspu + // bafy2bzaceb62wdepofqu34afqhbcn4a7jziwblt2ih5hhqqm6zitd3qpzhdp4 + // bafy2bzaceb3utcspm5jqcdqpih3ztbaztv7yunzkiyfq7up7xmokpxemwgu5u + // bafy2bzacedjwekyjresrwjqj4n2r5bnuuu3klncgjo2r3slsp6wgqb37sz4ck +} diff --git a/v2/reader.go b/v2/reader.go index b4b4d0d45b64df64187957a7da0389d91bb474ec..db8f821051fc3bf66afd6528aec5cdec49a10c90 100644 --- a/v2/reader.go +++ b/v2/reader.go @@ -4,8 +4,6 @@ import ( "fmt" "io" - 
blocks "github.com/ipfs/go-block-format" - internalio "github.com/ipld/go-car/v2/internal/io" "github.com/ipfs/go-cid" @@ -20,11 +18,7 @@ type Reader struct { r io.ReaderAt roots []cid.Cid ropts ReadOptions - // carV1Reader is lazily created, is not reusable, and exclusively used by Reader.Next. - // Note, this reader is forward-only and cannot be rewound. Once it reaches the end of the data - // payload, it will always return io.EOF. - carV1Reader *carv1.CarReader - closer io.Closer + closer io.Closer } // OpenReader is a wrapper for NewReader which opens the file at path. @@ -133,26 +127,6 @@ func (r *Reader) Close() error { return nil } -// Next reads the next block in the data payload with an io.EOF error indicating the end is reached. -// Note, this function is forward-only; once the end has been reached it will always return io.EOF. -func (r *Reader) Next() (blocks.Block, error) { - if r.carV1Reader == nil { - var err error - if r.carV1Reader, err = r.newCarV1Reader(); err != nil { - return nil, err - } - } - return r.carV1Reader.Next() -} - -func (r *Reader) newCarV1Reader() (*carv1.CarReader, error) { - dr := r.DataReader() - if r.ropts.ZeroLengthSectionAsEOF { - return carv1.NewCarReaderWithZeroLengthSectionAsEOF(dr) - } - return carv1.NewCarReader(dr) -} - // ReadVersion reads the version from the pragma. // This function accepts both CARv1 and CARv2 payloads. 
func ReadVersion(r io.Reader) (uint64, error) { diff --git a/v2/reader_test.go b/v2/reader_test.go index 8b5ec228f6a664521a0f581ab8eba07cc32164e9..79d6f855d2cd9b0ae6bb3d3bcf6bfa663f5516c3 100644 --- a/v2/reader_test.go +++ b/v2/reader_test.go @@ -136,16 +136,6 @@ func TestReader_WithCarV1Consistency(t *testing.T) { require.NoError(t, err) require.Equal(t, wantReader.Header.Roots, gotRoots) require.Nil(t, subject.IndexReader()) - - for { - gotBlock, gotErr := subject.Next() - wantBlock, wantErr := wantReader.Next() - require.Equal(t, wantBlock, gotBlock) - require.Equal(t, wantErr, gotErr) - if gotErr == io.EOF { - break - } - } }) } } @@ -184,16 +174,6 @@ func TestReader_WithCarV2Consistency(t *testing.T) { wantIndex, err := carv2.GenerateIndex(subject.DataReader()) require.NoError(t, err) require.Equal(t, wantIndex, gotIndex) - - for { - gotBlock, gotErr := subject.Next() - wantBlock, wantErr := wantReader.Next() - require.Equal(t, wantBlock, gotBlock) - require.Equal(t, wantErr, gotErr) - if gotErr == io.EOF { - break - } - } }) } }