Commit c2e497e2 authored by Daniel Martí

blockstore: implement UseWholeCIDs

Update the tests too, as a few of them expected whole CIDs.
For now, just make them use the option.

This required index.Index to gain a new method, GetAll, so that when
using whole CIDs, methods like Get can succeed if any of the matching
indexed CIDs is an exact whole-CID match, rather than looking only at
the first matching indexed CID.

GetAll is akin to an iteration over all matching indexed CIDs, and its
callback returns a boolean saying whether the iteration should continue.
This allows stopping as soon as we're done.

We also remove Index.Get, replacing it with a helper called GetFirst,
which turns simple uses of GetAll into a single line.
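
For illustration, a minimal sketch of how the new API reads from the
caller's side; idx and key stand in for values from elsewhere, and the
cap of three offsets is made up:

	// Collect at most three offsets of blocks matching key.
	// Returning false from the callback stops the iteration early.
	var offsets []uint64
	err := idx.GetAll(key, func(offset uint64) bool {
		offsets = append(offsets, offset)
		return len(offsets) < 3
	})
	// err is index.ErrNotFound if the CID isn't indexed at all.

	// GetFirst is the one-line shorthand for grabbing any single match.
	offset, err := index.GetFirst(idx, key)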

We remove the non-specced and unused index implementations, too.
They were left in place in case they were useful again,
but they haven't been so far, and their code is still in git.
Keeping them around just means updating more code when refactoring.

While at it, make ZeroLengthSectionAsEOF take a boolean and return an
option, just like the other boolean options, for consistency.
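
At call sites that change looks like this, as seen in the example and
test updates below:

	// before
	robs, err := blockstore.OpenReadOnly(path, carv2.ZeroLengthSectionAsEOF)
	// after
	robs, err := blockstore.OpenReadOnly(path, carv2.ZeroLengthSectionAsEOF(true))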

Fixes #130.
parent 0ae4f9c0
...
@@ -20,7 +20,10 @@ func ExampleOpenReadOnly() {
 	// Note, `OpenReadOnly` accepts both CARv1 and CARv2 formats and transparently generates an index
 	// in the background if necessary.
 	// This instance sets the ZeroLengthSectionAsEOF option to treat zero-sized sections in the file as EOF.
-	robs, err := blockstore.OpenReadOnly("../testdata/sample-v1.car", carv2.ZeroLengthSectionAsEOF)
+	robs, err := blockstore.OpenReadOnly("../testdata/sample-v1.car",
+		blockstore.UseWholeCIDs(true),
+		carv2.ZeroLengthSectionAsEOF(true),
+	)
 	if err != nil {
 		panic(err)
 	}
...
...
@@ -55,7 +55,7 @@ func newRecordFromCid(c cid.Cid, at uint64) recordDigest {
 		panic(err)
 	}
-	return recordDigest{d.Digest, index.Record{Cid: c, Idx: at}}
+	return recordDigest{d.Digest, index.Record{Cid: c, Offset: at}}
 }
 
 func (ii *insertionIndex) insertNoReplace(key cid.Cid, n uint64) {
...
@@ -77,7 +77,31 @@ func (ii *insertionIndex) Get(c cid.Cid) (uint64, error) {
 		return 0, errUnsupported
 	}
-	return r.Record.Idx, nil
+	return r.Record.Offset, nil
+}
+
+func (ii *insertionIndex) GetAll(c cid.Cid, fn func(uint64) bool) error {
+	d, err := multihash.Decode(c.Hash())
+	if err != nil {
+		return err
+	}
+	entry := recordDigest{digest: d.Digest}
+
+	any := false
+	iter := func(i llrb.Item) bool {
+		existing := i.(recordDigest)
+		if !bytes.Equal(existing.digest, entry.digest) {
+			// We've already looked at all entries with matching digests.
+			return false
+		}
+		any = true
+		return fn(existing.Record.Offset)
+	}
+	ii.items.AscendGreaterOrEqual(entry, iter)
+	if !any {
+		return index.ErrNotFound
+	}
+	return nil
 }
 
 func (ii *insertionIndex) Marshal(w io.Writer) error {
...
@@ -152,6 +176,10 @@ func (ii *insertionIndex) flatten() (index.Index, error) {
 	return si, nil
 }
 
+// note that hasExactCID is very similar to GetAll,
+// but it's separate as it allows us to compare Record.Cid directly,
+// whereas GetAll just provides Record.Offset.
 func (ii *insertionIndex) hasExactCID(c cid.Cid) bool {
 	d, err := multihash.Decode(c.Hash())
 	if err != nil {
...
...
@@ -65,7 +65,6 @@ type ReadOnly struct {
 // go-car/v2 package.
 func UseWholeCIDs(enable bool) carv2.ReadOption {
 	return func(o *carv2.ReadOptions) {
-		// TODO: update methods like Get, Has, and AllKeysChan to obey this.
 		o.BlockstoreUseWholeCIDs = enable
 	}
 }
...
@@ -177,22 +176,35 @@ func (b *ReadOnly) Has(key cid.Cid) (bool, error) {
 	b.mu.RLock()
 	defer b.mu.RUnlock()
 
-	offset, err := b.idx.Get(key)
+	var fnFound bool
+	var fnErr error
+	err := b.idx.GetAll(key, func(offset uint64) bool {
+		uar := internalio.NewOffsetReadSeeker(b.backing, int64(offset))
+		var err error
+		_, err = varint.ReadUvarint(uar)
+		if err != nil {
+			fnErr = err
+			return false
+		}
+		_, readCid, err := cid.CidFromReader(uar)
+		if err != nil {
+			fnErr = err
+			return false
+		}
+		if b.ropts.BlockstoreUseWholeCIDs {
+			fnFound = readCid.Equals(key)
+			return !fnFound // continue looking if we haven't found it
+		} else {
+			fnFound = bytes.Equal(readCid.Hash(), key.Hash())
+			return false
+		}
+	})
 	if errors.Is(err, index.ErrNotFound) {
 		return false, nil
 	} else if err != nil {
 		return false, err
 	}
-	uar := internalio.NewOffsetReadSeeker(b.backing, int64(offset))
-	_, err = varint.ReadUvarint(uar)
-	if err != nil {
-		return false, err
-	}
-	_, c, err := cid.CidFromReader(uar)
-	if err != nil {
-		return false, err
-	}
-	return bytes.Equal(key.Hash(), c.Hash()), nil
+	return fnFound, fnErr
 }
 
 // Get gets a block corresponding to the given key.
...
@@ -200,21 +212,39 @@ func (b *ReadOnly) Get(key cid.Cid) (blocks.Block, error) {
 	b.mu.RLock()
 	defer b.mu.RUnlock()
 
-	offset, err := b.idx.Get(key)
-	if err != nil {
-		if err == index.ErrNotFound {
-			err = blockstore.ErrNotFound
-		}
-		return nil, err
-	}
-	entry, data, err := b.readBlock(int64(offset))
-	if err != nil {
-		return nil, err
-	}
-	if !bytes.Equal(key.Hash(), entry.Hash()) {
-		return nil, blockstore.ErrNotFound
-	}
-	return blocks.NewBlockWithCid(data, key)
+	var fnData []byte
+	var fnErr error
+	err := b.idx.GetAll(key, func(offset uint64) bool {
+		readCid, data, err := b.readBlock(int64(offset))
+		if err != nil {
+			fnErr = err
+			return false
+		}
+		if b.ropts.BlockstoreUseWholeCIDs {
+			if readCid.Equals(key) {
+				fnData = data
+				return false
+			} else {
+				return true // continue looking
+			}
+		} else {
+			if bytes.Equal(readCid.Hash(), key.Hash()) {
+				fnData = data
+			}
+			return false
+		}
+	})
+	if errors.Is(err, index.ErrNotFound) {
+		return nil, blockstore.ErrNotFound
+	} else if err != nil {
+		return nil, err
+	} else if fnErr != nil {
+		return nil, fnErr
+	}
+	if fnData == nil {
+		return nil, blockstore.ErrNotFound
+	}
+	return blocks.NewBlockWithCid(fnData, key)
 }
 
 // GetSize gets the size of an item corresponding to the given key.
...
@@ -222,23 +252,45 @@ func (b *ReadOnly) GetSize(key cid.Cid) (int, error) {
 	b.mu.RLock()
 	defer b.mu.RUnlock()
 
-	idx, err := b.idx.Get(key)
-	if err != nil {
-		return -1, err
-	}
-	rdr := internalio.NewOffsetReadSeeker(b.backing, int64(idx))
-	sectionLen, err := varint.ReadUvarint(rdr)
-	if err != nil {
-		return -1, blockstore.ErrNotFound
-	}
-	cidLen, readCid, err := cid.CidFromReader(rdr)
-	if err != nil {
-		return 0, err
-	}
-	if !readCid.Equals(key) {
-		return -1, blockstore.ErrNotFound
-	}
-	return int(sectionLen) - cidLen, err
+	var fnSize int = -1
+	var fnErr error
+	err := b.idx.GetAll(key, func(offset uint64) bool {
+		rdr := internalio.NewOffsetReadSeeker(b.backing, int64(offset))
+		sectionLen, err := varint.ReadUvarint(rdr)
+		if err != nil {
+			fnErr = err
+			return false
+		}
+		cidLen, readCid, err := cid.CidFromReader(rdr)
+		if err != nil {
+			fnErr = err
+			return false
+		}
+		if b.ropts.BlockstoreUseWholeCIDs {
+			if readCid.Equals(key) {
+				fnSize = int(sectionLen) - cidLen
+				return false
+			} else {
+				return true // continue looking
+			}
+		} else {
+			if bytes.Equal(readCid.Hash(), key.Hash()) {
+				fnSize = int(sectionLen) - cidLen
+			}
+			return false
+		}
+	})
+	if errors.Is(err, index.ErrNotFound) {
+		return -1, blockstore.ErrNotFound
+	} else if err != nil {
+		return -1, err
+	} else if fnErr != nil {
+		return -1, fnErr
+	}
+	if fnSize == -1 {
+		return -1, blockstore.ErrNotFound
+	}
+	return fnSize, nil
 }
 
 // Put is not supported and always returns an error.
...
@@ -304,6 +356,11 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
 				return // TODO: log this error
 			}
 
+			// If we're just using multihashes, flatten to the "raw" codec.
+			if !b.ropts.BlockstoreUseWholeCIDs {
+				c = cid.NewCidV1(cid.Raw, c.Hash())
+			}
+
 			select {
 			case ch <- c:
 			case <-ctx.Done():
...
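
To summarize the semantics above: with UseWholeCIDs enabled, Has, Get,
and GetSize keep scanning past sections whose multihash matches until
they find an exact whole-CID match, while the default mode accepts the
first multihash match. A minimal usage sketch, assuming a CAR file at a
hypothetical path and a key cid.Cid in scope:

	bs, err := blockstore.OpenReadOnly("sample.car",
		blockstore.UseWholeCIDs(true),
	)
	if err != nil {
		panic(err)
	}
	defer bs.Close()

	// Reports true only for the exact CID that was written; a CID that
	// merely shares the multihash with a stored block does not count.
	found, err := bs.Has(key)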
...
@@ -49,7 +49,9 @@ func TestReadOnly(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			subject, err := OpenReadOnly(tt.v1OrV2path)
+			subject, err := OpenReadOnly(tt.v1OrV2path,
+				UseWholeCIDs(true),
+			)
 			t.Cleanup(func() { subject.Close() })
 			require.NoError(t, err)
...
...
@@ -89,7 +89,6 @@ func AllowDuplicatePuts(allow bool) carv2.WriteOption {
 // Resuming from finalized files is allowed. However, resumption will regenerate the index
 // regardless by scanning every existing block in file.
 func OpenReadWrite(path string, roots []cid.Cid, opts ...carv2.ReadWriteOption) (*ReadWrite, error) {
-	// TODO: enable deduplication by default now that resumption is automatically attempted.
 	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o666) // TODO: Should the user be able to configure FileMode permissions?
 	if err != nil {
 		return nil, fmt.Errorf("could not open read/write file: %w", err)
...
...
@@ -123,21 +123,29 @@ func TestBlockstorePutSameHashes(t *testing.T) {
 func TestBlockstorePutSameHashes(t *testing.T) {
 	tdir := t.TempDir()
 
-	// wbs allows duplicate puts.
-	wbs, err := blockstore.OpenReadWrite(
-		filepath.Join(tdir, "readwrite.car"), nil,
+	// This blockstore allows duplicate puts,
+	// and identifies by multihash as per the default.
+	wbsAllowDups, err := blockstore.OpenReadWrite(
+		filepath.Join(tdir, "readwrite-allowdup.car"), nil,
 		blockstore.AllowDuplicatePuts(true),
 	)
 	require.NoError(t, err)
-	t.Cleanup(func() { wbs.Finalize() })
+	t.Cleanup(func() { wbsAllowDups.Finalize() })
 
-	// wbs deduplicates puts by CID.
-	wbsd, err := blockstore.OpenReadWrite(
-		filepath.Join(tdir, "readwrite-dedup.car"), nil,
+	// This blockstore deduplicates puts by CID.
+	wbsByCID, err := blockstore.OpenReadWrite(
+		filepath.Join(tdir, "readwrite-dedup-wholecid.car"), nil,
 		blockstore.UseWholeCIDs(true),
 	)
 	require.NoError(t, err)
-	t.Cleanup(func() { wbsd.Finalize() })
+	t.Cleanup(func() { wbsByCID.Finalize() })
+
+	// This blockstore deduplicates puts by multihash.
+	wbsByHash, err := blockstore.OpenReadWrite(
+		filepath.Join(tdir, "readwrite-dedup-hash.car"), nil,
+	)
+	require.NoError(t, err)
+	t.Cleanup(func() { wbsByHash.Finalize() })
 
 	var blockList []blocks.Block
...
@@ -160,15 +168,15 @@ func TestBlockstorePutSameHashes(t *testing.T) {
 	// However, we have multiple CIDs for each multihash.
 	// We also have two duplicate CIDs.
 	data1 := []byte("foo bar")
-	appendBlock(data1, 0, cid.Raw)
-	appendBlock(data1, 1, cid.Raw)
+	appendBlock(data1, 0, cid.DagProtobuf)
+	appendBlock(data1, 1, cid.DagProtobuf)
 	appendBlock(data1, 1, cid.DagCBOR)
 	appendBlock(data1, 1, cid.DagCBOR) // duplicate CID
 
 	data2 := []byte("foo bar baz")
-	appendBlock(data2, 0, cid.Raw)
-	appendBlock(data2, 1, cid.Raw)
-	appendBlock(data2, 1, cid.Raw) // duplicate CID
+	appendBlock(data2, 0, cid.DagProtobuf)
+	appendBlock(data2, 1, cid.DagProtobuf)
+	appendBlock(data2, 1, cid.DagProtobuf) // duplicate CID
 	appendBlock(data2, 1, cid.DagCBOR)
 
 	countBlocks := func(bs *blockstore.ReadWrite) int {
...
@@ -176,52 +184,75 @@ func TestBlockstorePutSameHashes(t *testing.T) {
 		require.NoError(t, err)
 
 		n := 0
-		for range ch {
+		for c := range ch {
+			if c.Prefix().Codec == cid.Raw {
+				if bs == wbsByCID {
+					t.Error("expected blockstore with UseWholeCIDs to not flatten on AllKeysChan")
+				}
+			} else {
+				if bs != wbsByCID {
+					t.Error("expected blockstore without UseWholeCIDs to flatten on AllKeysChan")
+				}
+			}
 			n++
 		}
 		return n
 	}
 
-	for i, block := range blockList {
-		// Has should never error here.
-		// The first block should be missing.
-		// Others might not, given the duplicate hashes.
-		has, err := wbs.Has(block.Cid())
-		require.NoError(t, err)
-		if i == 0 {
-			require.False(t, has)
-		}
+	putBlockList := func(bs *blockstore.ReadWrite) {
+		for i, block := range blockList {
+			// Has should never error here.
+			// The first block should be missing.
+			// Others might not, given the duplicate hashes.
+			has, err := bs.Has(block.Cid())
+			require.NoError(t, err)
+			if i == 0 {
+				require.False(t, has)
+			}
 
-		err = wbs.Put(block)
-		require.NoError(t, err)
-	}
+			err = bs.Put(block)
+			require.NoError(t, err)
 
-	for _, block := range blockList {
-		has, err := wbs.Has(block.Cid())
-		require.NoError(t, err)
-		require.True(t, has)
+			// Has, Get, and GetSize need to work right after a Put.
+			has, err = bs.Has(block.Cid())
+			require.NoError(t, err)
+			require.True(t, has)
 
-		got, err := wbs.Get(block.Cid())
-		require.NoError(t, err)
-		require.Equal(t, block.Cid(), got.Cid())
-		require.Equal(t, block.RawData(), got.RawData())
+			got, err := bs.Get(block.Cid())
+			require.NoError(t, err)
+			require.Equal(t, block.Cid(), got.Cid())
+			require.Equal(t, block.RawData(), got.RawData())
+
+			size, err := bs.GetSize(block.Cid())
+			require.NoError(t, err)
+			require.Equal(t, len(block.RawData()), size)
+		}
 	}
 
-	require.Equal(t, len(blockList), countBlocks(wbs))
+	putBlockList(wbsAllowDups)
+	require.Equal(t, len(blockList), countBlocks(wbsAllowDups))
 
-	err = wbs.Finalize()
+	err = wbsAllowDups.Finalize()
 	require.NoError(t, err)
 
 	// Put the same list of blocks to the blockstore that
 	// deduplicates by CID.
-	// We should end up with two fewer blocks.
-	for _, block := range blockList {
-		err = wbsd.Put(block)
-		require.NoError(t, err)
-	}
-	require.Equal(t, len(blockList)-2, countBlocks(wbsd))
+	// We should end up with two fewer blocks,
+	// as two are entire CID duplicates.
+	putBlockList(wbsByCID)
+	require.Equal(t, len(blockList)-2, countBlocks(wbsByCID))
 
-	err = wbsd.Finalize()
+	err = wbsByCID.Finalize()
 	require.NoError(t, err)
+
+	// Put the same list of blocks to the blockstore that
+	// deduplicates by multihash.
+	// We should end up with just two blocks,
+	// as the original set of blocks only has two distinct multihashes.
+	putBlockList(wbsByHash)
+	require.Equal(t, 2, countBlocks(wbsByHash))
+
+	err = wbsByHash.Finalize()
+	require.NoError(t, err)
 }
...
@@ -280,7 +311,8 @@ func TestBlockstoreNullPadding(t *testing.T) {
 	// A sample null-padded CARv1 file.
 	paddedV1 = append(paddedV1, make([]byte, 2048)...)
 
-	rbs, err := blockstore.NewReadOnly(bufferReaderAt(paddedV1), nil, carv2.ZeroLengthSectionAsEOF)
+	rbs, err := blockstore.NewReadOnly(bufferReaderAt(paddedV1), nil,
+		carv2.ZeroLengthSectionAsEOF(true))
 	require.NoError(t, err)
 
 	roots, err := rbs.Roots()
...
@@ -313,7 +345,8 @@ func TestBlockstoreResumption(t *testing.T) {
 	path := filepath.Join(t.TempDir(), "readwrite-resume.car")
 	// Create an incomplete CARv2 file with no blocks put.
-	subject, err := blockstore.OpenReadWrite(path, r.Header.Roots)
+	subject, err := blockstore.OpenReadWrite(path, r.Header.Roots,
+		blockstore.UseWholeCIDs(true))
 	require.NoError(t, err)
 
 	// For each block resume on the same file, putting blocks one at a time.
...
@@ -345,7 +378,8 @@ func TestBlockstoreResumption(t *testing.T) {
 			// We do this to avoid resource leak during testing.
 			require.NoError(t, subject.Close())
 		}
-		subject, err = blockstore.OpenReadWrite(path, r.Header.Roots)
+		subject, err = blockstore.OpenReadWrite(path, r.Header.Roots,
+			blockstore.UseWholeCIDs(true))
 		require.NoError(t, err)
 	}
 	require.NoError(t, subject.Put(b))
...
@@ -377,7 +411,8 @@ func TestBlockstoreResumption(t *testing.T) {
 	require.NoError(t, subject.Close())
 
 	// Finalize the blockstore to complete partially written CARv2 file.
-	subject, err = blockstore.OpenReadWrite(path, r.Header.Roots)
+	subject, err = blockstore.OpenReadWrite(path, r.Header.Roots,
+		blockstore.UseWholeCIDs(true))
 	require.NoError(t, err)
 	require.NoError(t, subject.Finalize())
...
@@ -528,9 +563,9 @@ func TestReadWriteWithPaddingWorksAsExpected(t *testing.T) {
 	indexSize := stat.Size() - int64(wantIndexOffset)
 	gotIdx, err := index.ReadFrom(io.NewSectionReader(f, int64(wantIndexOffset), indexSize))
 	require.NoError(t, err)
-	_, err = gotIdx.Get(oneTestBlockCid)
+	_, err = index.GetFirst(gotIdx, oneTestBlockCid)
 	require.NoError(t, err)
-	_, err = gotIdx.Get(anotherTestBlockCid)
+	_, err = index.GetFirst(gotIdx, anotherTestBlockCid)
 	require.NoError(t, err)
 }
...
...
@@ -34,7 +34,7 @@ func ExampleReadFrom() {
 	// For each root CID print the offset relative to CARv1 data payload.
 	for _, r := range roots {
-		offset, err := idx.Get(r)
+		offset, err := index.GetFirst(idx, r)
 		if err != nil {
 			panic(err)
 		}
...
...
@@ -14,33 +14,59 @@ import (
 	"github.com/ipfs/go-cid"
 )
 
-// Codec table is a first var-int in CAR indexes
-const (
-	indexHashed codec = 0x300000 + iota
-	indexSingleSorted
-	indexGobHashed
-)
-
 type (
-	// codec is used as a multicodec identifier for CAR index files
-	codec int
-
 	// Record is a pre-processed record of a car item and location.
 	Record struct {
 		cid.Cid
-		Idx uint64
+		Offset uint64
 	}
 
 	// Index provides an interface for looking up byte offset of a given CID.
+	//
+	// Note that each indexing mechanism is free to match CIDs however it
+	// sees fit. For example, multicodec.CarIndexSorted only indexes
+	// multihash digests, meaning that Get and GetAll will find matching
+	// blocks even if the CID's encoding multicodec differs. Other index
+	// implementations might index the entire CID, the entire multihash, or
+	// just part of a multihash's digest.
 	Index interface {
+		// Codec provides the multicodec code that the index implements.
+		//
+		// Note that this may return a reserved code if the index
+		// implementation is not defined in a spec.
 		Codec() multicodec.Code
+
+		// Marshal encodes the index in serial form.
 		Marshal(w io.Writer) error
+
+		// Unmarshal decodes the index from its serial form.
 		Unmarshal(r io.Reader) error
-		Get(cid.Cid) (uint64, error)
+
+		// Load inserts a number of records into the index.
 		Load([]Record) error
+
+		// GetAll looks up all blocks matching a given CID,
+		// calling a function for each one of their offsets.
+		//
+		// If the function returns false, GetAll stops.
+		//
+		// If no error occurred and the CID isn't indexed,
+		// meaning that no callbacks happen,
+		// ErrNotFound is returned.
+		GetAll(cid.Cid, func(uint64) bool) error
 	}
 )
 
+// GetFirst is a wrapper over Index.GetAll, returning the offset for the first
+// matching indexed CID.
+func GetFirst(idx Index, key cid.Cid) (uint64, error) {
+	var firstOffset uint64
+	err := idx.GetAll(key, func(offset uint64) bool {
+		firstOffset = offset
+		return false
+	})
+	return firstOffset, err
+}
+
 // New constructs a new index corresponding to the given CAR index codec.
 func New(codec multicodec.Code) (Index, error) {
 	switch codec {
...
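
To make the multihash-based matching described in the Index docs
concrete: with a CarIndexSorted index, two CIDs that wrap the same
multihash under different codecs resolve to the same record. A minimal
sketch (the offset value is made up):

	mh, _ := multihash.Sum([]byte("foo bar"), multihash.SHA2_256, -1)
	pbCid := cid.NewCidV1(cid.DagProtobuf, mh)
	cborCid := cid.NewCidV1(cid.DagCBOR, mh)

	idx, _ := index.New(multicodec.CarIndexSorted)
	_ = idx.Load([]index.Record{{Cid: pbCid, Offset: 42}})

	// Both lookups return offset 42: only the multihash digest is
	// indexed, so the CID codec is ignored by this implementation.
	off1, _ := index.GetFirst(idx, pbCid)
	off2, _ := index.GetFirst(idx, cborCid)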
...
@@ -32,21 +32,6 @@ func TestNew(t *testing.T) {
 			codec:   multicodec.Cidv1,
 			wantErr: true,
 		},
-		{
-			name:    "IndexSingleSortedMultiCodecIsError",
-			codec:   multicodec.Code(indexSingleSorted),
-			wantErr: true,
-		},
-		{
-			name:    "IndexHashedMultiCodecIsError",
-			codec:   multicodec.Code(indexHashed),
-			wantErr: true,
-		},
-		{
-			name:    "IndexGobHashedMultiCodecIsError",
-			codec:   multicodec.Code(indexGobHashed),
-			wantErr: true,
-		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
...
@@ -82,7 +67,7 @@ func TestReadFrom(t *testing.T) {
 	require.NoError(t, err)
 
 	// Get offset from the index for a CID and assert it exists
-	gotOffset, err := subject.Get(wantBlock.Cid())
+	gotOffset, err := GetFirst(subject, wantBlock.Cid())
 	require.NoError(t, err)
 	require.NotZero(t, gotOffset)
...
deleted file (the unused gob-backed hashed index implementation):
-package index
-
-import (
-	"encoding/gob"
-	"io"
-
-	"github.com/multiformats/go-multicodec"
-
-	"github.com/ipfs/go-cid"
-)
-
-//lint:ignore U1000 kept for potential future use.
-type mapGobIndex map[cid.Cid]uint64
-
-func (m *mapGobIndex) Get(c cid.Cid) (uint64, error) {
-	el, ok := (*m)[c]
-	if !ok {
-		return 0, ErrNotFound
-	}
-	return el, nil
-}
-
-func (m *mapGobIndex) Marshal(w io.Writer) error {
-	e := gob.NewEncoder(w)
-	return e.Encode(m)
-}
-
-func (m *mapGobIndex) Unmarshal(r io.Reader) error {
-	d := gob.NewDecoder(r)
-	return d.Decode(m)
-}
-
-func (m *mapGobIndex) Codec() multicodec.Code {
-	return multicodec.Code(indexHashed)
-}
-
-func (m *mapGobIndex) Load(rs []Record) error {
-	for _, r := range rs {
-		(*m)[r.Cid] = r.Idx
-	}
-	return nil
-}
-
-//lint:ignore U1000 kept for potential future use.
-func newGobHashed() Index {
-	mi := make(mapGobIndex)
-	return &mi
-}
deleted file (the unused cbor-backed hashed index implementation):
-package index
-
-import (
-	"io"
-
-	"github.com/multiformats/go-multicodec"
-
-	"github.com/ipfs/go-cid"
-	cbor "github.com/whyrusleeping/cbor/go"
-)
-
-//lint:ignore U1000 kept for potential future use.
-type mapIndex map[cid.Cid]uint64
-
-func (m *mapIndex) Get(c cid.Cid) (uint64, error) {
-	el, ok := (*m)[c]
-	if !ok {
-		return 0, ErrNotFound
-	}
-	return el, nil
-}
-
-func (m *mapIndex) Marshal(w io.Writer) error {
-	return cbor.Encode(w, m)
-}
-
-func (m *mapIndex) Unmarshal(r io.Reader) error {
-	d := cbor.NewDecoder(r)
-	return d.Decode(m)
-}
-
-func (m *mapIndex) Codec() multicodec.Code {
-	return multicodec.Code(indexHashed)
-}
-
-func (m *mapIndex) Load(rs []Record) error {
-	for _, r := range rs {
-		(*m)[r.Cid] = r.Idx
-	}
-	return nil
-}
-
-//lint:ignore U1000 kept for potential future use.
-func newHashed() Index {
-	mi := make(mapIndex)
-	return &mi
-}
...
@@ -44,10 +44,6 @@ func (r recordSet) Swap(i, j int) {
 	r[i], r[j] = r[j], r[i]
 }
 
-func (s *singleWidthIndex) Codec() multicodec.Code {
-	return multicodec.Code(indexSingleSorted)
-}
-
 func (s *singleWidthIndex) Marshal(w io.Writer) error {
 	if err := binary.Write(w, binary.LittleEndian, s.width); err != nil {
 		return err
...
@@ -77,25 +73,34 @@ func (s *singleWidthIndex) Less(i int, digest []byte) bool {
 	return bytes.Compare(digest[:], s.index[i*int(s.width):((i+1)*int(s.width)-8)]) <= 0
 }
 
-func (s *singleWidthIndex) Get(c cid.Cid) (uint64, error) {
+func (s *singleWidthIndex) GetAll(c cid.Cid, fn func(uint64) bool) error {
 	d, err := multihash.Decode(c.Hash())
 	if err != nil {
-		return 0, err
+		return err
 	}
-	return s.get(d.Digest)
+	return s.getAll(d.Digest, fn)
 }
 
-func (s *singleWidthIndex) get(d []byte) (uint64, error) {
+func (s *singleWidthIndex) getAll(d []byte, fn func(uint64) bool) error {
 	idx := sort.Search(int(s.len), func(i int) bool {
 		return s.Less(i, d)
 	})
 	if uint64(idx) == s.len {
-		return 0, ErrNotFound
+		return ErrNotFound
 	}
-	if !bytes.Equal(d[:], s.index[idx*int(s.width):(idx+1)*int(s.width)-8]) {
-		return 0, ErrNotFound
+
+	any := false
+	for bytes.Equal(d[:], s.index[idx*int(s.width):(idx+1)*int(s.width)-8]) {
+		any = true
+		offset := binary.LittleEndian.Uint64(s.index[(idx+1)*int(s.width)-8 : (idx+1)*int(s.width)])
+		if !fn(offset) {
+			break
+		}
 	}
-	return binary.LittleEndian.Uint64(s.index[(idx+1)*int(s.width)-8 : (idx+1)*int(s.width)]), nil
+	if !any {
+		return ErrNotFound
+	}
+	return nil
 }
 
 func (s *singleWidthIndex) Load(items []Record) error {
...
@@ -115,15 +120,15 @@ func (s *singleWidthIndex) Load(items []Record) error {
 	return nil
 }
 
-func (m *multiWidthIndex) Get(c cid.Cid) (uint64, error) {
+func (m *multiWidthIndex) GetAll(c cid.Cid, fn func(uint64) bool) error {
 	d, err := multihash.Decode(c.Hash())
 	if err != nil {
-		return 0, err
+		return err
 	}
 	if s, ok := (*m)[uint32(len(d.Digest)+8)]; ok {
-		return s.get(d.Digest)
+		return s.getAll(d.Digest, fn)
 	}
-	return 0, ErrNotFound
+	return ErrNotFound
 }
 
 func (m *multiWidthIndex) Codec() multicodec.Code {
...
@@ -184,7 +189,7 @@ func (m *multiWidthIndex) Load(items []Record) error {
 			idxs[len(digest)] = make([]digestRecord, 0)
 			idx = idxs[len(digest)]
 		}
-		idxs[len(digest)] = append(idx, digestRecord{digest, item.Idx})
+		idxs[len(digest)] = append(idx, digestRecord{digest, item.Offset})
 	}
 
 	// Sort each list, then write to compact form.
...
@@ -209,9 +214,3 @@ func newSorted() Index {
 	m := make(multiWidthIndex)
 	return &m
 }
-
-//lint:ignore U1000 kept for potential future use.
-func newSingleSorted() Index {
-	s := singleWidthIndex{}
-	return &s
-}
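
The singleWidthIndex scan above walks a flat byte slice of fixed-width
digest||offset entries. Below is a self-contained sketch of that layout
with the advance through equal-digest entries written out explicitly;
scanEqual and its parameters are hypothetical, not part of the package:

	package index

	import (
		"bytes"
		"encoding/binary"
	)

	// scanEqual walks fixed-width entries of the form digest||offset64,
	// starting from the first candidate found by a sort.Search, calling
	// fn with the offset of every entry whose digest equals d.
	// It reports whether any entry matched.
	func scanEqual(buf []byte, width, start int, d []byte, fn func(uint64) bool) bool {
		any := false
		for i := start; (i+1)*width <= len(buf); i++ {
			entry := buf[i*width : (i+1)*width]
			if !bytes.Equal(d, entry[:width-8]) {
				break // entries are sorted, so no later entry can match
			}
			any = true
			if !fn(binary.LittleEndian.Uint64(entry[width-8:])) {
				break // the callback asked to stop early
			}
		}
		return any
	}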
...
@@ -18,10 +18,6 @@ func TestSortedIndex_GetReturnsNotFoundWhenCidDoesNotExist(t *testing.T) {
 		name    string
 		subject Index
 	}{
-		{
-			"SingleSorted",
-			newSingleSorted(),
-		},
 		{
 			"Sorted",
 			newSorted(),
...
@@ -29,7 +25,7 @@ func TestSortedIndex_GetReturnsNotFoundWhenCidDoesNotExist(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			gotOffset, err := tt.subject.Get(nonExistingKey)
+			gotOffset, err := GetFirst(tt.subject, nonExistingKey)
 			require.Equal(t, ErrNotFound, err)
 			require.Equal(t, uint64(0), gotOffset)
 		})
...
...
@@ -75,7 +75,7 @@ func GenerateIndex(v1r io.Reader, opts ...ReadOption) (index.Index, error) {
 		if err != nil {
 			return nil, err
 		}
-		records = append(records, index.Record{Cid: c, Idx: uint64(sectionOffset)})
+		records = append(records, index.Record{Cid: c, Offset: uint64(sectionOffset)})
 
 		// Seek to the next section by skipping the block.
 		// The section length includes the CID, so subtract it.
...
...
@@ -46,8 +46,10 @@ type ReadWriteOption interface {
 // a zero-length section as the end of the input CAR file. For example, this can
 // be useful to allow "null padding" after a CARv1 without knowing where the
 // padding begins.
-func ZeroLengthSectionAsEOF(o *ReadOptions) {
-	o.ZeroLengthSectionAsEOF = true
+func ZeroLengthSectionAsEOF(enable bool) ReadOption {
+	return func(o *ReadOptions) {
+		o.ZeroLengthSectionAsEOF = enable
+	}
 }
 
 // UseDataPadding is a write option which sets the padding to be added between
...