Commit d0e44a62 authored by Masih H. Derkani's avatar Masih H. Derkani Committed by Masih H. Derkani

Implement version agnostic streaming CAR block iterator

Implement an iterator over CAR blocks that accepts both v1 and v2. The
reader only requires the io.Reader interface to operate and exposes the
version and roots of the underlying CAR file as fields.

Remove `Next` from CARv2 reader now that the iteration functionality is
provided using `BlockReader`.

Update the benchmarks to reflect the changes and add example usage for
`NewBlockReader`.

Add tests that assert `BlockReader` behaviour in erroneous and
successful cases for both underlying CARv1 and CARv2 payloads.
parent a6dc547f
......@@ -8,10 +8,9 @@ import (
carv2 "github.com/ipld/go-car/v2"
)
// Open a reader, get the roots, and iterate over all blocks.
// Essentially looking at the contents of any CARv1 or CARv2 file.
// Note that this also uses ReadVersion underneath.
// BenchmarkReadBlocks instantiates a BlockReader, and iterates over all blocks.
// It essentially looks at the contents of any CARv1 or CARv2 file.
// Note that this also uses internal carv1.ReadHeader underneath.
func BenchmarkReadBlocks(b *testing.B) {
path := "testdata/sample-wrapped-v2.car"
......@@ -24,16 +23,16 @@ func BenchmarkReadBlocks(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
cr, err := carv2.OpenReader(path)
r, err := os.Open("testdata/sample-wrapped-v2.car")
if err != nil {
b.Fatal(err)
}
_, err = cr.Roots()
br, err := carv2.NewBlockReader(r)
if err != nil {
b.Fatal(err)
}
for {
_, err := cr.Next()
_, err := br.Next()
if err == io.EOF {
break
}
......@@ -42,7 +41,9 @@ func BenchmarkReadBlocks(b *testing.B) {
}
}
cr.Close()
if err := r.Close(); err != nil {
b.Fatal(err)
}
}
})
}
package car
import (
"fmt"
"io"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2/internal/carv1"
"github.com/ipld/go-car/v2/internal/carv1/util"
internalio "github.com/ipld/go-car/v2/internal/io"
)
// BlockReader facilitates iteration over CAR blocks for both CARv1 and CARv2.
// Instances are constructed by NewBlockReader and advanced with BlockReader.Next.
//
// See NewBlockReader
type BlockReader struct {
	// Version is the detected version of the CAR payload, taken from the first header read by
	// NewBlockReader (a CARv1 header or a CARv2 pragma).
	Version uint64
	// Roots holds the root CIDs of the CAR payload. For a CARv2 payload they are read from the
	// header of the inner CARv1 data payload. May be empty.
	Roots []cid.Cid

	// Used internally only, by BlockReader.Next during iteration over blocks.
	// r is positioned just past the CARv1 header by NewBlockReader; for CARv2 it is a
	// LimitReader bounded to the size of the inner data payload.
	// ropts carries the read options supplied to NewBlockReader.
	r     io.Reader
	ropts ReadOptions
}
// NewBlockReader creates a BlockReader that iterates over the blocks of a CARv1 or CARv2 payload
// read from r. The version is detected from the leading header and exposed as
// BlockReader.Version; the root CIDs of the payload are exposed as BlockReader.Roots. Every
// option in opts is applied to the reader's ReadOptions.
//
// An error is returned when a header cannot be read, when the detected version is neither 1 nor
// 2, or, for a CARv2 payload, when the data payload offset or size is invalid or the inner data
// payload header is not version 1.
//
// See BlockReader.Next
func NewBlockReader(r io.Reader, opts ...ReadOption) (*BlockReader, error) {
	// A CARv2 pragma is itself a valid CARv1 header, so a single header read serves both formats
	// and yields the version.
	first, err := carv1.ReadHeader(r)
	if err != nil {
		return nil, err
	}

	reader := &BlockReader{Version: first.Version}
	for _, apply := range opts {
		apply(&reader.ropts)
	}

	if reader.Version == 1 {
		// r is a plain CARv1 whose header has just been consumed; blocks are read from r as is.
		reader.Roots = first.Roots
		reader.r = r
		return reader, nil
	}
	if reader.Version != 2 {
		// Only versions 1 and 2 are understood.
		return nil, fmt.Errorf("invalid car version: %d", reader.Version)
	}

	// CARv2 from here on. The CARv2-specific header locates the inner CARv1 data payload.
	var v2Header Header
	if _, err := v2Header.ReadFrom(r); err != nil {
		return nil, err
	}

	// The data payload cannot begin before the end of <CARv2Pragma> + <CARv2Header>.
	offset := int64(v2Header.DataOffset)
	if offset < PragmaSize+HeaderSize {
		return nil, fmt.Errorf("invalid data payload offset: %v", offset)
	}

	// The data payload must not be empty. Anything smaller than a valid CARv1 header is left for
	// the inner header parsing below to reject.
	size := int64(v2Header.DataSize)
	if size <= 0 {
		return nil, fmt.Errorf("invalid data payload size: %v", size)
	}

	// The pragma and the CARv2 header have already been consumed from r, so the distance left to
	// the data payload is the offset minus their sizes. The seeker is built opportunistically
	// from r and takes no initial position, hence the relative seek.
	skip := offset - PragmaSize - HeaderSize
	if _, err := internalio.ToByteReadSeeker(r).Seek(skip, io.SeekCurrent); err != nil {
		return nil, err
	}

	// Bound further reads to the inner CARv1 payload, then read its header to obtain the roots.
	reader.r = io.LimitReader(r, size)
	inner, err := carv1.ReadHeader(reader.r)
	if err != nil {
		return nil, err
	}
	if inner.Version != 1 {
		return nil, fmt.Errorf("invalid data payload header version; expected 1, got %v", inner.Version)
	}
	reader.Roots = inner.Roots
	return reader, nil
}
// Next returns the next block of the underlying CAR payload, or io.EOF once the end has been
// reached. Iteration is forward-only; once the end has been reached it will always return
// io.EOF.
//
// For a CARv1 payload, blocks are read until the underlying io.Reader stream ends.
//
// For a CARv2 payload, the underlying io.Reader is read only up to the end of the last block.
// Note, when ReadOption.ZeroLengthSectionAsEOF is enabled, io.EOF is returned immediately upon
// encountering a zero-length section without reading any further bytes from the underlying
// io.Reader.
//
// Each block's data is re-hashed using the prefix of its CID; an error is returned when the
// resulting CID does not equal the one stored alongside the data.
func (br *BlockReader) Next() (blocks.Block, error) {
	id, payload, err := util.ReadNode(br.r, br.ropts.ZeroLengthSectionAsEOF)
	if err != nil {
		return nil, err
	}

	// Recompute the CID from the data to verify content integrity.
	digest, err := id.Prefix().Sum(payload)
	switch {
	case err != nil:
		return nil, err
	case !digest.Equals(id):
		return nil, fmt.Errorf("mismatch in content integrity, name: %s, data: %s", id, digest)
	}
	return blocks.NewBlockWithCid(payload, id)
}
package car_test
import (
"io"
"os"
"testing"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/internal/carv1"
"github.com/stretchr/testify/require"
)
func TestBlockReaderFailsOnUnknownVersion(t *testing.T) {
r := requireReaderFromPath(t, "testdata/sample-rootless-v42.car")
_, err := carv2.NewBlockReader(r)
require.EqualError(t, err, "invalid car version: 42")
}
func TestBlockReaderFailsOnCorruptPragma(t *testing.T) {
r := requireReaderFromPath(t, "testdata/sample-corrupt-pragma.car")
_, err := carv2.NewBlockReader(r)
require.EqualError(t, err, "unexpected EOF")
}
func TestBlockReader_WithCarV1Consistency(t *testing.T) {
tests := []struct {
name string
path string
zerLenAsEOF bool
wantVersion uint64
}{
{
name: "CarV1WithoutZeroLengthSection",
path: "testdata/sample-v1.car",
wantVersion: 1,
},
{
name: "CarV1WithZeroLenSection",
path: "testdata/sample-v1-with-zero-len-section.car",
zerLenAsEOF: true,
wantVersion: 1,
},
{
name: "AnotherCarV1WithZeroLenSection",
path: "testdata/sample-v1-with-zero-len-section2.car",
zerLenAsEOF: true,
wantVersion: 1,
},
{
name: "CarV1WithZeroLenSectionWithoutOption",
path: "testdata/sample-v1-with-zero-len-section.car",
wantVersion: 1,
},
{
name: "AnotherCarV1WithZeroLenSectionWithoutOption",
path: "testdata/sample-v1-with-zero-len-section2.car",
wantVersion: 1,
},
{
name: "CorruptCarV1",
path: "testdata/sample-v1-tailing-corrupt-section.car",
wantVersion: 1,
},
{
name: "CarV2WrappingV1",
path: "testdata/sample-wrapped-v2.car",
wantVersion: 2,
},
{
name: "CarV2ProducedByBlockstore",
path: "testdata/sample-rw-bs-v2.car",
wantVersion: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := requireReaderFromPath(t, tt.path)
subject, err := carv2.NewBlockReader(r, carv2.ZeroLengthSectionAsEOF(tt.zerLenAsEOF))
require.NoError(t, err)
require.Equal(t, tt.wantVersion, subject.Version)
var wantReader *carv1.CarReader
switch tt.wantVersion {
case 1:
wantReader = requireNewCarV1ReaderFromV1File(t, tt.path, tt.zerLenAsEOF)
case 2:
wantReader = requireNewCarV1ReaderFromV2File(t, tt.path, tt.zerLenAsEOF)
default:
require.Failf(t, "invalid test-case", "unknown wantVersion %v", tt.wantVersion)
}
require.Equal(t, wantReader.Header.Roots, subject.Roots)
for {
gotBlock, gotErr := subject.Next()
wantBlock, wantErr := wantReader.Next()
require.Equal(t, wantBlock, gotBlock)
require.Equal(t, wantErr, gotErr)
if gotErr == io.EOF {
break
}
}
})
}
}
// requireReaderFromPath opens the file at path and returns it as an io.Reader. The test fails
// immediately if the file cannot be opened. A cleanup is registered on t that closes the file
// and asserts that closing succeeded.
func requireReaderFromPath(t *testing.T, path string) io.Reader {
	// Mark as a helper so that a failure is reported at the calling test's line.
	t.Helper()
	f, err := os.Open(path)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, f.Close()) })
	return f
}
......@@ -11,14 +11,21 @@ import (
"github.com/ipld/go-car/v2/blockstore"
)
// Open a read-only blockstore,
// and retrieve all blocks in a shuffled order.
// BenchmarkOpenReadOnlyV1 opens a read-only blockstore,
// and retrieves all blocks in a shuffled order.
// Note that this benchmark includes generating an index,
// since the input file is a CARv1.
func BenchmarkOpenReadOnlyV1(b *testing.B) {
path := "../testdata/sample-v1.car"
f, err := os.Open("../testdata/sample-v1.car")
if err != nil {
b.Fatal(err)
}
defer func() {
if err := f.Close(); err != nil {
b.Fatal(err)
}
}()
info, err := os.Stat(path)
if err != nil {
b.Fatal(err)
......@@ -27,12 +34,12 @@ func BenchmarkOpenReadOnlyV1(b *testing.B) {
b.ReportAllocs()
var shuffledCIDs []cid.Cid
cr, err := carv2.OpenReader(path)
br, err := carv2.NewBlockReader(f)
if err != nil {
b.Fatal(err)
}
for {
block, err := cr.Next()
block, err := br.Next()
if err == io.EOF {
break
}
......@@ -41,7 +48,6 @@ func BenchmarkOpenReadOnlyV1(b *testing.B) {
}
shuffledCIDs = append(shuffledCIDs, block.Cid())
}
cr.Close()
// The shuffling needs to be deterministic,
// for the sake of stable benchmark results.
......@@ -65,7 +71,9 @@ func BenchmarkOpenReadOnlyV1(b *testing.B) {
}
}
bs.Close()
if err := bs.Close(); err != nil {
b.Fatal(err)
}
}
})
}
......@@ -3,6 +3,7 @@ package car_test
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
......@@ -68,3 +69,61 @@ func ExampleWrapV1File() {
// Inner CARv1 is exactly the same: true
// [Block bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy] <nil>
}
// ExampleNewBlockReader instantiates a new BlockReader for a CARv1 file and its wrapped CARv2
// version. For each file, it prints the version, the root CIDs and the first five block CIDs.
// Note, the roots and first five block CIDs are identical in both files since both represent the
// same root CIDs and data blocks.
func ExampleNewBlockReader() {
	for _, path := range []string{
		"testdata/sample-v1.car",
		"testdata/sample-wrapped-v2.car",
	} {
		// Each file is handled inside its own function literal so that the deferred Close runs
		// at the end of the iteration rather than piling up until the example returns.
		func() {
			fmt.Println("File:", path)
			f, err := os.Open(path)
			if err != nil {
				panic(err)
			}
			// Registered immediately after a successful open so the file is closed on every
			// path out of this function literal.
			defer func() {
				if err := f.Close(); err != nil {
					panic(err)
				}
			}()

			br, err := carv2.NewBlockReader(f)
			if err != nil {
				panic(err)
			}
			fmt.Println("Version:", br.Version)
			fmt.Println("Roots:", br.Roots)
			fmt.Println("First 5 block CIDs:")
			for i := 0; i < 5; i++ {
				bl, err := br.Next()
				if err == io.EOF {
					break
				}
				if err != nil {
					panic(err)
				}
				fmt.Printf("\t%v\n", bl.Cid())
			}
		}()
	}
	// Output:
	// File: testdata/sample-v1.car
	// Version: 1
	// Roots: [bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy]
	// First 5 block CIDs:
	// 	bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy
	// 	bafy2bzaceaycv7jhaegckatnncu5yugzkrnzeqsppzegufr35lroxxnsnpspu
	// 	bafy2bzaceb62wdepofqu34afqhbcn4a7jziwblt2ih5hhqqm6zitd3qpzhdp4
	// 	bafy2bzaceb3utcspm5jqcdqpih3ztbaztv7yunzkiyfq7up7xmokpxemwgu5u
	// 	bafy2bzacedjwekyjresrwjqj4n2r5bnuuu3klncgjo2r3slsp6wgqb37sz4ck
	// File: testdata/sample-wrapped-v2.car
	// Version: 2
	// Roots: [bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy]
	// First 5 block CIDs:
	// 	bafy2bzaced4ueelaegfs5fqu4tzsh6ywbbpfk3cxppupmxfdhbpbhzawfw5oy
	// 	bafy2bzaceaycv7jhaegckatnncu5yugzkrnzeqsppzegufr35lroxxnsnpspu
	// 	bafy2bzaceb62wdepofqu34afqhbcn4a7jziwblt2ih5hhqqm6zitd3qpzhdp4
	// 	bafy2bzaceb3utcspm5jqcdqpih3ztbaztv7yunzkiyfq7up7xmokpxemwgu5u
	// 	bafy2bzacedjwekyjresrwjqj4n2r5bnuuu3klncgjo2r3slsp6wgqb37sz4ck
}
......@@ -4,8 +4,6 @@ import (
"fmt"
"io"
blocks "github.com/ipfs/go-block-format"
internalio "github.com/ipld/go-car/v2/internal/io"
"github.com/ipfs/go-cid"
......@@ -20,11 +18,7 @@ type Reader struct {
r io.ReaderAt
roots []cid.Cid
ropts ReadOptions
// carV1Reader is lazily created, is not reusable, and exclusively used by Reader.Next.
// Note, this reader is forward-only and cannot be rewound. Once it reaches the end of the data
// payload, it will always return io.EOF.
carV1Reader *carv1.CarReader
closer io.Closer
closer io.Closer
}
// OpenReader is a wrapper for NewReader which opens the file at path.
......@@ -133,26 +127,6 @@ func (r *Reader) Close() error {
return nil
}
// Next returns the next block of the data payload, or io.EOF once the end has been reached.
// The underlying CARv1 reader is created on the first call and reused afterwards. Iteration is
// forward-only; once the end has been reached it will always return io.EOF.
func (r *Reader) Next() (blocks.Block, error) {
	if r.carV1Reader != nil {
		return r.carV1Reader.Next()
	}

	// First call: lazily build the CARv1 reader over the data payload.
	var err error
	r.carV1Reader, err = r.newCarV1Reader()
	if err != nil {
		return nil, err
	}
	return r.carV1Reader.Next()
}
// newCarV1Reader builds a carv1.CarReader over the data payload of r, choosing the constructor
// that matches the ZeroLengthSectionAsEOF read option.
func (r *Reader) newCarV1Reader() (*carv1.CarReader, error) {
	payload := r.DataReader()
	if !r.ropts.ZeroLengthSectionAsEOF {
		return carv1.NewCarReader(payload)
	}
	return carv1.NewCarReaderWithZeroLengthSectionAsEOF(payload)
}
// ReadVersion reads the version from the pragma.
// This function accepts both CARv1 and CARv2 payloads.
func ReadVersion(r io.Reader) (uint64, error) {
......
......@@ -136,16 +136,6 @@ func TestReader_WithCarV1Consistency(t *testing.T) {
require.NoError(t, err)
require.Equal(t, wantReader.Header.Roots, gotRoots)
require.Nil(t, subject.IndexReader())
for {
gotBlock, gotErr := subject.Next()
wantBlock, wantErr := wantReader.Next()
require.Equal(t, wantBlock, gotBlock)
require.Equal(t, wantErr, gotErr)
if gotErr == io.EOF {
break
}
}
})
}
}
......@@ -184,16 +174,6 @@ func TestReader_WithCarV2Consistency(t *testing.T) {
wantIndex, err := carv2.GenerateIndex(subject.DataReader())
require.NoError(t, err)
require.Equal(t, wantIndex, gotIndex)
for {
gotBlock, gotErr := subject.Next()
wantBlock, wantErr := wantReader.Next()
require.Equal(t, wantBlock, gotBlock)
require.Equal(t, wantErr, gotErr)
if gotErr == io.EOF {
break
}
}
})
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment