Commit 428e8f9d authored by Jeromy's avatar Jeromy

Refactor ipnsfs into a more generic and well tested mfs

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent 07b2cea0
......@@ -67,6 +67,7 @@ func WrapData(b []byte) []byte {
typ := pb.Data_Raw
pbdata.Data = b
pbdata.Type = &typ
pbdata.Filesize = proto.Uint64(uint64(len(b)))
out, err := proto.Marshal(pbdata)
if err != nil {
......
......@@ -15,7 +15,6 @@ import (
help "github.com/ipfs/go-ipfs/importer/helpers"
trickle "github.com/ipfs/go-ipfs/importer/trickle"
mdag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
ft "github.com/ipfs/go-ipfs/unixfs"
uio "github.com/ipfs/go-ipfs/unixfs/io"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
......@@ -36,7 +35,6 @@ var log = logging.Logger("dagio")
type DagModifier struct {
dagserv mdag.DAGService
curNode *mdag.Node
mp pin.Pinner
splitter chunk.SplitterGen
ctx context.Context
......@@ -49,13 +47,12 @@ type DagModifier struct {
read *uio.DagReader
}
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.Pinner, spl chunk.SplitterGen) (*DagModifier, error) {
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
return &DagModifier{
curNode: from.Copy(),
dagserv: serv,
splitter: spl,
ctx: ctx,
mp: mp,
}, nil
}
......@@ -174,7 +171,7 @@ func (dm *DagModifier) Sync() error {
buflen := dm.wrBuf.Len()
// Grab key for unpinning after mod operation
curk, err := dm.curNode.Key()
_, err := dm.curNode.Key()
if err != nil {
return err
}
......@@ -208,15 +205,6 @@ func (dm *DagModifier) Sync() error {
dm.curNode = nd
}
// Finalize correct pinning, and flush pinner.
// Be careful about the order, as curk might equal thisk.
dm.mp.RemovePinWithMode(curk, pin.Recursive)
dm.mp.PinWithMode(thisk, pin.Recursive)
err = dm.mp.Flush()
if err != nil {
return err
}
dm.writeStart += uint64(buflen)
dm.wrBuf = nil
......
......@@ -4,7 +4,6 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"testing"
......@@ -17,8 +16,6 @@ import (
h "github.com/ipfs/go-ipfs/importer/helpers"
trickle "github.com/ipfs/go-ipfs/importer/trickle"
mdag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
gc "github.com/ipfs/go-ipfs/pin/gc"
ft "github.com/ipfs/go-ipfs/unixfs"
uio "github.com/ipfs/go-ipfs/unixfs/io"
u "github.com/ipfs/go-ipfs/util"
......@@ -27,25 +24,24 @@ import (
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
func getMockDagServ(t testing.TB) (mdag.DAGService, pin.Pinner) {
func getMockDagServ(t testing.TB) mdag.DAGService {
dstore := ds.NewMapDatastore()
tsds := sync.MutexWrap(dstore)
bstore := blockstore.NewBlockstore(tsds)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
return dserv, pin.NewPinner(tsds, dserv)
return mdag.NewDAGService(bserv)
}
func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.GCBlockstore, pin.Pinner) {
func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.GCBlockstore) {
dstore := ds.NewMapDatastore()
tsds := sync.MutexWrap(dstore)
bstore := blockstore.NewBlockstore(tsds)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
return dserv, bstore, pin.NewPinner(tsds, dserv)
return dserv, bstore
}
func getNode(t testing.TB, dserv mdag.DAGService, size int64, pinner pin.Pinner) ([]byte, *mdag.Node) {
func getNode(t testing.TB, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
in := io.LimitReader(u.NewTimeSeededRand(), size)
node, err := imp.BuildTrickleDagFromReader(dserv, sizeSplitterGen(500)(in))
if err != nil {
......@@ -118,12 +114,12 @@ func sizeSplitterGen(size int64) chunk.SplitterGen {
}
func TestDagModifierBasic(t *testing.T) {
dserv, pin := getMockDagServ(t)
b, n := getNode(t, dserv, 50000, pin)
dserv := getMockDagServ(t)
b, n := getNode(t, dserv, 50000)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pin, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
......@@ -172,13 +168,13 @@ func TestDagModifierBasic(t *testing.T) {
}
func TestMultiWrite(t *testing.T) {
dserv, pins := getMockDagServ(t)
_, n := getNode(t, dserv, 0, pins)
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
......@@ -225,13 +221,13 @@ func TestMultiWrite(t *testing.T) {
}
func TestMultiWriteAndFlush(t *testing.T) {
dserv, pins := getMockDagServ(t)
_, n := getNode(t, dserv, 0, pins)
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
......@@ -273,13 +269,13 @@ func TestMultiWriteAndFlush(t *testing.T) {
}
func TestWriteNewFile(t *testing.T) {
dserv, pins := getMockDagServ(t)
_, n := getNode(t, dserv, 0, pins)
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
......@@ -316,13 +312,13 @@ func TestWriteNewFile(t *testing.T) {
}
func TestMultiWriteCoal(t *testing.T) {
dserv, pins := getMockDagServ(t)
_, n := getNode(t, dserv, 0, pins)
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
......@@ -362,13 +358,13 @@ func TestMultiWriteCoal(t *testing.T) {
}
func TestLargeWriteChunks(t *testing.T) {
dserv, pins := getMockDagServ(t)
_, n := getNode(t, dserv, 0, pins)
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
......@@ -401,12 +397,12 @@ func TestLargeWriteChunks(t *testing.T) {
}
func TestDagTruncate(t *testing.T) {
dserv, pins := getMockDagServ(t)
b, n := getNode(t, dserv, 50000, pins)
dserv := getMockDagServ(t)
b, n := getNode(t, dserv, 50000)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
......@@ -415,164 +411,92 @@ func TestDagTruncate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
_, err = dagmod.Seek(0, os.SEEK_SET)
size, err := dagmod.Size()
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(dagmod)
if err != nil {
t.Fatal(err)
}
if err = arrComp(out, b[:12345]); err != nil {
t.Fatal(err)
if size != 12345 {
t.Fatal("size was incorrect!")
}
}
func TestSparseWrite(t *testing.T) {
dserv, pins := getMockDagServ(t)
_, n := getNode(t, dserv, 0, pins)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
_, err = dagmod.Seek(0, os.SEEK_SET)
if err != nil {
t.Fatal(err)
}
buf := make([]byte, 5000)
u.NewTimeSeededRand().Read(buf[2500:])
wrote, err := dagmod.WriteAt(buf[2500:], 2500)
out, err := ioutil.ReadAll(dagmod)
if err != nil {
t.Fatal(err)
}
if wrote != 2500 {
t.Fatal("incorrect write amount")
}
_, err = dagmod.Seek(0, os.SEEK_SET)
if err != nil {
if err = arrComp(out, b[:12345]); err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(dagmod)
err = dagmod.Truncate(10)
if err != nil {
t.Fatal(err)
}
if err = arrComp(out, buf); err != nil {
t.Fatal(err)
}
}
func basicGC(t *testing.T, bs blockstore.GCBlockstore, pins pin.Pinner) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // in case error occurs during operation
out, err := gc.GC(ctx, bs, pins)
size, err = dagmod.Size()
if err != nil {
t.Fatal(err)
}
for range out {
if size != 10 {
t.Fatal("size was incorrect!")
}
}
func TestCorrectPinning(t *testing.T) {
dserv, bstore, pins := getMockDagServAndBstore(t)
b, n := getNode(t, dserv, 50000, pins)
func TestSparseWrite(t *testing.T) {
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
buf := make([]byte, 1024)
for i := 0; i < 100; i++ {
size, err := dagmod.Size()
if err != nil {
t.Fatal(err)
}
offset := rand.Intn(int(size))
u.NewTimeSeededRand().Read(buf)
if offset+len(buf) > int(size) {
b = append(b[:offset], buf...)
} else {
copy(b[offset:], buf)
}
n, err := dagmod.WriteAt(buf, int64(offset))
if err != nil {
t.Fatal(err)
}
if n != len(buf) {
t.Fatal("wrote incorrect number of bytes")
}
}
buf := make([]byte, 5000)
u.NewTimeSeededRand().Read(buf[2500:])
fisize, err := dagmod.Size()
wrote, err := dagmod.WriteAt(buf[2500:], 2500)
if err != nil {
t.Fatal(err)
}
if int(fisize) != len(b) {
t.Fatal("reported filesize incorrect", fisize, len(b))
if wrote != 2500 {
t.Fatal("incorrect write amount")
}
// Run a GC, then ensure we can still read the file correctly
basicGC(t, bstore, pins)
nd, err := dagmod.GetNode()
if err != nil {
t.Fatal(err)
}
read, err := uio.NewDagReader(context.Background(), nd, dserv)
_, err = dagmod.Seek(0, os.SEEK_SET)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(read)
out, err := ioutil.ReadAll(dagmod)
if err != nil {
t.Fatal(err)
}
if err = arrComp(out, b); err != nil {
t.Fatal(err)
}
rootk, err := nd.Key()
if err != nil {
if err = arrComp(out, buf); err != nil {
t.Fatal(err)
}
// Verify only one recursive pin
recpins := pins.RecursiveKeys()
if len(recpins) != 1 {
t.Fatal("Incorrect number of pinned entries")
}
// verify the correct node is pinned
if recpins[0] != rootk {
t.Fatal("Incorrect node recursively pinned")
}
}
func BenchmarkDagmodWrite(b *testing.B) {
b.StopTimer()
dserv, pins := getMockDagServ(b)
_, n := getNode(b, dserv, 0, pins)
dserv := getMockDagServ(b)
_, n := getNode(b, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wrsize := 4096
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
b.Fatal(err)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment