package coreunix

import (
	"bytes"
	"context"
	"io"
	"io/ioutil"
	"math/rand"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/ipfs/go-ipfs/core"
	"github.com/ipfs/go-ipfs/pin/gc"
	"github.com/ipfs/go-ipfs/repo"

	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-blockservice"
	cid "github.com/ipfs/go-cid"
	datastore "github.com/ipfs/go-datastore"
	syncds "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	config "github.com/ipfs/go-ipfs-config"
	files "github.com/ipfs/go-ipfs-files"
	pi "github.com/ipfs/go-ipfs-posinfo"
	dag "github.com/ipfs/go-merkledag"
	coreiface "github.com/ipfs/interface-go-ipfs-core"
)

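// testPeerID is a fixed peer identity used to satisfy the identity
// requirement of the offline test nodes below.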
const testPeerID = "QmTFauExutTsy4XP6JbMFcw2Wa9645HJt2bTqL6qYDCKfe"

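// TestAddMultipleGCLive adds a directory containing two pipe-backed files
// (so the add can be paused mid-file) and runs GC twice while the add is in
// flight. Each GC run must block until the adder releases the GC lock, and
// no hash emitted by the adder may appear among the GC-removed keys.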
func TestAddMultipleGCLive(t *testing.T) {
	r := &repo.Mock{
		C: config.Config{
			Identity: config.Identity{
				PeerID: testPeerID, // required by offline node
			},
		},
		D: syncds.MutexWrap(datastore.NewMapDatastore()),
	}
	node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
	if err != nil {
		t.Fatal(err)
	}

	out := make(chan interface{}, 10)
	adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG)
	if err != nil {
		t.Fatal(err)
	}
	adder.Out = out

	// make two files with pipes so we can 'pause' the add for timing of the test
	piper1, pipew1 := io.Pipe()
	hangfile1 := files.NewReaderFile(piper1)

	piper2, pipew2 := io.Pipe()
	hangfile2 := files.NewReaderFile(piper2)

	rfc := files.NewBytesFile([]byte("testfileA"))

	slf := files.NewMapDirectory(map[string]files.Node{
		"a": hangfile1,
		"b": hangfile2,
		"c": rfc,
	})

	go func() {
		defer close(out)
		// Ignore the returned error; the bug under test is GC removing
		// blocks while they are still being added, not this resulting error.
		adder.AddAllAndPin(slf)
	}()

	// Start writing the first file but don't close the stream
	if _, err := pipew1.Write([]byte("some data for file a")); err != nil {
		t.Fatal(err)
	}

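	// Start a GC run; it should block on the GC lock until the in-flight
	// add releases it.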
	var gc1out <-chan gc.Result
	gc1started := make(chan struct{})
	go func() {
		defer close(gc1started)
		gc1out = gc.GC(context.Background(), node.Blockstore, node.Repo.Datastore(), node.Pinning, nil)
	}()

	// GC shouldn't get the lock until after the file is completely added
	select {
	case <-gc1started:
		t.Fatal("gc shouldnt have started yet")
	default:
	}

	// finish write and unblock gc
	pipew1.Close()

	// Should have gotten the lock at this point
	<-gc1started

	removedHashes := make(map[string]struct{})
	for r := range gc1out {
		if r.Error != nil {
			t.Fatal(r.Error)
		}
		removedHashes[r.KeyRemoved.String()] = struct{}{}
	}

	if _, err := pipew2.Write([]byte("some data for file b")); err != nil {
		t.Fatal(err)
	}

	var gc2out <-chan gc.Result
	gc2started := make(chan struct{})
	go func() {
		defer close(gc2started)
		gc2out = gc.GC(context.Background(), node.Blockstore, node.Repo.Datastore(), node.Pinning, nil)
	}()

	select {
	case <-gc2started:
		t.Fatal("gc shouldnt have started yet")
	default:
	}

	pipew2.Close()

	<-gc2started

	for r := range gc2out {
		if r.Error != nil {
			t.Fatal(r.Error)
		}
		removedHashes[r.KeyRemoved.String()] = struct{}{}
	}

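	// Drain the remaining add events and make sure GC never removed a hash
	// the adder produced.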
	for o := range out {
		if _, ok := removedHashes[o.(*coreiface.AddEvent).Path.Cid().String()]; ok {
			t.Fatal("gc'ed a hash we just added")
		}
	}
}

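// TestAddGCLive pauses an in-flight add on a pipe-backed file, starts a GC
// run, and verifies that GC waits for the adder to release the GC lock, that
// no added hash is collected, and that the final DAG remains traversable.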
func TestAddGCLive(t *testing.T) {
	r := &repo.Mock{
		C: config.Config{
			Identity: config.Identity{
				PeerID: testPeerID, // required by offline node
			},
		},
		D: syncds.MutexWrap(datastore.NewMapDatastore()),
	}
	node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
	if err != nil {
		t.Fatal(err)
	}

	out := make(chan interface{})
	adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG)
	if err != nil {
		t.Fatal(err)
	}
	adder.Out = out

	rfa := files.NewBytesFile([]byte("testfileA"))

	// make a file backed by a pipe so we can 'pause' the add for timing of the test
	piper, pipew := io.Pipe()
	hangfile := files.NewReaderFile(piper)

	rfd := files.NewBytesFile([]byte("testfileD"))

	slf := files.NewMapDirectory(map[string]files.Node{
		"a": rfa,
		"b": hangfile,
		"d": rfd,
	})

	addDone := make(chan struct{})
	go func() {
		defer close(addDone)
		defer close(out)
		_, err := adder.AddAllAndPin(slf)
		if err != nil {
			t.Error(err)
		}
	}()

	addedHashes := make(map[string]struct{})
	select {
	case o := <-out:
		addedHashes[o.(*coreiface.AddEvent).Path.Cid().String()] = struct{}{}
	case <-addDone:
		t.Fatal("add shouldnt complete yet")
	}

	var gcout <-chan gc.Result
	gcstarted := make(chan struct{})
	go func() {
		defer close(gcstarted)
		gcout = gc.GC(context.Background(), node.Blockstore, node.Repo.Datastore(), node.Pinning, nil)
	}()

	// gc shouldn't start until we let the add finish its current file.
	if _, err := pipew.Write([]byte("some data for file b")); err != nil {
		t.Fatal(err)
	}

	select {
	case <-gcstarted:
		t.Fatal("gc shouldnt have started yet")
	default:
	}

	time.Sleep(time.Millisecond * 100) // make sure gc gets to requesting lock

	// finish write and unblock gc
	pipew.Close()

	// receive next object from adder
	o := <-out
	addedHashes[o.(*coreiface.AddEvent).Path.Cid().String()] = struct{}{}

	<-gcstarted

	for r := range gcout {
		if r.Error != nil {
			t.Fatal(r.Error)
		}
		if _, ok := addedHashes[r.KeyRemoved.String()]; ok {
			t.Fatal("gc'ed a hash we just added")
		}
	}

	var last cid.Cid
	for a := range out {
		// drain the remaining add events, remembering the last root CID
		last = a.(*coreiface.AddEvent).Path.Cid()
	}

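	// The last root emitted by the adder should still be fully present:
	// enumerating its children must succeed after GC.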
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	set := cid.NewSet()
	err = dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(node.DAG), last, set.Visit)
	if err != nil {
		t.Fatal(err)
	}
}

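// testAddWPosInfo adds 5 MiB of data through a testBlockstore wrapper with
// NoCopy set, checking that every stored block carries filestore position
// info for the expected path, both with and without raw leaves.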
func testAddWPosInfo(t *testing.T, rawLeaves bool) {
	r := &repo.Mock{
		C: config.Config{
			Identity: config.Identity{
				PeerID: testPeerID, // required by offline node
			},
		},
		D: syncds.MutexWrap(datastore.NewMapDatastore()),
	}
	node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
	if err != nil {
		t.Fatal(err)
	}

	bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: filepath.Join(os.TempDir(), "foo.txt"), t: t}
	bserv := blockservice.New(bs, node.Exchange)
	dserv := dag.NewDAGService(bserv)
	adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv)
	if err != nil {
		t.Fatal(err)
	}
	out := make(chan interface{})
	adder.Out = out
	adder.Progress = true
	adder.RawLeaves = rawLeaves
	adder.NoCopy = true

	data := make([]byte, 5*1024*1024)
	rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error
	fileData := ioutil.NopCloser(bytes.NewBuffer(data))
	fileInfo := dummyFileInfo{"foo.txt", int64(len(data)), time.Now()}
	file, _ := files.NewReaderPathFile(filepath.Join(os.TempDir(), "foo.txt"), fileData, &fileInfo)

	go func() {
		defer close(adder.Out)
		_, err = adder.AddAllAndPin(file)
		if err != nil {
			t.Error(err)
		}
	}()
	// Drain progress events until the adder goroutine closes the channel.
	for range out {
	}

	exp := 0
	nonOffZero := 0
	if rawLeaves {
		exp = 1
		nonOffZero = 19
	}
	if bs.countAtOffsetZero != exp {
		t.Fatalf("expected %d blocks with an offset of zero, got %d", exp, bs.countAtOffsetZero)
	}
	if bs.countAtOffsetNonZero != nonOffZero {
		// Note: the exact number depends on the file size and the chunking algorithm used.
		t.Fatalf("expected %d blocks with an offset > 0, got %d", nonOffZero, bs.countAtOffsetNonZero)
	}
}

func TestAddWPosInfo(t *testing.T) {
	testAddWPosInfo(t, false)
}

func TestAddWPosInfoAndRawLeafs(t *testing.T) {
	testAddWPosInfo(t, true)
}

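// testBlockstore wraps a GCBlockstore and, for every block written through
// it, checks any attached filestore position info against an expected path
// while counting blocks stored at zero and non-zero offsets.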
type testBlockstore struct {
	blockstore.GCBlockstore
	expectedPath         string
	t                    *testing.T
	countAtOffsetZero    int
	countAtOffsetNonZero int
}

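// Put inspects a single block for position info before delegating to the
// wrapped GCBlockstore.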
func (bs *testBlockstore) Put(block blocks.Block) error {
	bs.CheckForPosInfo(block)
	return bs.GCBlockstore.Put(block)
}

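// PutMany inspects each block for position info before delegating to the
// wrapped GCBlockstore.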
func (bs *testBlockstore) PutMany(blocks []blocks.Block) error {
	for _, blk := range blocks {
		bs.CheckForPosInfo(blk)
	}
	return bs.GCBlockstore.PutMany(blocks)
}

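// CheckForPosInfo verifies that a filestore-backed block carries the
// expected path and tallies whether its offset is zero or non-zero.
// Blocks without position info are ignored.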
func (bs *testBlockstore) CheckForPosInfo(block blocks.Block) {
	fsn, ok := block.(*pi.FilestoreNode)
	if !ok {
		return
	}
	posInfo := fsn.PosInfo
	if posInfo.FullPath != bs.expectedPath {
		// Use Error rather than Fatal: Put may run on the adder's goroutine,
		// and t.Fatal must only be called from the test goroutine.
		bs.t.Error("PosInfo does not have the expected path")
	}
	if posInfo.Offset == 0 {
		bs.countAtOffsetZero++
	} else {
		bs.countAtOffsetNonZero++
	}
}

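// dummyFileInfo is a minimal os.FileInfo implementation, letting the test
// build a ReaderPathFile without a real file on disk.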
type dummyFileInfo struct {
	name    string
	size    int64
	modTime time.Time
}

func (fi *dummyFileInfo) Name() string       { return fi.name }
func (fi *dummyFileInfo) Size() int64        { return fi.size }
func (fi *dummyFileInfo) Mode() os.FileMode  { return 0 }
func (fi *dummyFileInfo) ModTime() time.Time { return fi.modTime }
func (fi *dummyFileInfo) IsDir() bool        { return false }
func (fi *dummyFileInfo) Sys() interface{}   { return nil }