// Package mod provides DagModifier, which modifies unixfs DAG files in place.
package mod

import (
	"bytes"
	"context"
	"errors"
	"io"

	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"

	cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
	node "gx/ipfs/Qmb3Hm9QDFmfYuET4pu7Kyg8JV78jFa1nvZx5vnCZsK4ck/go-ipld-format"
)

var (
	// ErrSeekFail is returned when a seek could not be positioned at the
	// requested offset.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrUnrecognizedWhence is returned by Seek when 'whence' is not one of
	// io.SeekStart, io.SeekCurrent or io.SeekEnd.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

// writebufferSize is the number of buffered bytes (2MB) above which Write
// flushes the buffer into the DAG.
var writebufferSize = 1 << 21

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
32
	curNode *mdag.ProtoNode
33

34
	splitter   chunk.SplitterGen
35 36 37 38 39 40 41
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

42
	read uio.DagReader
43 44
}

45 46 47 48 49 50
func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
	pbn, ok := from.(*mdag.ProtoNode)
	if !ok {
		return nil, mdag.ErrNotProtobuf
	}

51
	return &DagModifier{
52
		curNode:  pbn.Copy().(*mdag.ProtoNode),
53 54 55 56 57 58 59 60 61
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
62 63
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
64 65 66 67 68 69
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
70 71 72 73 74 75 76 77 78 79 80
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

81
		err = dm.Sync()
82 83 84 85 86 87 88 89 90 91 92 93 94
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is a reader that just returns zeros, without ever ending.
type zeroReader struct{}

// Read overwrites every byte of b with zero and reports len(b) bytes read.
// It never returns an error.
func (zr zeroReader) Read(b []byte) (int, error) {
	for i := range b {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
101 102
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
103 104
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
105
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
106
	nnode, err := dm.appendData(dm.curNode, spl)
107 108 109 110 111 112 113
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
114 115 116 117 118 119 120

	pbnnode, ok := nnode.(*mdag.ProtoNode)
	if !ok {
		return mdag.ErrNotProtobuf
	}

	dm.curNode = pbnnode
121 122 123
	return nil
}

Jeromy's avatar
Jeromy committed
124
// Write continues writing to the dag at the current offset
125 126 127 128 129 130 131
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
132

133 134 135 136 137 138
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
139
		err := dm.Sync()
140 141 142 143 144 145 146 147
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

func (dm *DagModifier) Size() (int64, error) {
148
	pbn, err := ft.FromBytes(dm.curNode.Data())
149 150 151 152
	if err != nil {
		return 0, err
	}

153 154 155 156
	if dm.wrBuf != nil {
		if uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
157 158 159 160 161
	}

	return int64(pbn.GetFilesize()), nil
}

162 163
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
164
	// No buffer? Nothing to do
165 166 167 168 169 170 171 172 173 174
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
175
	// Number of bytes we're going to write
176 177
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
178
	// overwrite existing dag nodes
Jeromy's avatar
Jeromy committed
179
	thisc, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
180 181 182 183
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
184
	nd, err := dm.dagserv.Get(dm.ctx, thisc)
185 186 187 188
	if err != nil {
		return err
	}

189 190 191 192 193 194
	pbnd, ok := nd.(*mdag.ProtoNode)
	if !ok {
		return mdag.ErrNotProtobuf
	}

	dm.curNode = pbnd
195

Jeromy's avatar
Jeromy committed
196
	// need to write past end of current dag
197
	if !done {
198
		nd, err := dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
199 200 201 202
		if err != nil {
			return err
		}

Jeromy's avatar
Jeromy committed
203
		_, err = dm.dagserv.Add(nd)
204 205 206 207
		if err != nil {
			return err
		}

208 209 210 211 212 213
		pbnode, ok := nd.(*mdag.ProtoNode)
		if !ok {
			return mdag.ErrNotProtobuf
		}

		dm.curNode = pbnode
214 215 216 217 218 219 220 221
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
222
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
223 224
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
225
func (dm *DagModifier) modifyDag(node *mdag.ProtoNode, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
226
	f, err := ft.FromBytes(node.Data())
227
	if err != nil {
Jeromy's avatar
Jeromy committed
228
		return nil, false, err
229 230
	}

Jeromy's avatar
Jeromy committed
231
	// If we've reached a leaf node.
232
	if len(node.Links()) == 0 {
233 234
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
235
			return nil, false, err
236 237 238 239 240
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
241
			return nil, false, err
242 243
		}

244
		nd := new(mdag.ProtoNode)
245
		nd.SetData(b)
246 247
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
248
			return nil, false, err
249 250 251 252
		}

		// Hey look! we're done!
		var done bool
253
		if n < len(f.Data[offset:]) {
254 255 256
			done = true
		}

Jeromy's avatar
Jeromy committed
257
		return k, done, nil
258 259 260 261 262
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
263
		// We found the correct child to write into
264
		if cur+bs > offset {
265
			child, err := node.Links()[i].GetNode(dm.ctx, dm.dagserv)
266
			if err != nil {
Jeromy's avatar
Jeromy committed
267
				return nil, false, err
268
			}
269 270 271 272 273 274 275

			childpb, ok := child.(*mdag.ProtoNode)
			if !ok {
				return nil, false, mdag.ErrNotProtobuf
			}

			k, sdone, err := dm.modifyDag(childpb, offset-cur, data)
276
			if err != nil {
Jeromy's avatar
Jeromy committed
277
				return nil, false, err
278 279 280
			}

			offset += bs
281
			node.Links()[i].Cid = k
282

283
			// Recache serialized node
284
			_, err = node.EncodeProtobuf(true)
285
			if err != nil {
Jeromy's avatar
Jeromy committed
286
				return nil, false, err
287 288
			}

289
			if sdone {
290
				// No more bytes to write!
291 292 293
				done = true
				break
			}
294
			offset = cur + bs
295 296 297 298 299
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
300
	return k, done, err
301 302
}

Jeromy's avatar
Jeromy committed
303
// appendData appends the blocks from the given chan to the end of this dag
304
func (dm *DagModifier) appendData(node *mdag.ProtoNode, spl chunk.Splitter) (node.Node, error) {
305 306 307 308 309
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
	}

rht's avatar
rht committed
310
	return trickle.TrickleAppend(dm.ctx, node, dbp.New(spl))
311 312
}

Jeromy's avatar
Jeromy committed
313
// Read data from this dag starting at the current offset
314
func (dm *DagModifier) Read(b []byte) (int, error) {
315
	err := dm.readPrep()
316 317 318 319
	if err != nil {
		return 0, err
	}

320 321 322 323 324 325 326 327 328 329 330
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

331
	if dm.read == nil {
332 333
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
334
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
335
			cancel()
336
			return err
337 338
		}

339
		i, err := dr.Seek(int64(dm.curWrOff), io.SeekStart)
340
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
341
			cancel()
342
			return err
343 344 345
		}

		if i != int64(dm.curWrOff) {
Jakub Sztandera's avatar
Jakub Sztandera committed
346
			cancel()
347
			return ErrSeekFail
348 349
		}

350
		dm.readCancel = cancel
351 352 353
		dm.read = dr
	}

354 355 356 357 358 359 360 361 362 363 364
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
365 366 367 368 369
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
370
func (dm *DagModifier) GetNode() (*mdag.ProtoNode, error) {
371
	err := dm.Sync()
372 373 374
	if err != nil {
		return nil, err
	}
375
	return dm.curNode.Copy().(*mdag.ProtoNode), nil
376 377
}

// HasChanges reports whether there are unflushed changes to this dag
func (dm *DagModifier) HasChanges() bool {
	// The write buffer only exists between a Write and the next Sync.
	pending := dm.wrBuf != nil
	return pending
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
384
	err := dm.Sync()
385 386 387 388
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
389 390 391 392 393 394
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
395
	switch whence {
396
	case io.SeekCurrent:
Jeromy's avatar
Jeromy committed
397
		newoffset = dm.curWrOff + uint64(offset)
398
	case io.SeekStart:
Jeromy's avatar
Jeromy committed
399
		newoffset = uint64(offset)
400
	case io.SeekEnd:
401
		newoffset = uint64(fisize) - uint64(offset)
402
	default:
Jeromy's avatar
Jeromy committed
403
		return 0, ErrUnrecognizedWhence
404 405
	}

406 407
	if int64(newoffset) > fisize {
		if err := dm.expandSparse(int64(newoffset) - fisize); err != nil {
Jeromy's avatar
Jeromy committed
408 409 410 411 412 413
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

414 415 416 417 418 419 420 421 422 423 424
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
425
	err := dm.Sync()
426 427 428 429 430 431 432 433 434
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
435
	// Truncate can also be used to expand the file
436
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
437
		return dm.expandSparse(int64(size) - realSize)
438 439
	}

440
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
441 442 443 444 445 446 447 448 449 450 451 452 453
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
454
// dagTruncate truncates the given node to 'size' and returns the modified Node
455 456
func dagTruncate(ctx context.Context, nd *mdag.ProtoNode, size uint64, ds mdag.DAGService) (*mdag.ProtoNode, error) {
	if len(nd.Links()) == 0 {
457
		// TODO: this can likely be done without marshaling and remarshaling
458
		pbn, err := ft.FromBytes(nd.Data())
459 460 461 462
		if err != nil {
			return nil, err
		}

463
		nd.SetData(ft.WrapData(pbn.Data[:size]))
464 465 466 467 468
		return nd, nil
	}

	var cur uint64
	end := 0
469
	var modified *mdag.ProtoNode
470
	ndata := new(ft.FSNode)
471
	for i, lnk := range nd.Links() {
Jeromy's avatar
Jeromy committed
472
		child, err := lnk.GetNode(ctx, ds)
473 474 475 476
		if err != nil {
			return nil, err
		}

477 478 479 480 481 482
		childpb, ok := child.(*mdag.ProtoNode)
		if !ok {
			return nil, err
		}

		childsize, err := ft.DataSize(childpb.Data())
483 484 485 486
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
487
		// found the child we want to cut
488
		if size < cur+childsize {
489
			nchild, err := dagTruncate(ctx, childpb, size-cur, ds)
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

509
	nd.SetLinks(nd.Links()[:end])
510 511 512 513 514 515 516 517 518 519
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

520
	nd.SetData(d)
521

522
	// invalidate cache and recompute serialized data
523
	_, err = nd.EncodeProtobuf(true)
524 525 526 527
	if err != nil {
		return nil, err
	}

528 529
	return nd, nil
}