dagmodifier.go 12.7 KB
Newer Older
1 2
// Package mod provides DAG modification utilities to, for example,
// insert additional nodes in a unixfs DAG or truncate them.
3 4 5 6
package mod

import (
	"bytes"
7
	"context"
8
	"errors"
9
	"fmt"
10 11
	"io"

12 13 14 15 16 17
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
18 19

	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
Steven Allen's avatar
Steven Allen committed
20
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
21
	ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
22 23
)

24 25 26 27 28 29
// Common errors
var (
	// ErrSeekFail is returned when a freshly opened reader could not be
	// positioned at the modifier's current offset.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrUnrecognizedWhence is returned by Seek for an unknown whence value.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")

	// ErrNotUnixfs is returned when a node is neither a protobuf
	// (*mdag.ProtoNode) nor a raw (*mdag.RawNode) unixfs node.
	ErrNotUnixfs = fmt.Errorf("dagmodifier only supports unixfs nodes (proto or raw)")
)
Jeromy's avatar
Jeromy committed
30

31 32 33 34 35 36 37
// writebufferSize is the number of pending bytes (2 MiB) beyond which
// Write flushes the buffer with Sync.
var writebufferSize = 2 << 20

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
38 39
	dagserv ipld.DAGService
	curNode ipld.Node
40

41
	splitter   chunk.SplitterGen
42 43 44 45 46 47 48
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

49
	Prefix    cid.Prefix
50 51
	RawLeaves bool

52
	read uio.DagReader
53 54
}

55 56 57 58
// NewDagModifier returns a new DagModifier, the Cid prefix for newly
// created nodes will be inherted from the passed in node.  If the Cid
// version if not 0 raw leaves will also be enabled.  The Prefix and
// RawLeaves options can be overridden by changing them after the call.
59
func NewDagModifier(ctx context.Context, from ipld.Node, serv ipld.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
60 61 62 63 64
	switch from.(type) {
	case *mdag.ProtoNode, *mdag.RawNode:
		// ok
	default:
		return nil, ErrNotUnixfs
65 66
	}

67 68 69 70 71 72 73
	prefix := from.Cid().Prefix()
	prefix.Codec = cid.DagProtobuf
	rawLeaves := false
	if prefix.Version > 0 {
		rawLeaves = true
	}

74
	return &DagModifier{
75 76 77 78 79 80
		curNode:   from.Copy(),
		dagserv:   serv,
		splitter:  spl,
		ctx:       ctx,
		Prefix:    prefix,
		RawLeaves: rawLeaves,
81 82 83 84 85 86
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
87 88
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
89 90 91 92 93 94
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
95 96 97 98 99 100 101 102 103 104 105
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

106
		err = dm.Sync()
107 108 109 110 111 112 113 114 115 116 117 118 119
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader producing an endless stream of zero bytes.
type zeroReader struct{}

// Read zero-fills all of b and reports it as read; it never returns an error.
func (zeroReader) Read(b []byte) (int, error) {
	for i := 0; i < len(b); i++ {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
126 127
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
128 129
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
130
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
131
	nnode, err := dm.appendData(dm.curNode, spl)
132 133 134
	if err != nil {
		return err
	}
135
	err = dm.dagserv.Add(dm.ctx, nnode)
136
	return err
137 138
}

Jeromy's avatar
Jeromy committed
139
// Write continues writing to the dag at the current offset
140 141 142 143 144 145 146
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
147

148 149 150 151 152 153
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
154
		err := dm.Sync()
155 156 157 158 159 160 161
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

162
// Size returns the Filesize of the node
163
func (dm *DagModifier) Size() (int64, error) {
164 165 166 167 168 169 170 171 172 173
	fileSize, err := fileSize(dm.curNode)
	if err != nil {
		return 0, err
	}
	if dm.wrBuf != nil && int64(dm.wrBuf.Len())+int64(dm.writeStart) > int64(fileSize) {
		return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
	}
	return int64(fileSize), nil
}

174
func fileSize(n ipld.Node) (uint64, error) {
175
	switch nd := n.(type) {
176
	case *mdag.ProtoNode:
177
		f, err := ft.FromBytes(nd.Data())
178 179 180
		if err != nil {
			return 0, err
		}
181
		return f.GetFilesize(), nil
182
	case *mdag.RawNode:
183
		return uint64(len(nd.RawData())), nil
184 185
	default:
		return 0, ErrNotUnixfs
186 187 188
	}
}

189 190
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
191
	// No buffer? Nothing to do
192 193 194 195 196 197 198 199 200 201
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
202
	// Number of bytes we're going to write
203 204
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
205
	// overwrite existing dag nodes
Jeromy's avatar
Jeromy committed
206
	thisc, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
207 208 209 210
	if err != nil {
		return err
	}

211
	dm.curNode, err = dm.dagserv.Get(dm.ctx, thisc)
212 213 214 215
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
216
	// need to write past end of current dag
217
	if !done {
218
		dm.curNode, err = dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
219 220 221 222
		if err != nil {
			return err
		}

223
		err = dm.dagserv.Add(dm.ctx, dm.curNode)
224 225 226 227 228 229 230 231 232 233 234
		if err != nil {
			return err
		}
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
235
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
236 237
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
238
func (dm *DagModifier) modifyDag(n ipld.Node, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
239 240 241 242 243 244 245 246
	// If we've reached a leaf node.
	if len(n.Links()) == 0 {
		switch nd0 := n.(type) {
		case *mdag.ProtoNode:
			f, err := ft.FromBytes(nd0.Data())
			if err != nil {
				return nil, false, err
			}
247

248 249 250 251
			n, err := data.Read(f.Data[offset:])
			if err != nil && err != io.EOF {
				return nil, false, err
			}
252

253 254 255 256 257
			// Update newly written node..
			b, err := proto.Marshal(f)
			if err != nil {
				return nil, false, err
			}
258

259 260
			nd := new(mdag.ProtoNode)
			nd.SetData(b)
261
			nd.SetPrefix(&nd0.Prefix)
262
			err = dm.dagserv.Add(dm.ctx, nd)
263 264 265
			if err != nil {
				return nil, false, err
			}
266

267 268 269 270 271 272
			// Hey look! we're done!
			var done bool
			if n < len(f.Data[offset:]) {
				done = true
			}

273
			return nd.Cid(), done, nil
274 275 276
		case *mdag.RawNode:
			origData := nd0.RawData()
			bytes := make([]byte, len(origData))
277

278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296
			// copy orig data up to offset
			copy(bytes, origData[:offset])

			// copy in new data
			n, err := data.Read(bytes[offset:])
			if err != nil && err != io.EOF {
				return nil, false, err
			}

			// copy remaining data
			offsetPlusN := int(offset) + n
			if offsetPlusN < len(origData) {
				copy(bytes[offsetPlusN:], origData[offsetPlusN:])
			}

			nd, err := mdag.NewRawNodeWPrefix(bytes, nd0.Cid().Prefix())
			if err != nil {
				return nil, false, err
			}
297
			err = dm.dagserv.Add(dm.ctx, nd)
298 299 300 301 302 303 304 305 306 307
			if err != nil {
				return nil, false, err
			}

			// Hey look! we're done!
			var done bool
			if n < len(bytes[offset:]) {
				done = true
			}

308
			return nd.Cid(), done, nil
309
		}
310
	}
311

312 313 314 315 316 317 318 319
	node, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, false, ErrNotUnixfs
	}

	f, err := ft.FromBytes(node.Data())
	if err != nil {
		return nil, false, err
320 321 322 323 324
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
325
		// We found the correct child to write into
326
		if cur+bs > offset {
327
			child, err := node.Links()[i].GetNode(dm.ctx, dm.dagserv)
328
			if err != nil {
Jeromy's avatar
Jeromy committed
329
				return nil, false, err
330
			}
331

332
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
333
			if err != nil {
Jeromy's avatar
Jeromy committed
334
				return nil, false, err
335 336
			}

337
			node.Links()[i].Cid = k
338

339
			// Recache serialized node
340
			_, err = node.EncodeProtobuf(true)
341
			if err != nil {
Jeromy's avatar
Jeromy committed
342
				return nil, false, err
343 344
			}

345
			if sdone {
346
				// No more bytes to write!
347 348 349
				done = true
				break
			}
350
			offset = cur + bs
351 352 353 354
		}
		cur += bs
	}

355 356
	err = dm.dagserv.Add(dm.ctx, node)
	return node.Cid(), done, err
357 358
}

Jeromy's avatar
Jeromy committed
359
// appendData appends the blocks from the given chan to the end of this dag
360
func (dm *DagModifier) appendData(nd ipld.Node, spl chunk.Splitter) (ipld.Node, error) {
361
	switch nd := nd.(type) {
362
	case *mdag.ProtoNode, *mdag.RawNode:
363
		dbp := &help.DagBuilderParams{
364 365
			Dagserv:   dm.dagserv,
			Maxlinks:  help.DefaultLinksPerBlock,
366
			Prefix:    &dm.Prefix,
367
			RawLeaves: dm.RawLeaves,
368
		}
369
		return trickle.Append(dm.ctx, nd, dbp.New(spl))
370 371 372
	default:
		return nil, ErrNotUnixfs
	}
373 374
}

Jeromy's avatar
Jeromy committed
375
// Read data from this dag starting at the current offset
376
func (dm *DagModifier) Read(b []byte) (int, error) {
377
	err := dm.readPrep()
378 379 380 381
	if err != nil {
		return 0, err
	}

382 383 384 385 386 387 388 389 390 391 392
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

393
	if dm.read == nil {
394 395
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
396
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
397
			cancel()
398
			return err
399 400
		}

401
		i, err := dr.Seek(int64(dm.curWrOff), io.SeekStart)
402
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
403
			cancel()
404
			return err
405 406 407
		}

		if i != int64(dm.curWrOff) {
Jakub Sztandera's avatar
Jakub Sztandera committed
408
			cancel()
409
			return ErrSeekFail
410 411
		}

412
		dm.readCancel = cancel
413 414 415
		dm.read = dr
	}

416 417 418
	return nil
}

419
// CtxReadFull reads data from this dag starting at the current offset
420 421 422 423 424 425 426
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
427 428 429 430 431
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
432
func (dm *DagModifier) GetNode() (ipld.Node, error) {
433
	err := dm.Sync()
434 435 436
	if err != nil {
		return nil, err
	}
437
	return dm.curNode.Copy(), nil
438 439
}

Jeromy's avatar
Jeromy committed
440
// HasChanges returned whether or not there are unflushed changes to this dag
441 442 443 444
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

445 446
// Seek modifies the offset according to whence. See unixfs/io for valid whence
// values.
447
func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
448
	err := dm.Sync()
449 450 451 452
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
453 454 455 456 457 458
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
459
	switch whence {
460
	case io.SeekCurrent:
Jeromy's avatar
Jeromy committed
461
		newoffset = dm.curWrOff + uint64(offset)
462
	case io.SeekStart:
Jeromy's avatar
Jeromy committed
463
		newoffset = uint64(offset)
464
	case io.SeekEnd:
465
		newoffset = uint64(fisize) - uint64(offset)
466
	default:
Jeromy's avatar
Jeromy committed
467
		return 0, ErrUnrecognizedWhence
468 469
	}

470 471
	if int64(newoffset) > fisize {
		if err := dm.expandSparse(int64(newoffset) - fisize); err != nil {
Jeromy's avatar
Jeromy committed
472 473 474 475 476 477
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

478 479 480 481 482 483 484 485 486 487
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

488 489
// Truncate truncates the current Node to 'size' and replaces it with the
// new one.
490
func (dm *DagModifier) Truncate(size int64) error {
491
	err := dm.Sync()
492 493 494 495 496 497 498 499 500
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
501
	// Truncate can also be used to expand the file
502
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
503
		return dm.expandSparse(int64(size) - realSize)
504 505
	}

506
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
507 508 509 510
	if err != nil {
		return err
	}

511
	err = dm.dagserv.Add(dm.ctx, nnode)
512 513 514 515 516 517 518 519
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
520
// dagTruncate truncates the given node to 'size' and returns the modified Node
521
func dagTruncate(ctx context.Context, n ipld.Node, size uint64, ds ipld.DAGService) (ipld.Node, error) {
522 523 524 525 526 527 528 529 530 531 532 533
	if len(n.Links()) == 0 {
		switch nd := n.(type) {
		case *mdag.ProtoNode:
			// TODO: this can likely be done without marshaling and remarshaling
			pbn, err := ft.FromBytes(nd.Data())
			if err != nil {
				return nil, err
			}
			nd.SetData(ft.WrapData(pbn.Data[:size]))
			return nd, nil
		case *mdag.RawNode:
			return mdag.NewRawNodeWPrefix(nd.RawData()[:size], nd.Cid().Prefix())
534
		}
535
	}
536

537 538 539
	nd, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, ErrNotUnixfs
540 541 542 543
	}

	var cur uint64
	end := 0
544
	var modified ipld.Node
545
	ndata := new(ft.FSNode)
546
	for i, lnk := range nd.Links() {
Jeromy's avatar
Jeromy committed
547
		child, err := lnk.GetNode(ctx, ds)
548 549 550 551
		if err != nil {
			return nil, err
		}

552
		childsize, err := fileSize(child)
553 554 555 556
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
557
		// found the child we want to cut
558
		if size < cur+childsize {
559
			nchild, err := dagTruncate(ctx, child, size-cur, ds)
560 561 562 563 564 565 566 567 568 569 570 571 572 573
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

574
	err := ds.Add(ctx, modified)
575 576 577 578
	if err != nil {
		return nil, err
	}

579
	nd.SetLinks(nd.Links()[:end])
580 581 582 583 584 585 586 587 588 589
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

590
	nd.SetData(d)
591

592
	// invalidate cache and recompute serialized data
593
	_, err = nd.EncodeProtobuf(true)
594 595 596 597
	if err != nil {
		return nil, err
	}

598 599
	return nd, nil
}