Commit d393a837 authored by Masih H. Derkani's avatar Masih H. Derkani Committed by Masih H. Derkani

Implement reader block iterator over CARv1 or CARv2

Implement the mechanism to iterate over blocks in CARv1 or CARv2 via the
`carv2.Reader` API.

Implement tests both for the iterator and the rest of the reader API.
Add test files with corrupt sections etc.

Fixes #189
parent de2711c0
...@@ -4,6 +4,8 @@ import ( ...@@ -4,6 +4,8 @@ import (
"fmt" "fmt"
"io" "io"
blocks "github.com/ipfs/go-block-format"
internalio "github.com/ipld/go-car/v2/internal/io" internalio "github.com/ipld/go-car/v2/internal/io"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
...@@ -13,10 +15,16 @@ import ( ...@@ -13,10 +15,16 @@ import (
// Reader represents a reader of CARv2. // Reader represents a reader of CARv2.
type Reader struct { type Reader struct {
Header Header Header Header
r io.ReaderAt Version uint64
roots []cid.Cid r io.ReaderAt
closer io.Closer roots []cid.Cid
ropts ReadOptions
// carV1Reader is lazily created, is not reusable, and exclusively used by Reader.Next.
// Note, this reader is forward-only and cannot be rewound. Once it reaches the end of the data
// payload, it will always return io.EOF.
carV1Reader *carv1.CarReader
closer io.Closer
} }
// OpenReader is a wrapper for NewReader which opens the file at path. // OpenReader is a wrapper for NewReader which opens the file at path.
...@@ -35,32 +43,39 @@ func OpenReader(path string, opts ...ReadOption) (*Reader, error) { ...@@ -35,32 +43,39 @@ func OpenReader(path string, opts ...ReadOption) (*Reader, error) {
return r, nil return r, nil
} }
// NewReader constructs a new reader that reads CARv2 from the given r. // NewReader constructs a new reader that reads either CARv1 or CARv2 from the given r.
// Upon instantiation, the reader inspects the payload by reading the pragma and will return // Upon instantiation, the reader inspects the payload and provides appropriate read operations
// an error if the pragma does not represent a CARv2. // for both CARv1 and CARv2.
//
// Note that any other version other than 1 or 2 will result in an error. The caller may use
// Reader.Version to get the actual version r represents. In the case where r represents a CARv1
// Reader.Header will not be populated and is left as zero-valued.
func NewReader(r io.ReaderAt, opts ...ReadOption) (*Reader, error) { func NewReader(r io.ReaderAt, opts ...ReadOption) (*Reader, error) {
cr := &Reader{ cr := &Reader{
r: r, r: r,
} }
if err := cr.requireVersion2(); err != nil { for _, o := range opts {
return nil, err o(&cr.ropts)
} }
if err := cr.readHeader(); err != nil {
or := internalio.NewOffsetReadSeeker(r, 0)
var err error
cr.Version, err = ReadVersion(or)
if err != nil {
return nil, err return nil, err
} }
return cr, nil
}
func (r *Reader) requireVersion2() (err error) { if cr.Version != 1 && cr.Version != 2 {
or := internalio.NewOffsetReadSeeker(r.r, 0) return nil, fmt.Errorf("invalid car version: %d", cr.Version)
version, err := ReadVersion(or)
if err != nil {
return
} }
if version != 2 {
return fmt.Errorf("invalid car version: %d", version) if cr.Version == 2 {
if err := cr.readV2Header(); err != nil {
return nil, err
}
} }
return
return cr, nil
} }
// Roots returns the root CIDs. // Roots returns the root CIDs.
...@@ -77,7 +92,7 @@ func (r *Reader) Roots() ([]cid.Cid, error) { ...@@ -77,7 +92,7 @@ func (r *Reader) Roots() ([]cid.Cid, error) {
return r.roots, nil return r.roots, nil
} }
func (r *Reader) readHeader() (err error) { func (r *Reader) readV2Header() (err error) {
headerSection := io.NewSectionReader(r.r, PragmaSize, HeaderSize) headerSection := io.NewSectionReader(r.r, PragmaSize, HeaderSize)
_, err = r.Header.ReadFrom(headerSection) _, err = r.Header.ReadFrom(headerSection)
return return
...@@ -94,12 +109,20 @@ type SectionReader interface { ...@@ -94,12 +109,20 @@ type SectionReader interface {
// DataReader provides a reader containing the data payload in CARv1 format. // DataReader provides a reader containing the data payload in CARv1 format.
func (r *Reader) DataReader() SectionReader { func (r *Reader) DataReader() SectionReader {
return io.NewSectionReader(r.r, int64(r.Header.DataOffset), int64(r.Header.DataSize)) if r.Version == 2 {
return io.NewSectionReader(r.r, int64(r.Header.DataOffset), int64(r.Header.DataSize))
}
return internalio.NewOffsetReadSeeker(r.r, 0)
} }
// IndexReader provides an io.Reader containing the index for the data payload. // IndexReader provides an io.Reader containing the index for the data payload if the index is
// present. Otherwise, returns nil.
// Note, this function will always return nil if the backing payload represents a CARv1.
func (r *Reader) IndexReader() io.Reader { func (r *Reader) IndexReader() io.Reader {
return internalio.NewOffsetReadSeeker(r.r, int64(r.Header.IndexOffset)) if r.Version == 2 {
return internalio.NewOffsetReadSeeker(r.r, int64(r.Header.IndexOffset))
}
return nil
} }
// Close closes the underlying reader if it was opened by OpenReader. // Close closes the underlying reader if it was opened by OpenReader.
...@@ -110,13 +133,32 @@ func (r *Reader) Close() error { ...@@ -110,13 +133,32 @@ func (r *Reader) Close() error {
return nil return nil
} }
// Next reads the next block in the data payload with an io.EOF error indicating the end is reached.
// Note, this function is forward-only; once the end has been reached it will always return io.EOF.
func (r *Reader) Next() (blocks.Block, error) {
if r.carV1Reader == nil {
var err error
if r.carV1Reader, err = r.newCarV1Reader(); err != nil {
return nil, err
}
}
return r.carV1Reader.Next()
}
func (r *Reader) newCarV1Reader() (*carv1.CarReader, error) {
dr := r.DataReader()
if r.ropts.ZeroLengthSectionAsEOF {
return carv1.NewCarReaderWithZeroLengthSectionAsEOF(dr)
}
return carv1.NewCarReader(dr)
}
// ReadVersion reads the version from the pragma. // ReadVersion reads the version from the pragma.
// This function accepts both CARv1 and CARv2 payloads. // This function accepts both CARv1 and CARv2 payloads.
func ReadVersion(r io.Reader) (version uint64, err error) { func ReadVersion(r io.Reader) (uint64, error) {
// TODO if the user provides a reader that sufficiently satisfies what carv1.ReadHeader is asking then use that instead of wrapping every time.
header, err := carv1.ReadHeader(r) header, err := carv1.ReadHeader(r)
if err != nil { if err != nil {
return return 0, err
} }
return header.Version, nil return header.Version, nil
} }
package car_test
import (
"io"
"os"
"testing"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/index"
"github.com/ipld/go-car/v2/internal/carv1"
"github.com/stretchr/testify/require"
)
func TestReadVersion(t *testing.T) {
tests := []struct {
name string
path string
want uint64
wantErr bool
}{
{
name: "CarV1VersionIsOne",
path: "testdata/sample-v1.car",
want: 1,
},
{
name: "CarV2VersionIsTwo",
path: "testdata/sample-rw-bs-v2.car",
want: 2,
},
{
name: "CarV1VersionWithZeroLenSectionIsOne",
path: "testdata/sample-v1-with-zero-len-section.car",
want: 1,
},
{
name: "AnotherCarV1VersionWithZeroLenSectionIsOne",
path: "testdata/sample-v1-with-zero-len-section2.car",
want: 1,
},
{
name: "WrappedCarV1InCarV2VersionIsTwo",
path: "testdata/sample-wrapped-v2.car",
want: 2,
},
{
name: "FutureVersionWithCorrectPragmaIsAsExpected",
path: "testdata/sample-rootless-v42.car",
want: 42,
},
{
name: "CarV1WithValidHeaderButCorruptSectionIsOne",
path: "testdata/sample-v1-tailing-corrupt-section.car",
want: 1,
},
{
name: "CarV2WithValidHeaderButCorruptSectionAndIndexIsTwo",
path: "testdata/sample-v2-corrupt-data-and-index.car",
want: 2,
},
{
name: "CarFileWithCorruptPragmaIsError",
path: "testdata/sample-corrupt-pragma.car",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
f, err := os.Open(tt.path)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, f.Close()) })
got, err := carv2.ReadVersion(f)
if tt.wantErr {
require.Error(t, err, "ReadVersion() error = %v, wantErr %v", err, tt.wantErr)
} else {
require.NoError(t, err)
require.Equal(t, tt.want, got, "ReadVersion() got = %v, want %v", got, tt.want)
}
})
}
}
func TestReaderFailsOnUnknownVersion(t *testing.T) {
_, err := carv2.OpenReader("testdata/sample-rootless-v42.car")
require.EqualError(t, err, "invalid car version: 42")
}
func TestReaderFailsOnCorruptPragma(t *testing.T) {
_, err := carv2.OpenReader("testdata/sample-corrupt-pragma.car")
require.EqualError(t, err, "unexpected EOF")
}
func TestReader_WithCarV1Consistency(t *testing.T) {
tests := []struct {
name string
path string
zerLenAsEOF bool
}{
{
name: "CarV1WithoutZeroLengthSection",
path: "testdata/sample-v1.car",
},
{
name: "CarV1WithZeroLenSection",
path: "testdata/sample-v1-with-zero-len-section.car",
zerLenAsEOF: true,
},
{
name: "AnotherCarV1WithZeroLenSection",
path: "testdata/sample-v1-with-zero-len-section2.car",
zerLenAsEOF: true,
},
{
name: "CarV1WithZeroLenSectionWithoutOption",
path: "testdata/sample-v1-with-zero-len-section.car",
},
{
name: "AnotherCarV1WithZeroLenSectionWithoutOption",
path: "testdata/sample-v1-with-zero-len-section2.car",
},
{
name: "CorruptCarV1",
path: "testdata/sample-v1-tailing-corrupt-section.car",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
subject, err := carv2.OpenReader(tt.path, carv2.ZeroLengthSectionAsEOF(tt.zerLenAsEOF))
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, subject.Close()) })
wantReader := requireNewCarV1ReaderFromV1File(t, tt.path, tt.zerLenAsEOF)
require.Equal(t, uint64(1), subject.Version)
gotRoots, err := subject.Roots()
require.NoError(t, err)
require.Equal(t, wantReader.Header.Roots, gotRoots)
require.Nil(t, subject.IndexReader())
for {
gotBlock, gotErr := subject.Next()
wantBlock, wantErr := wantReader.Next()
require.Equal(t, wantBlock, gotBlock)
require.Equal(t, wantErr, gotErr)
if gotErr == io.EOF {
break
}
}
})
}
}
func TestReader_WithCarV2Consistency(t *testing.T) {
tests := []struct {
name string
path string
zerLenAsEOF bool
}{
{
name: "CarV2WrappingV1",
path: "testdata/sample-wrapped-v2.car",
},
{
name: "CarV2ProducedByBlockstore",
path: "testdata/sample-rw-bs-v2.car",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
subject, err := carv2.OpenReader(tt.path, carv2.ZeroLengthSectionAsEOF(tt.zerLenAsEOF))
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, subject.Close()) })
wantReader := requireNewCarV1ReaderFromV2File(t, tt.path, tt.zerLenAsEOF)
require.Equal(t, uint64(2), subject.Version)
gotRoots, err := subject.Roots()
require.NoError(t, err)
require.Equal(t, wantReader.Header.Roots, gotRoots)
gotIndexReader := subject.IndexReader()
require.NotNil(t, gotIndexReader)
gotIndex, err := index.ReadFrom(gotIndexReader)
require.NoError(t, err)
wantIndex, err := carv2.GenerateIndex(subject.DataReader())
require.NoError(t, err)
require.Equal(t, wantIndex, gotIndex)
for {
gotBlock, gotErr := subject.Next()
wantBlock, wantErr := wantReader.Next()
require.Equal(t, wantBlock, gotBlock)
require.Equal(t, wantErr, gotErr)
if gotErr == io.EOF {
break
}
}
})
}
}
func requireNewCarV1ReaderFromV2File(t *testing.T, carV12Path string, zerLenAsEOF bool) *carv1.CarReader {
f, err := os.Open(carV12Path)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, f.Close()) })
_, err = f.Seek(carv2.PragmaSize, io.SeekStart)
require.NoError(t, err)
header := carv2.Header{}
_, err = header.ReadFrom(f)
require.NoError(t, err)
return requireNewCarV1Reader(t, io.NewSectionReader(f, int64(header.DataOffset), int64(header.DataSize)), zerLenAsEOF)
}
func requireNewCarV1ReaderFromV1File(t *testing.T, carV1Path string, zerLenAsEOF bool) *carv1.CarReader {
f, err := os.Open(carV1Path)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, f.Close()) })
return requireNewCarV1Reader(t, f, zerLenAsEOF)
}
func requireNewCarV1Reader(t *testing.T, r io.Reader, zerLenAsEOF bool) *carv1.CarReader {
var cr *carv1.CarReader
var err error
if zerLenAsEOF {
cr, err = carv1.NewCarReaderWithZeroLengthSectionAsEOF(r)
} else {
cr, err = carv1.NewCarReader(r)
}
require.NoError(t, err)
return cr
}
erootsgversio
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment