dagmodifier.go 12.4 KB
Newer Older
1 2 3 4
package mod

import (
	"bytes"
5
	"context"
6
	"errors"
7
	"fmt"
8 9
	"io"

10 11 12 13 14 15
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
16

Steven Allen's avatar
Steven Allen committed
17
	node "gx/ipfs/QmNwUEK7QbwSqyKBu3mMtToo8SUc6wQJ7gdZq4gGGJqfnf/go-ipld-format"
18
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
Steven Allen's avatar
Steven Allen committed
19
	cid "gx/ipfs/QmeSrf6pzut73u6zLQkRFQ3ygt3k6XFT2kjdYP8Tnkwwyg/go-cid"
20 21
)

var (
	// ErrSeekFail is returned when a reader could not be positioned at the
	// requested offset.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrUnrecognizedWhence is returned by Seek when whence is not one of
	// io.SeekStart, io.SeekCurrent or io.SeekEnd.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

// writebufferSize is the number of buffered bytes (2 MiB) that Write may
// accumulate before it flushes them into the DAG with Sync.
var writebufferSize = 2 << 20

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
33
	curNode node.Node
34

35
	splitter   chunk.SplitterGen
36 37 38 39 40 41 42
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

43
	Prefix    cid.Prefix
44 45
	RawLeaves bool

46
	read uio.DagReader
47 48
}

// ErrNotUnixfs is returned for nodes that are neither *mdag.ProtoNode nor
// *mdag.RawNode, the only node kinds a DagModifier can edit.
var ErrNotUnixfs = errors.New("dagmodifier only supports unixfs nodes (proto or raw)")

51 52 53 54
// NewDagModifier returns a new DagModifier, the Cid prefix for newly
// created nodes will be inherted from the passed in node.  If the Cid
// version if not 0 raw leaves will also be enabled.  The Prefix and
// RawLeaves options can be overridden by changing them after the call.
55
func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
56 57 58 59 60
	switch from.(type) {
	case *mdag.ProtoNode, *mdag.RawNode:
		// ok
	default:
		return nil, ErrNotUnixfs
61 62
	}

63 64 65 66 67 68 69
	prefix := from.Cid().Prefix()
	prefix.Codec = cid.DagProtobuf
	rawLeaves := false
	if prefix.Version > 0 {
		rawLeaves = true
	}

70
	return &DagModifier{
71 72 73 74 75 76
		curNode:   from.Copy(),
		dagserv:   serv,
		splitter:  spl,
		ctx:       ctx,
		Prefix:    prefix,
		RawLeaves: rawLeaves,
77 78 79 80 81 82
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
83 84
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
85 86 87 88 89 90
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
91 92 93 94 95 96 97 98 99 100 101
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

102
		err = dm.Sync()
103 104 105 106 107 108 109 110 111 112 113 114 115
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader that yields an endless stream of zero bytes.
type zeroReader struct{}

// Read zeroes every byte of b and reports all of them as read; it never
// returns an error.
func (zeroReader) Read(b []byte) (int, error) {
	for i := 0; i < len(b); i++ {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
122 123
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
124 125
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
126
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
127
	nnode, err := dm.appendData(dm.curNode, spl)
128 129 130 131
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
132
	return err
133 134
}

Jeromy's avatar
Jeromy committed
135
// Write continues writing to the dag at the current offset
136 137 138 139 140 141 142
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
143

144 145 146 147 148 149
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
150
		err := dm.Sync()
151 152 153 154 155 156 157
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

158
// Size returns the Filesize of the node
159
func (dm *DagModifier) Size() (int64, error) {
160 161 162 163 164 165 166 167 168 169 170 171
	fileSize, err := fileSize(dm.curNode)
	if err != nil {
		return 0, err
	}
	if dm.wrBuf != nil && int64(dm.wrBuf.Len())+int64(dm.writeStart) > int64(fileSize) {
		return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
	}
	return int64(fileSize), nil
}

func fileSize(n node.Node) (uint64, error) {
	switch nd := n.(type) {
172
	case *mdag.ProtoNode:
173
		f, err := ft.FromBytes(nd.Data())
174 175 176
		if err != nil {
			return 0, err
		}
177
		return f.GetFilesize(), nil
178
	case *mdag.RawNode:
179
		return uint64(len(nd.RawData())), nil
180 181
	default:
		return 0, ErrNotUnixfs
182 183 184
	}
}

185 186
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
187
	// No buffer? Nothing to do
188 189 190 191 192 193 194 195 196 197
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
198
	// Number of bytes we're going to write
199 200
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
201
	// overwrite existing dag nodes
Jeromy's avatar
Jeromy committed
202
	thisc, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
203 204 205 206
	if err != nil {
		return err
	}

207
	dm.curNode, err = dm.dagserv.Get(dm.ctx, thisc)
208 209 210 211
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
212
	// need to write past end of current dag
213
	if !done {
214
		dm.curNode, err = dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
215 216 217 218
		if err != nil {
			return err
		}

219
		_, err = dm.dagserv.Add(dm.curNode)
220 221 222 223 224 225 226 227 228 229 230
		if err != nil {
			return err
		}
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
231
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
232 233
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
234
func (dm *DagModifier) modifyDag(n node.Node, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
235 236 237 238 239 240 241 242
	// If we've reached a leaf node.
	if len(n.Links()) == 0 {
		switch nd0 := n.(type) {
		case *mdag.ProtoNode:
			f, err := ft.FromBytes(nd0.Data())
			if err != nil {
				return nil, false, err
			}
243

244 245 246 247
			n, err := data.Read(f.Data[offset:])
			if err != nil && err != io.EOF {
				return nil, false, err
			}
248

249 250 251 252 253
			// Update newly written node..
			b, err := proto.Marshal(f)
			if err != nil {
				return nil, false, err
			}
254

255 256
			nd := new(mdag.ProtoNode)
			nd.SetData(b)
257
			nd.SetPrefix(&nd0.Prefix)
258 259 260 261
			k, err := dm.dagserv.Add(nd)
			if err != nil {
				return nil, false, err
			}
262

263 264 265 266 267 268 269 270 271 272
			// Hey look! we're done!
			var done bool
			if n < len(f.Data[offset:]) {
				done = true
			}

			return k, done, nil
		case *mdag.RawNode:
			origData := nd0.RawData()
			bytes := make([]byte, len(origData))
273

274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
			// copy orig data up to offset
			copy(bytes, origData[:offset])

			// copy in new data
			n, err := data.Read(bytes[offset:])
			if err != nil && err != io.EOF {
				return nil, false, err
			}

			// copy remaining data
			offsetPlusN := int(offset) + n
			if offsetPlusN < len(origData) {
				copy(bytes[offsetPlusN:], origData[offsetPlusN:])
			}

			nd, err := mdag.NewRawNodeWPrefix(bytes, nd0.Cid().Prefix())
			if err != nil {
				return nil, false, err
			}
			k, err := dm.dagserv.Add(nd)
			if err != nil {
				return nil, false, err
			}

			// Hey look! we're done!
			var done bool
			if n < len(bytes[offset:]) {
				done = true
			}

			return k, done, nil
305
		}
306
	}
307

308 309 310 311 312 313 314 315
	node, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, false, ErrNotUnixfs
	}

	f, err := ft.FromBytes(node.Data())
	if err != nil {
		return nil, false, err
316 317 318 319 320
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
321
		// We found the correct child to write into
322
		if cur+bs > offset {
323
			child, err := node.Links()[i].GetNode(dm.ctx, dm.dagserv)
324
			if err != nil {
Jeromy's avatar
Jeromy committed
325
				return nil, false, err
326
			}
327

328
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
329
			if err != nil {
Jeromy's avatar
Jeromy committed
330
				return nil, false, err
331 332 333
			}

			offset += bs
334
			node.Links()[i].Cid = k
335

336
			// Recache serialized node
337
			_, err = node.EncodeProtobuf(true)
338
			if err != nil {
Jeromy's avatar
Jeromy committed
339
				return nil, false, err
340 341
			}

342
			if sdone {
343
				// No more bytes to write!
344 345 346
				done = true
				break
			}
347
			offset = cur + bs
348 349 350 351 352
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
353
	return k, done, err
354 355
}

Jeromy's avatar
Jeromy committed
356
// appendData appends the blocks from the given chan to the end of this dag
357 358
func (dm *DagModifier) appendData(nd node.Node, spl chunk.Splitter) (node.Node, error) {
	switch nd := nd.(type) {
359
	case *mdag.ProtoNode, *mdag.RawNode:
360
		dbp := &help.DagBuilderParams{
361 362
			Dagserv:   dm.dagserv,
			Maxlinks:  help.DefaultLinksPerBlock,
363
			Prefix:    &dm.Prefix,
364
			RawLeaves: dm.RawLeaves,
365 366
		}
		return trickle.TrickleAppend(dm.ctx, nd, dbp.New(spl))
367 368 369
	default:
		return nil, ErrNotUnixfs
	}
370 371
}

Jeromy's avatar
Jeromy committed
372
// Read data from this dag starting at the current offset
373
func (dm *DagModifier) Read(b []byte) (int, error) {
374
	err := dm.readPrep()
375 376 377 378
	if err != nil {
		return 0, err
	}

379 380 381 382 383 384 385 386 387 388 389
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

390
	if dm.read == nil {
391 392
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
393
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
394
			cancel()
395
			return err
396 397
		}

398
		i, err := dr.Seek(int64(dm.curWrOff), io.SeekStart)
399
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
400
			cancel()
401
			return err
402 403 404
		}

		if i != int64(dm.curWrOff) {
Jakub Sztandera's avatar
Jakub Sztandera committed
405
			cancel()
406
			return ErrSeekFail
407 408
		}

409
		dm.readCancel = cancel
410 411 412
		dm.read = dr
	}

413 414 415 416 417 418 419 420 421 422 423
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
424 425 426 427 428
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
429
func (dm *DagModifier) GetNode() (node.Node, error) {
430
	err := dm.Sync()
431 432 433
	if err != nil {
		return nil, err
	}
434
	return dm.curNode.Copy(), nil
435 436
}

Jeromy's avatar
Jeromy committed
437
// HasChanges returned whether or not there are unflushed changes to this dag
438 439 440 441 442
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
443
	err := dm.Sync()
444 445 446 447
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
448 449 450 451 452 453
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
454
	switch whence {
455
	case io.SeekCurrent:
Jeromy's avatar
Jeromy committed
456
		newoffset = dm.curWrOff + uint64(offset)
457
	case io.SeekStart:
Jeromy's avatar
Jeromy committed
458
		newoffset = uint64(offset)
459
	case io.SeekEnd:
460
		newoffset = uint64(fisize) - uint64(offset)
461
	default:
Jeromy's avatar
Jeromy committed
462
		return 0, ErrUnrecognizedWhence
463 464
	}

465 466
	if int64(newoffset) > fisize {
		if err := dm.expandSparse(int64(newoffset) - fisize); err != nil {
Jeromy's avatar
Jeromy committed
467 468 469 470 471 472
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

473 474 475 476 477 478 479 480 481 482 483
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
484
	err := dm.Sync()
485 486 487 488 489 490 491 492 493
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
494
	// Truncate can also be used to expand the file
495
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
496
		return dm.expandSparse(int64(size) - realSize)
497 498
	}

499
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
500 501 502 503 504 505 506 507 508 509 510 511 512
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
513
// dagTruncate truncates the given node to 'size' and returns the modified Node
514 515 516 517 518 519 520 521 522 523 524 525 526
func dagTruncate(ctx context.Context, n node.Node, size uint64, ds mdag.DAGService) (node.Node, error) {
	if len(n.Links()) == 0 {
		switch nd := n.(type) {
		case *mdag.ProtoNode:
			// TODO: this can likely be done without marshaling and remarshaling
			pbn, err := ft.FromBytes(nd.Data())
			if err != nil {
				return nil, err
			}
			nd.SetData(ft.WrapData(pbn.Data[:size]))
			return nd, nil
		case *mdag.RawNode:
			return mdag.NewRawNodeWPrefix(nd.RawData()[:size], nd.Cid().Prefix())
527
		}
528
	}
529

530 531 532
	nd, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, ErrNotUnixfs
533 534 535 536
	}

	var cur uint64
	end := 0
537
	var modified node.Node
538
	ndata := new(ft.FSNode)
539
	for i, lnk := range nd.Links() {
Jeromy's avatar
Jeromy committed
540
		child, err := lnk.GetNode(ctx, ds)
541 542 543 544
		if err != nil {
			return nil, err
		}

545
		childsize, err := fileSize(child)
546 547 548 549
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
550
		// found the child we want to cut
551
		if size < cur+childsize {
552
			nchild, err := dagTruncate(ctx, child, size-cur, ds)
553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

572
	nd.SetLinks(nd.Links()[:end])
573 574 575 576 577 578 579 580 581 582
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

583
	nd.SetData(d)
584

585
	// invalidate cache and recompute serialized data
586
	_, err = nd.EncodeProtobuf(true)
587 588 589 590
	if err != nil {
		return nil, err
	}

591 592
	return nd, nil
}