Commit 703b88c2 authored by Masih H. Derkani's avatar Masih H. Derkani Committed by Masih H. Derkani

Add zero-length sections as EOF option to internal CARv1 reader

The CARv2 implementation uses an internal fork of CARv1 due to upstream
dependency issues captured in #104. Propagate the options set in CARv2
APIs for treatment of zero-lenth sections onto internal packages so that
APIs using the internal CARv1 reader behave consistently. Use a `bool`
for setting the option, since it is the only option needed in CARv1.

Update tests to reflect changes.

Add additional tests to internal CARv1 package and ReadOnly blockstore
that assert option is propagated.

Fixes #190
parent ffcc4b77
...@@ -162,7 +162,7 @@ func OpenReadOnly(path string, opts ...carv2.ReadOption) (*ReadOnly, error) { ...@@ -162,7 +162,7 @@ func OpenReadOnly(path string, opts ...carv2.ReadOption) (*ReadOnly, error) {
} }
func (b *ReadOnly) readBlock(idx int64) (cid.Cid, []byte, error) { func (b *ReadOnly) readBlock(idx int64) (cid.Cid, []byte, error) {
bcid, data, err := util.ReadNode(internalio.NewOffsetReadSeeker(b.backing, idx)) bcid, data, err := util.ReadNode(internalio.NewOffsetReadSeeker(b.backing, idx), b.ropts.ZeroLengthSectionAsEOF)
return bcid, data, err return bcid, data, err
} }
...@@ -252,7 +252,7 @@ func (b *ReadOnly) GetSize(key cid.Cid) (int, error) { ...@@ -252,7 +252,7 @@ func (b *ReadOnly) GetSize(key cid.Cid) (int, error) {
b.mu.RLock() b.mu.RLock()
defer b.mu.RUnlock() defer b.mu.RUnlock()
var fnSize int = -1 fnSize := -1
var fnErr error var fnErr error
err := b.idx.GetAll(key, func(offset uint64) bool { err := b.idx.GetAll(key, func(offset uint64) bool {
rdr := internalio.NewOffsetReadSeeker(b.backing, int64(offset)) rdr := internalio.NewOffsetReadSeeker(b.backing, int64(offset))
......
...@@ -34,25 +34,38 @@ func TestReadOnly(t *testing.T) { ...@@ -34,25 +34,38 @@ func TestReadOnly(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
v1OrV2path string v1OrV2path string
opts []carv2.ReadOption
v1r *carv1.CarReader v1r *carv1.CarReader
}{ }{
{ {
"OpenedWithCarV1", "OpenedWithCarV1",
"../testdata/sample-v1.car", "../testdata/sample-v1.car",
newReaderFromV1File(t, "../testdata/sample-v1.car"), []carv2.ReadOption{UseWholeCIDs(true)},
newV1ReaderFromV1File(t, "../testdata/sample-v1.car", false),
}, },
{ {
"OpenedWithCarV2", "OpenedWithCarV2",
"../testdata/sample-wrapped-v2.car", "../testdata/sample-wrapped-v2.car",
newReaderFromV2File(t, "../testdata/sample-wrapped-v2.car"), []carv2.ReadOption{UseWholeCIDs(true)},
newV1ReaderFromV2File(t, "../testdata/sample-wrapped-v2.car", false),
},
{
"OpenedWithCarV1ZeroLenSection",
"../testdata/sample-v1-with-zero-len-section.car",
[]carv2.ReadOption{UseWholeCIDs(true), carv2.ZeroLengthSectionAsEOF(true)},
newV1ReaderFromV1File(t, "../testdata/sample-v1-with-zero-len-section.car", true),
},
{
"OpenedWithAnotherCarV1ZeroLenSection",
"../testdata/sample-v1-with-zero-len-section2.car",
[]carv2.ReadOption{UseWholeCIDs(true), carv2.ZeroLengthSectionAsEOF(true)},
newV1ReaderFromV1File(t, "../testdata/sample-v1-with-zero-len-section2.car", true),
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
subject, err := OpenReadOnly(tt.v1OrV2path, subject, err := OpenReadOnly(tt.v1OrV2path, tt.opts...)
UseWholeCIDs(true), t.Cleanup(func() { require.NoError(t, subject.Close()) })
)
t.Cleanup(func() { subject.Close() })
require.NoError(t, err) require.NoError(t, err)
// Assert roots match v1 payload. // Assert roots match v1 payload.
...@@ -118,22 +131,29 @@ func TestNewReadOnlyFailsOnUnknownVersion(t *testing.T) { ...@@ -118,22 +131,29 @@ func TestNewReadOnlyFailsOnUnknownVersion(t *testing.T) {
require.Nil(t, subject) require.Nil(t, subject)
} }
func newReaderFromV1File(t *testing.T, carv1Path string) *carv1.CarReader { func newV1ReaderFromV1File(t *testing.T, carv1Path string, zeroLenSectionAsEOF bool) *carv1.CarReader {
f, err := os.Open(carv1Path) f, err := os.Open(carv1Path)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { f.Close() }) t.Cleanup(func() { f.Close() })
v1r, err := carv1.NewCarReader(f) v1r, err := newV1Reader(f, zeroLenSectionAsEOF)
require.NoError(t, err) require.NoError(t, err)
return v1r return v1r
} }
func newReaderFromV2File(t *testing.T, carv2Path string) *carv1.CarReader { func newV1ReaderFromV2File(t *testing.T, carv2Path string, zeroLenSectionAsEOF bool) *carv1.CarReader {
f, err := os.Open(carv2Path) f, err := os.Open(carv2Path)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { f.Close() }) t.Cleanup(func() { f.Close() })
v2r, err := carv2.NewReader(f) v2r, err := carv2.NewReader(f)
require.NoError(t, err) require.NoError(t, err)
v1r, err := carv1.NewCarReader(v2r.DataReader()) v1r, err := newV1Reader(v2r.DataReader(), zeroLenSectionAsEOF)
require.NoError(t, err) require.NoError(t, err)
return v1r return v1r
} }
func newV1Reader(r io.Reader, zeroLenSectionAsEOF bool) (*carv1.CarReader, error) {
if zeroLenSectionAsEOF {
return carv1.NewCarReaderWithZeroLengthSectionAsEOF(r)
}
return carv1.NewCarReader(r)
}
...@@ -77,7 +77,7 @@ func TestReadFrom(t *testing.T) { ...@@ -77,7 +77,7 @@ func TestReadFrom(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Read the fame at offset and assert the frame corresponds to the expected block. // Read the fame at offset and assert the frame corresponds to the expected block.
gotCid, gotData, err := util.ReadNode(crf) gotCid, gotData, err := util.ReadNode(crf, false)
require.NoError(t, err) require.NoError(t, err)
gotBlock, err := blocks.NewBlockWithCid(gotData, gotCid) gotBlock, err := blocks.NewBlockWithCid(gotData, gotCid)
require.NoError(t, err) require.NoError(t, err)
......
...@@ -57,7 +57,7 @@ func WriteCar(ctx context.Context, ds format.NodeGetter, roots []cid.Cid, w io.W ...@@ -57,7 +57,7 @@ func WriteCar(ctx context.Context, ds format.NodeGetter, roots []cid.Cid, w io.W
} }
func ReadHeader(r io.Reader) (*CarHeader, error) { func ReadHeader(r io.Reader) (*CarHeader, error) {
hb, err := util.LdRead(r) hb, err := util.LdRead(r, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -106,11 +106,20 @@ func (cw *carWriter) writeNode(ctx context.Context, nd format.Node) error { ...@@ -106,11 +106,20 @@ func (cw *carWriter) writeNode(ctx context.Context, nd format.Node) error {
} }
type CarReader struct { type CarReader struct {
r io.Reader r io.Reader
Header *CarHeader Header *CarHeader
zeroLenAsEOF bool
}
func NewCarReaderWithZeroLengthSectionAsEOF(r io.Reader) (*CarReader, error) {
return newCarReader(r, true)
} }
func NewCarReader(r io.Reader) (*CarReader, error) { func NewCarReader(r io.Reader) (*CarReader, error) {
return newCarReader(r, false)
}
func newCarReader(r io.Reader, zeroLenAsEOF bool) (*CarReader, error) {
ch, err := ReadHeader(r) ch, err := ReadHeader(r)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -125,13 +134,14 @@ func NewCarReader(r io.Reader) (*CarReader, error) { ...@@ -125,13 +134,14 @@ func NewCarReader(r io.Reader) (*CarReader, error) {
} }
return &CarReader{ return &CarReader{
r: r, r: r,
Header: ch, Header: ch,
zeroLenAsEOF: zeroLenAsEOF,
}, nil }, nil
} }
func (cr *CarReader) Next() (blocks.Block, error) { func (cr *CarReader) Next() (blocks.Block, error) {
c, data, err := util.ReadNode(cr.r) c, data, err := util.ReadNode(cr.r, cr.zeroLenAsEOF)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"io" "io"
"os"
"strings" "strings"
"testing" "testing"
...@@ -297,3 +298,36 @@ func TestCarHeaderMatchess(t *testing.T) { ...@@ -297,3 +298,36 @@ func TestCarHeaderMatchess(t *testing.T) {
}) })
} }
} }
func TestReadingZeroLengthSectionWithoutOptionSetIsError(t *testing.T) {
f, err := os.Open("../../testdata/sample-v1-with-zero-len-section.car")
require.NoError(t, err)
subject, err := NewCarReader(f)
require.NoError(t, err)
for {
_, err := subject.Next()
if err == io.EOF {
break
} else if err != nil {
require.EqualError(t, err, "varints malformed, could not reach the end")
return
}
}
require.Fail(t, "expected error when reading file with zero section without option set")
}
func TestReadingZeroLengthSectionWithOptionSetIsSuccess(t *testing.T) {
f, err := os.Open("../../testdata/sample-v1-with-zero-len-section.car")
require.NoError(t, err)
subject, err := NewCarReaderWithZeroLengthSectionAsEOF(f)
require.NoError(t, err)
for {
_, err := subject.Next()
if err == io.EOF {
break
}
require.NoError(t, err)
}
}
...@@ -15,8 +15,8 @@ type BytesReader interface { ...@@ -15,8 +15,8 @@ type BytesReader interface {
io.ByteReader io.ByteReader
} }
func ReadNode(r io.Reader) (cid.Cid, []byte, error) { func ReadNode(r io.Reader, zeroLenAsEOF bool) (cid.Cid, []byte, error) {
data, err := LdRead(r) data, err := LdRead(r, zeroLenAsEOF)
if err != nil { if err != nil {
return cid.Cid{}, nil, err return cid.Cid{}, nil, err
} }
...@@ -61,7 +61,7 @@ func LdSize(d ...[]byte) uint64 { ...@@ -61,7 +61,7 @@ func LdSize(d ...[]byte) uint64 {
return sum + uint64(s) return sum + uint64(s)
} }
func LdRead(r io.Reader) ([]byte, error) { func LdRead(r io.Reader, zeroLenAsEOF bool) ([]byte, error) {
l, err := varint.ReadUvarint(internalio.ToByteReader(r)) l, err := varint.ReadUvarint(internalio.ToByteReader(r))
if err != nil { if err != nil {
// If the length of bytes read is non-zero when the error is EOF then signal an unclean EOF. // If the length of bytes read is non-zero when the error is EOF then signal an unclean EOF.
...@@ -69,6 +69,8 @@ func LdRead(r io.Reader) ([]byte, error) { ...@@ -69,6 +69,8 @@ func LdRead(r io.Reader) ([]byte, error) {
return nil, io.ErrUnexpectedEOF return nil, io.ErrUnexpectedEOF
} }
return nil, err return nil, err
} else if l == 0 && zeroLenAsEOF {
return nil, io.EOF
} }
buf := make([]byte, l) buf := make([]byte, l)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment