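// Package mod provides DagModifier, a type for modifying unixfs DAG files
// (writes at arbitrary offsets, sparse expansion, and truncation).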
package mod

import (
	"bytes"
5
	"context"
6
	"errors"
7
	"fmt"
8 9
	"io"

10 11 12 13 14 15
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
16 17

	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
Steven Allen's avatar
Steven Allen committed
18
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
19
	ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
20 21
)

Jeromy's avatar
Jeromy committed
22 23 24
// Sentinel errors reported while positioning within the file.
var (
	// ErrSeekFail is returned when the underlying reader could not be
	// positioned at the requested offset.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrUnrecognizedWhence is returned by Seek when 'whence' is not one
	// of io.SeekStart, io.SeekCurrent or io.SeekEnd.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

25 26 27 28 29 30 31
// writebufferSize is the write-buffer threshold in bytes (2MB).
var writebufferSize = 2 << 20

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
32 33
	dagserv ipld.DAGService
	curNode ipld.Node
34

35
	splitter   chunk.SplitterGen
36 37 38 39 40 41 42
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

43
	Prefix    cid.Prefix
44 45
	RawLeaves bool

46
	read uio.DagReader
47 48
}

49 50
// ErrNotUnixfs is returned when a node is neither a *mdag.ProtoNode nor a
// *mdag.RawNode and therefore cannot be handled by the DagModifier.
var ErrNotUnixfs = fmt.Errorf("dagmodifier only supports unixfs nodes (proto or raw)")

51 52 53 54
// NewDagModifier returns a new DagModifier, the Cid prefix for newly
// created nodes will be inherted from the passed in node.  If the Cid
// version if not 0 raw leaves will also be enabled.  The Prefix and
// RawLeaves options can be overridden by changing them after the call.
55
func NewDagModifier(ctx context.Context, from ipld.Node, serv ipld.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
56 57 58 59 60
	switch from.(type) {
	case *mdag.ProtoNode, *mdag.RawNode:
		// ok
	default:
		return nil, ErrNotUnixfs
61 62
	}

63 64 65 66 67 68 69
	prefix := from.Cid().Prefix()
	prefix.Codec = cid.DagProtobuf
	rawLeaves := false
	if prefix.Version > 0 {
		rawLeaves = true
	}

70
	return &DagModifier{
71 72 73 74 75 76
		curNode:   from.Copy(),
		dagserv:   serv,
		splitter:  spl,
		ctx:       ctx,
		Prefix:    prefix,
		RawLeaves: rawLeaves,
77 78 79 80 81 82
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
83 84
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
85 86 87 88 89 90
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
91 92 93 94 95 96 97 98 99 100 101
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

102
		err = dm.Sync()
103 104 105 106 107 108 109 110 111 112 113 114 115
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is a reader that yields an endless stream of zero bytes.
type zeroReader struct{}

// Read overwrites every byte of b with zero and reports len(b) bytes read.
// It never returns an error, so it never signals EOF on its own.
func (zr zeroReader) Read(b []byte) (int, error) {
	for i := range b {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
122 123
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
124 125
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
126
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
127
	nnode, err := dm.appendData(dm.curNode, spl)
128 129 130
	if err != nil {
		return err
	}
131
	err = dm.dagserv.Add(dm.ctx, nnode)
132
	return err
133 134
}

Jeromy's avatar
Jeromy committed
135
// Write continues writing to the dag at the current offset
136 137 138 139 140 141 142
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
143

144 145 146 147 148 149
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
150
		err := dm.Sync()
151 152 153 154 155 156 157
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

158
// Size returns the Filesize of the node
159
func (dm *DagModifier) Size() (int64, error) {
160 161 162 163 164 165 166 167 168 169
	fileSize, err := fileSize(dm.curNode)
	if err != nil {
		return 0, err
	}
	if dm.wrBuf != nil && int64(dm.wrBuf.Len())+int64(dm.writeStart) > int64(fileSize) {
		return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
	}
	return int64(fileSize), nil
}

170
func fileSize(n ipld.Node) (uint64, error) {
171
	switch nd := n.(type) {
172
	case *mdag.ProtoNode:
173
		f, err := ft.FromBytes(nd.Data())
174 175 176
		if err != nil {
			return 0, err
		}
177
		return f.GetFilesize(), nil
178
	case *mdag.RawNode:
179
		return uint64(len(nd.RawData())), nil
180 181
	default:
		return 0, ErrNotUnixfs
182 183 184
	}
}

185 186
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
187
	// No buffer? Nothing to do
188 189 190 191 192 193 194 195 196 197
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
198
	// Number of bytes we're going to write
199 200
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
201
	// overwrite existing dag nodes
Jeromy's avatar
Jeromy committed
202
	thisc, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
203 204 205 206
	if err != nil {
		return err
	}

207
	dm.curNode, err = dm.dagserv.Get(dm.ctx, thisc)
208 209 210 211
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
212
	// need to write past end of current dag
213
	if !done {
214
		dm.curNode, err = dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
215 216 217 218
		if err != nil {
			return err
		}

219
		err = dm.dagserv.Add(dm.ctx, dm.curNode)
220 221 222 223 224 225 226 227 228 229 230
		if err != nil {
			return err
		}
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
231
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
232 233
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
234
func (dm *DagModifier) modifyDag(n ipld.Node, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
235 236 237 238 239 240 241 242
	// If we've reached a leaf node.
	if len(n.Links()) == 0 {
		switch nd0 := n.(type) {
		case *mdag.ProtoNode:
			f, err := ft.FromBytes(nd0.Data())
			if err != nil {
				return nil, false, err
			}
243

244 245 246 247
			n, err := data.Read(f.Data[offset:])
			if err != nil && err != io.EOF {
				return nil, false, err
			}
248

249 250 251 252 253
			// Update newly written node..
			b, err := proto.Marshal(f)
			if err != nil {
				return nil, false, err
			}
254

255 256
			nd := new(mdag.ProtoNode)
			nd.SetData(b)
257
			nd.SetPrefix(&nd0.Prefix)
258
			err = dm.dagserv.Add(dm.ctx, nd)
259 260 261
			if err != nil {
				return nil, false, err
			}
262

263 264 265 266 267 268
			// Hey look! we're done!
			var done bool
			if n < len(f.Data[offset:]) {
				done = true
			}

269
			return nd.Cid(), done, nil
270 271 272
		case *mdag.RawNode:
			origData := nd0.RawData()
			bytes := make([]byte, len(origData))
273

274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
			// copy orig data up to offset
			copy(bytes, origData[:offset])

			// copy in new data
			n, err := data.Read(bytes[offset:])
			if err != nil && err != io.EOF {
				return nil, false, err
			}

			// copy remaining data
			offsetPlusN := int(offset) + n
			if offsetPlusN < len(origData) {
				copy(bytes[offsetPlusN:], origData[offsetPlusN:])
			}

			nd, err := mdag.NewRawNodeWPrefix(bytes, nd0.Cid().Prefix())
			if err != nil {
				return nil, false, err
			}
293
			err = dm.dagserv.Add(dm.ctx, nd)
294 295 296 297 298 299 300 301 302 303
			if err != nil {
				return nil, false, err
			}

			// Hey look! we're done!
			var done bool
			if n < len(bytes[offset:]) {
				done = true
			}

304
			return nd.Cid(), done, nil
305
		}
306
	}
307

308 309 310 311 312 313 314 315
	node, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, false, ErrNotUnixfs
	}

	f, err := ft.FromBytes(node.Data())
	if err != nil {
		return nil, false, err
316 317 318 319 320
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
321
		// We found the correct child to write into
322
		if cur+bs > offset {
323
			child, err := node.Links()[i].GetNode(dm.ctx, dm.dagserv)
324
			if err != nil {
Jeromy's avatar
Jeromy committed
325
				return nil, false, err
326
			}
327

328
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
329
			if err != nil {
Jeromy's avatar
Jeromy committed
330
				return nil, false, err
331 332
			}

333
			node.Links()[i].Cid = k
334

335
			// Recache serialized node
336
			_, err = node.EncodeProtobuf(true)
337
			if err != nil {
Jeromy's avatar
Jeromy committed
338
				return nil, false, err
339 340
			}

341
			if sdone {
342
				// No more bytes to write!
343 344 345
				done = true
				break
			}
346
			offset = cur + bs
347 348 349 350
		}
		cur += bs
	}

351 352
	err = dm.dagserv.Add(dm.ctx, node)
	return node.Cid(), done, err
353 354
}

Jeromy's avatar
Jeromy committed
355
// appendData appends the blocks from the given chan to the end of this dag
356
func (dm *DagModifier) appendData(nd ipld.Node, spl chunk.Splitter) (ipld.Node, error) {
357
	switch nd := nd.(type) {
358
	case *mdag.ProtoNode, *mdag.RawNode:
359
		dbp := &help.DagBuilderParams{
360 361
			Dagserv:   dm.dagserv,
			Maxlinks:  help.DefaultLinksPerBlock,
362
			Prefix:    &dm.Prefix,
363
			RawLeaves: dm.RawLeaves,
364 365
		}
		return trickle.TrickleAppend(dm.ctx, nd, dbp.New(spl))
366 367 368
	default:
		return nil, ErrNotUnixfs
	}
369 370
}

Jeromy's avatar
Jeromy committed
371
// Read data from this dag starting at the current offset
372
func (dm *DagModifier) Read(b []byte) (int, error) {
373
	err := dm.readPrep()
374 375 376 377
	if err != nil {
		return 0, err
	}

378 379 380 381 382 383 384 385 386 387 388
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

389
	if dm.read == nil {
390 391
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
392
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
393
			cancel()
394
			return err
395 396
		}

397
		i, err := dr.Seek(int64(dm.curWrOff), io.SeekStart)
398
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
399
			cancel()
400
			return err
401 402 403
		}

		if i != int64(dm.curWrOff) {
Jakub Sztandera's avatar
Jakub Sztandera committed
404
			cancel()
405
			return ErrSeekFail
406 407
		}

408
		dm.readCancel = cancel
409 410 411
		dm.read = dr
	}

412 413 414 415 416 417 418 419 420 421 422
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
423 424 425 426 427
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
428
func (dm *DagModifier) GetNode() (ipld.Node, error) {
429
	err := dm.Sync()
430 431 432
	if err != nil {
		return nil, err
	}
433
	return dm.curNode.Copy(), nil
434 435
}

Jeromy's avatar
Jeromy committed
436
// HasChanges returned whether or not there are unflushed changes to this dag
437 438 439 440 441
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
442
	err := dm.Sync()
443 444 445 446
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
447 448 449 450 451 452
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
453
	switch whence {
454
	case io.SeekCurrent:
Jeromy's avatar
Jeromy committed
455
		newoffset = dm.curWrOff + uint64(offset)
456
	case io.SeekStart:
Jeromy's avatar
Jeromy committed
457
		newoffset = uint64(offset)
458
	case io.SeekEnd:
459
		newoffset = uint64(fisize) - uint64(offset)
460
	default:
Jeromy's avatar
Jeromy committed
461
		return 0, ErrUnrecognizedWhence
462 463
	}

464 465
	if int64(newoffset) > fisize {
		if err := dm.expandSparse(int64(newoffset) - fisize); err != nil {
Jeromy's avatar
Jeromy committed
466 467 468 469 470 471
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

472 473 474 475 476 477 478 479 480 481 482
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
483
	err := dm.Sync()
484 485 486 487 488 489 490 491 492
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
493
	// Truncate can also be used to expand the file
494
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
495
		return dm.expandSparse(int64(size) - realSize)
496 497
	}

498
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
499 500 501 502
	if err != nil {
		return err
	}

503
	err = dm.dagserv.Add(dm.ctx, nnode)
504 505 506 507 508 509 510 511
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
512
// dagTruncate truncates the given node to 'size' and returns the modified Node
513
func dagTruncate(ctx context.Context, n ipld.Node, size uint64, ds ipld.DAGService) (ipld.Node, error) {
514 515 516 517 518 519 520 521 522 523 524 525
	if len(n.Links()) == 0 {
		switch nd := n.(type) {
		case *mdag.ProtoNode:
			// TODO: this can likely be done without marshaling and remarshaling
			pbn, err := ft.FromBytes(nd.Data())
			if err != nil {
				return nil, err
			}
			nd.SetData(ft.WrapData(pbn.Data[:size]))
			return nd, nil
		case *mdag.RawNode:
			return mdag.NewRawNodeWPrefix(nd.RawData()[:size], nd.Cid().Prefix())
526
		}
527
	}
528

529 530 531
	nd, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, ErrNotUnixfs
532 533 534 535
	}

	var cur uint64
	end := 0
536
	var modified ipld.Node
537
	ndata := new(ft.FSNode)
538
	for i, lnk := range nd.Links() {
Jeromy's avatar
Jeromy committed
539
		child, err := lnk.GetNode(ctx, ds)
540 541 542 543
		if err != nil {
			return nil, err
		}

544
		childsize, err := fileSize(child)
545 546 547 548
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
549
		// found the child we want to cut
550
		if size < cur+childsize {
551
			nchild, err := dagTruncate(ctx, child, size-cur, ds)
552 553 554 555 556 557 558 559 560 561 562 563 564 565
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

566
	err := ds.Add(ctx, modified)
567 568 569 570
	if err != nil {
		return nil, err
	}

571
	nd.SetLinks(nd.Links()[:end])
572 573 574 575 576 577 578 579 580 581
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

582
	nd.SetData(d)
583

584
	// invalidate cache and recompute serialized data
585
	_, err = nd.EncodeProtobuf(true)
586 587 588 589
	if err != nil {
		return nil, err
	}

590 591
	return nd, nil
}