Unverified Commit 3e2cf7af authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Merge pull request #18 from ipld/feat/selective-car

feat(car): add selector support
parents 28226cb4 6236fbaa
......@@ -23,6 +23,10 @@ type Store interface {
Put(blocks.Block) error
}
// ReadStore is the read side of a block store: a single Get method that takes
// a CID and returns a block or an error.
type ReadStore interface {
Get(cid.Cid) (blocks.Block, error)
}
type CarHeader struct {
Roots []cid.Cid
Version uint64
......@@ -41,17 +45,17 @@ func WriteCar(ctx context.Context, ds format.DAGService, roots []cid.Cid, w io.W
}
func WriteCarWithWalker(ctx context.Context, ds format.DAGService, roots []cid.Cid, w io.Writer, walk WalkFunc) error {
cw := &carWriter{ds: ds, w: w, walk: walk}
h := &CarHeader{
Roots: roots,
Version: 1,
}
if err := cw.WriteHeader(h); err != nil {
if err := WriteHeader(h, w); err != nil {
return fmt.Errorf("failed to write car header: %s", err)
}
cw := &carWriter{ds: ds, w: w, walk: walk}
seen := cid.NewSet()
for _, r := range roots {
if err := dag.Walk(ctx, cw.enumGetLinks, r, seen.Visit); err != nil {
......@@ -79,13 +83,22 @@ func ReadHeader(br *bufio.Reader) (*CarHeader, error) {
return &ch, nil
}
func (cw *carWriter) WriteHeader(h *CarHeader) error {
func WriteHeader(h *CarHeader, w io.Writer) error {
hb, err := cbor.DumpObject(h)
if err != nil {
return err
}
return util.LdWrite(cw.w, hb)
return util.LdWrite(w, hb)
}
// HeaderSize serializes h with cbor.DumpObject and reports the length-delimited
// size of the result as computed by util.LdSize. A serialization error is
// returned unchanged together with a zero size.
func HeaderSize(h *CarHeader) (uint64, error) {
	encoded, err := cbor.DumpObject(h)
	if err == nil {
		return util.LdSize(encoded), nil
	}
	return 0, err
}
func (cw *carWriter) enumGetLinks(ctx context.Context, c cid.Cid) ([]*format.Link, error) {
......@@ -98,7 +111,7 @@ func (cw *carWriter) enumGetLinks(ctx context.Context, c cid.Cid) ([]*format.Lin
return nil, err
}
return nd.Links(), nil
return cw.walk(nd)
}
func (cw *carWriter) writeNode(ctx context.Context, nd format.Node) error {
......
......@@ -9,6 +9,10 @@ import (
format "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
dstest "github.com/ipfs/go-merkledag/test"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"
)
func assertAddNodes(t *testing.T, ds format.DAGService, nds ...format.Node) {
......@@ -69,3 +73,90 @@ func TestRoundtrip(t *testing.T) {
}
}
}
// TestRoundtripSelective builds a small DAG, writes the part of it matched by a
// selector into a car file both in one step (Write) and in two steps
// (Prepare + Dump), checks that both outputs agree, and then loads the car file
// back to verify which blocks it contains.
func TestRoundtripSelective(t *testing.T) {
	sourceBserv := dstest.Bserv()
	sourceBs := sourceBserv.Blockstore()
	dserv := dag.NewDAGService(sourceBserv)

	a := dag.NewRawNode([]byte("aaaa"))
	b := dag.NewRawNode([]byte("bbbb"))
	c := dag.NewRawNode([]byte("cccc"))

	nd1 := &dag.ProtoNode{}
	nd1.AddNodeLink("cat", a)

	nd2 := &dag.ProtoNode{}
	nd2.AddNodeLink("first", nd1)
	nd2.AddNodeLink("dog", b)
	nd2.AddNodeLink("repeat", nd1)

	nd3 := &dag.ProtoNode{}
	nd3.AddNodeLink("second", nd2)
	nd3.AddNodeLink("bear", c)

	assertAddNodes(t, dserv, a, b, c, nd1, nd2, nd3)

	ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())

	// the graph assembled above looks as follows, in order:
	// nd3 -> [c, nd2 -> [nd1 -> a, b, nd1 -> a]]
	// this selector starts at nd3, and traverses a link at index 1 (nd2, the second link, zero indexed)
	// it then recursively traverses all of its children
	// the only node skipped is 'c' -- link at index 0 immediately below nd3
	// the purpose is simply to show we are not writing the entire dag underneath
	// nd3
	//
	// The local is named sel so it does not shadow the imported selector package.
	sel := ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) {
		efsb.Insert("Links",
			ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
	}).Node()

	sc := NewSelectiveCar(context.Background(), sourceBs, []Dag{{Root: nd3.Cid(), Selector: sel}})

	// write car in one step
	buf := new(bytes.Buffer)
	blockCount := 0
	err := sc.Write(buf, func(block Block) error {
		blockCount++
		return nil
	})
	// Check the error first: a failed write should be reported as such rather
	// than as a misleading block-count mismatch.
	require.NoError(t, err)
	require.Equal(t, 5, blockCount)

	// create a new builder for two-step write
	sc2 := NewSelectiveCar(context.Background(), sourceBs, []Dag{{Root: nd3.Cid(), Selector: sel}})

	// write car in two steps
	scp, err := sc2.Prepare()
	require.NoError(t, err)
	buf2 := new(bytes.Buffer)
	err = scp.Dump(buf2)
	require.NoError(t, err)

	// verify preparation step correctly assessed length and blocks
	require.Equal(t, uint64(buf.Len()), scp.Size())
	require.Equal(t, blockCount, len(scp.Cids()))

	// verify equal data written by both methods
	require.Equal(t, buf.Bytes(), buf2.Bytes())

	// readout car and verify contents
	bserv := dstest.Bserv()
	ch, err := LoadCar(bserv.Blockstore(), buf)
	require.NoError(t, err)
	require.Len(t, ch.Roots, 1)
	require.True(t, ch.Roots[0].Equals(nd3.Cid()))

	bs := bserv.Blockstore()
	// every selected node must be present in the loaded store
	for _, nd := range []format.Node{a, b, nd1, nd2, nd3} {
		has, err := bs.Has(nd.Cid())
		require.NoError(t, err)
		require.True(t, has)
	}

	// the node excluded by the selector must be absent
	for _, nd := range []format.Node{c} {
		has, err := bs.Has(nd.Cid())
		require.NoError(t, err)
		require.False(t, has)
	}
}
......@@ -2,14 +2,15 @@ module github.com/ipld/go-car
require (
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-cid v0.0.2
github.com/ipfs/go-cid v0.0.3
github.com/ipfs/go-ipld-cbor v0.0.2
github.com/ipfs/go-ipld-format v0.0.2
github.com/ipfs/go-merkledag v0.2.4
github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785
github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5
github.com/multiformats/go-multihash v0.0.5
github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14 // indirect
github.com/stretchr/testify v1.3.0
github.com/urfave/cli v1.20.0
github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 // indirect
)
go 1.13
This diff is collapsed.
package car
import (
"bytes"
"context"
"errors"
"fmt"
"io"
cid "github.com/ipfs/go-cid"
util "github.com/ipld/go-car/util"
"github.com/ipld/go-ipld-prime"
dagpb "github.com/ipld/go-ipld-prime-proto"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
)
// Dag is a root/selector combo to put into a car: the traversal for this entry
// starts at Root and is driven by Selector.
type Dag struct {
// Root is the CID of the node the traversal starts from; it is also listed
// as a root in the car header.
Root cid.Cid
// Selector is the selector in ipld.Node form; it is parsed with
// selector.ParseSelector before the walk.
Selector ipld.Node
}
// Block is all information and metadata about a block that is part of a car file
type Block struct {
// BlockCID is the CID the block was loaded by.
BlockCID cid.Cid
// Data is the raw block data as returned by the store.
Data []byte
// Offset is the traverser's running byte offset at the moment the block
// was first encountered (header size plus the sizes of all earlier blocks).
Offset uint64
// Size is util.LdSize of the CID bytes and the data, i.e. the
// length-delimited size of this block's section.
Size uint64
}
// SelectiveCar is a car file based on root + selector combos instead of just
// a single root and complete dag walk
type SelectiveCar struct {
// ctx is passed to link loading and to the traversal configuration.
ctx context.Context
// dags are the root+selector pairs; each root goes into the header and
// each pair is traversed in slice order.
dags []Dag
// store is where blocks are fetched from by CID.
store ReadStore
}
// OnCarHeaderFunc is called during traversal when the header is created,
// before any block is visited. A non-nil error aborts the traversal.
type OnCarHeaderFunc func(CarHeader) error
// OnNewCarBlockFunc is called during traversal when a new unique block is
// encountered, i.e. the first time its CID is loaded. A non-nil error is
// returned from the link loader.
type OnNewCarBlockFunc func(Block) error
// SelectiveCarPrepared is a SelectiveCar that has already been traversed, such that it
// can be written quicker with Dump. It also contains metadata already collected about
// the Car file like size and number of blocks that go into it
type SelectiveCarPrepared struct {
SelectiveCar
// size is the final offset reported by the traversal.
size uint64
// header is the header produced during the traversal.
header CarHeader
// cids lists the unique block CIDs in the order they were first loaded.
cids []cid.Cid
}
// NewSelectiveCar creates a new SelectiveCar from a context, a block store to
// read from, and a set of root+selector pairs. It only records its arguments;
// no traversal is performed here.
func NewSelectiveCar(ctx context.Context, store ReadStore, dags []Dag) SelectiveCar {
	var car SelectiveCar
	car.store = store
	car.dags = dags
	car.ctx = ctx
	return car
}
// traverse runs a fresh selectiveCarTraverser (zero offset, empty CID set) over
// sc, reporting the header and each newly seen block through the supplied
// callbacks, and returns whatever that traverser returns.
func (sc SelectiveCar) traverse(onCarHeader OnCarHeaderFunc, onNewCarBlock OnNewCarBlockFunc) (uint64, error) {
	walker := selectiveCarTraverser{
		onCarHeader:   onCarHeader,
		onNewCarBlock: onNewCarBlock,
		offset:        0,
		cidSet:        cid.NewSet(),
		sc:            sc,
	}
	return (&walker).traverse()
}
// Prepare traverses the dags and collects data on what is about to be written
// (header, unique block CIDs in encounter order, and total size) without
// writing anything. On a traversal error it returns a zero
// SelectiveCarPrepared together with that error.
func (sc SelectiveCar) Prepare() (SelectiveCarPrepared, error) {
	var (
		seenHeader CarHeader
		blockCids  []cid.Cid
	)
	total, err := sc.traverse(
		// remember the header handed to us by the traversal
		func(h CarHeader) error {
			seenHeader = h
			return nil
		},
		// record each unique block's CID as it is discovered
		func(blk Block) error {
			blockCids = append(blockCids, blk.BlockCID)
			return nil
		},
	)
	if err != nil {
		return SelectiveCarPrepared{}, err
	}
	return SelectiveCarPrepared{
		SelectiveCar: sc,
		size:         total,
		header:       seenHeader,
		cids:         blockCids,
	}, nil
}
// Write traverses the configured dags and streams the resulting car file to w:
// first the header, then each unique block as the traversal first loads it.
// After a block has been written, the optional userOnNewCarBlocks callbacks are
// invoked in order with that block. An error from writing or from a callback is
// handed back to the traversal; Write returns the traversal's error.
func (sc SelectiveCar) Write(w io.Writer, userOnNewCarBlocks ...OnNewCarBlockFunc) error {
	onCarHeader := func(h CarHeader) error {
		if err := WriteHeader(&h, w); err != nil {
			// %w keeps the underlying error reachable through
			// errors.Is / errors.As; the message text is unchanged.
			return fmt.Errorf("failed to write car header: %w", err)
		}
		return nil
	}
	onNewCarBlock := func(block Block) error {
		// Each block section is the CID bytes followed by the raw data.
		if err := util.LdWrite(w, block.BlockCID.Bytes(), block.Data); err != nil {
			return err
		}
		for _, userOnNewCarBlock := range userOnNewCarBlocks {
			if err := userOnNewCarBlock(block); err != nil {
				return err
			}
		}
		return nil
	}
	_, err := sc.traverse(onCarHeader, onNewCarBlock)
	return err
}
// Size returns the total size in bytes of the car file that will be written,
// as computed during Prepare.
func (sc SelectiveCarPrepared) Size() uint64 {
return sc.size
}
// Header returns the header for the car file that will be written, as
// captured during Prepare.
func (sc SelectiveCarPrepared) Header() CarHeader {
return sc.header
}
// Cids returns the list of unique block cids that will be written to the car file.
// The internal slice is returned as-is, without copying.
func (sc SelectiveCarPrepared) Cids() []cid.Cid {
return sc.cids
}
// Dump writes the car file to w using the information already collected: it
// writes the stored header and then, for each recorded CID in order, fetches
// the block from the store and writes its CID bytes and raw data as one
// length-delimited section. No selector traversal is performed. The first
// error from the store or from writing is returned.
func (sc SelectiveCarPrepared) Dump(w io.Writer) error {
	if err := WriteHeader(&sc.header, w); err != nil {
		// %w keeps the underlying error reachable through
		// errors.Is / errors.As; the message text is unchanged.
		return fmt.Errorf("failed to write car header: %w", err)
	}
	for _, c := range sc.cids {
		blk, err := sc.store.Get(c)
		if err != nil {
			return err
		}
		if err := util.LdWrite(w, c.Bytes(), blk.RawData()); err != nil {
			return err
		}
	}
	return nil
}
// selectiveCarTraverser holds the state of one traversal of a SelectiveCar.
type selectiveCarTraverser struct {
// onCarHeader is invoked once with the header before blocks are traversed.
onCarHeader OnCarHeaderFunc
// onNewCarBlock is invoked the first time each CID is loaded.
onNewCarBlock OnNewCarBlockFunc
// offset is the running byte count: header size plus the size of every
// unique block reported so far.
offset uint64
// cidSet records the CIDs already reported so each block is counted once.
cidSet *cid.Set
// sc is the car definition (context, dags, store) being traversed.
sc SelectiveCar
}
// traverse processes the header and then the blocks. It returns the
// accumulated offset on success, or zero and the first error encountered.
func (sct *selectiveCarTraverser) traverse() (uint64, error) {
	if err := sct.traverseHeader(); err != nil {
		return 0, err
	}
	if err := sct.traverseBlocks(); err != nil {
		return 0, err
	}
	return sct.offset, nil
}
// traverseHeader builds a version-1 header whose roots are the roots of the
// configured dags (in order), adds the header's size to the running offset,
// and passes the header to the onCarHeader callback.
func (sct *selectiveCarTraverser) traverseHeader() error {
	dags := sct.sc.dags
	rootCids := make([]cid.Cid, len(dags))
	for i := range dags {
		rootCids[i] = dags[i].Root
	}
	hdr := CarHeader{Version: 1, Roots: rootCids}
	hdrSize, err := HeaderSize(&hdr)
	if err != nil {
		return err
	}
	sct.offset += hdrSize
	return sct.onCarHeader(hdr)
}
// loader is the link loader used for the traversal. It accepts only
// cidlink.Link values, fetches the block from the store, and returns a reader
// over its raw data. The first time a CID is seen it is recorded, reported via
// onNewCarBlock with the current offset and its length-delimited size, and the
// offset is advanced by that size.
func (sct *selectiveCarTraverser) loader(lnk ipld.Link, ctx ipld.LinkContext) (io.Reader, error) {
	cl, isCidLink := lnk.(cidlink.Link)
	if !isCidLink {
		return nil, errors.New("Incorrect Link Type")
	}
	blk, err := sct.sc.store.Get(cl.Cid)
	if err != nil {
		return nil, err
	}
	data := blk.RawData()
	// Visit adds the CID and reports true only if it was not already in the
	// set; blocks seen before are served without being reported again.
	if !sct.cidSet.Visit(cl.Cid) {
		return bytes.NewReader(data), nil
	}
	frameSize := util.LdSize(cl.Cid.Bytes(), data)
	newBlock := Block{
		BlockCID: cl.Cid,
		Data:     data,
		Offset:   sct.offset,
		Size:     frameSize,
	}
	if err := sct.onNewCarBlock(newBlock); err != nil {
		return nil, err
	}
	// advance only after the callback accepted the block
	sct.offset += frameSize
	return bytes.NewReader(data), nil
}
// traverseBlocks walks every configured dag in order. For each one it parses
// the selector, loads the root node through sct.loader, and then runs the
// selector walk with sct.loader as the link loader. The visit callback does
// nothing; all bookkeeping happens inside the loader. The first error stops
// the loop and is returned.
func (sct *selectiveCarTraverser) traverseBlocks() error {
	chooser := dagpb.AddDagPBSupportToChooser(func(ipld.Link, ipld.LinkContext) ipld.NodeBuilder {
		return ipldfree.NodeBuilder()
	})
	visitNothing := func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil }

	for i := range sct.sc.dags {
		entry := sct.sc.dags[i]

		compiled, err := selector.ParseSelector(entry.Selector)
		if err != nil {
			return err
		}

		rootLink := cidlink.Link{Cid: entry.Root}
		rootBuilder := chooser(rootLink, ipld.LinkContext{})
		rootNode, err := rootLink.Load(sct.sc.ctx, ipld.LinkContext{}, rootBuilder, sct.loader)
		if err != nil {
			return err
		}

		progress := traversal.Progress{
			Cfg: &traversal.Config{
				Ctx:                    sct.sc.ctx,
				LinkLoader:             sct.loader,
				LinkNodeBuilderChooser: chooser,
			},
		}
		if err := progress.WalkAdv(rootNode, compiled, visitNothing); err != nil {
			return err
		}
	}
	return nil
}
......@@ -89,6 +89,16 @@ func LdWrite(w io.Writer, d ...[]byte) error {
return nil
}
// LdSize returns the length-delimited size of the given byte slices: the
// combined length of all slices in d plus the number of bytes needed to encode
// that combined length as an unsigned varint.
func LdSize(d ...[]byte) uint64 {
	var sum uint64
	for _, s := range d {
		sum += uint64(len(s))
	}
	// A uvarint-encoded uint64 can take up to binary.MaxVarintLen64 bytes and
	// PutUvarint panics when the buffer is too small, so size the scratch
	// buffer for the worst case.
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(buf[:], sum)
	return sum + uint64(n)
}
func LdRead(r *bufio.Reader) ([]byte, error) {
l, err := binary.ReadUvarint(r)
if err != nil {
......
package util_test
import (
"bytes"
"math/rand"
"testing"
"github.com/ipld/go-car/util"
"github.com/stretchr/testify/require"
)
// TestLdSize checks, over several rounds of randomly sized random payloads,
// that util.LdSize reports exactly the number of bytes util.LdWrite emits for
// the same arguments.
func TestLdSize(t *testing.T) {
	const rounds, chunks = 5, 5
	for round := 0; round < rounds; round++ {
		payload := make([][]byte, chunks)
		for idx := range payload {
			payload[idx] = make([]byte, rand.Intn(30))
			_, err := rand.Read(payload[idx])
			require.NoError(t, err)
		}

		var out bytes.Buffer
		computed := util.LdSize(payload...)
		require.NoError(t, util.LdWrite(&out, payload...))
		require.Equal(t, uint64(out.Len()), computed)
	}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment