dagmodifier.go 12.3 KB
Newer Older
1 2 3 4
package mod

import (
	"bytes"
5
	"context"
6
	"errors"
7
	"fmt"
8 9
	"io"

10 11 12 13 14 15
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
16 17

	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
Steven Allen's avatar
Steven Allen committed
18 19
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
	node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
20 21
)

Jeromy's avatar
Jeromy committed
22 23 24
var (
	// ErrSeekFail indicates that a seek could not reach the requested
	// position.
	ErrSeekFail = errors.New("failed to seek properly")
	// ErrUnrecognizedWhence is returned by Seek for a whence value other than
	// io.SeekStart, io.SeekCurrent or io.SeekEnd.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

// writebufferSize is the number of buffered bytes (2 MiB) above which Write
// flushes the write buffer into the DAG.
var writebufferSize = 1 << 21

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
33
	curNode node.Node
34

35
	splitter   chunk.SplitterGen
36 37 38 39 40 41 42
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

43
	Prefix    cid.Prefix
44 45
	RawLeaves bool

46
	read uio.DagReader
47 48
}

49 50
var ErrNotUnixfs = fmt.Errorf("dagmodifier only supports unixfs nodes (proto or raw)")

51 52 53 54
// NewDagModifier returns a new DagModifier, the Cid prefix for newly
// created nodes will be inherted from the passed in node.  If the Cid
// version if not 0 raw leaves will also be enabled.  The Prefix and
// RawLeaves options can be overridden by changing them after the call.
55
func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
56 57 58 59 60
	switch from.(type) {
	case *mdag.ProtoNode, *mdag.RawNode:
		// ok
	default:
		return nil, ErrNotUnixfs
61 62
	}

63 64 65 66 67 68 69
	prefix := from.Cid().Prefix()
	prefix.Codec = cid.DagProtobuf
	rawLeaves := false
	if prefix.Version > 0 {
		rawLeaves = true
	}

70
	return &DagModifier{
71 72 73 74 75 76
		curNode:   from.Copy(),
		dagserv:   serv,
		splitter:  spl,
		ctx:       ctx,
		Prefix:    prefix,
		RawLeaves: rawLeaves,
77 78 79 80 81 82
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
83 84
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
85 86 87 88 89 90
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
91 92 93 94 95 96 97 98 99 100 101
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

102
		err = dm.Sync()
103 104 105 106 107 108 109 110 111 112 113 114 115
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader that yields an endless stream of zero bytes:
// every call fills the whole of b and never returns an error.
type zeroReader struct{}

// Read overwrites every byte of b with zero and reports len(b) bytes read.
func (zeroReader) Read(b []byte) (int, error) {
	for i := 0; i < len(b); i++ {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
122 123
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
124 125
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
126
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
127
	nnode, err := dm.appendData(dm.curNode, spl)
128 129 130 131
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
132
	return err
133 134
}

Jeromy's avatar
Jeromy committed
135
// Write continues writing to the dag at the current offset
136 137 138 139 140 141 142
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
143

144 145 146 147 148 149
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
150
		err := dm.Sync()
151 152 153 154 155 156 157
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

158
// Size returns the Filesize of the node
159
func (dm *DagModifier) Size() (int64, error) {
160 161 162 163 164 165 166 167 168 169 170 171
	fileSize, err := fileSize(dm.curNode)
	if err != nil {
		return 0, err
	}
	if dm.wrBuf != nil && int64(dm.wrBuf.Len())+int64(dm.writeStart) > int64(fileSize) {
		return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
	}
	return int64(fileSize), nil
}

func fileSize(n node.Node) (uint64, error) {
	switch nd := n.(type) {
172
	case *mdag.ProtoNode:
173
		f, err := ft.FromBytes(nd.Data())
174 175 176
		if err != nil {
			return 0, err
		}
177
		return f.GetFilesize(), nil
178
	case *mdag.RawNode:
179
		return uint64(len(nd.RawData())), nil
180 181
	default:
		return 0, ErrNotUnixfs
182 183 184
	}
}

185 186
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
187
	// No buffer? Nothing to do
188 189 190 191 192 193 194 195 196 197
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
198
	// Number of bytes we're going to write
199 200
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
201
	// overwrite existing dag nodes
Jeromy's avatar
Jeromy committed
202
	thisc, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
203 204 205 206
	if err != nil {
		return err
	}

207
	dm.curNode, err = dm.dagserv.Get(dm.ctx, thisc)
208 209 210 211
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
212
	// need to write past end of current dag
213
	if !done {
214
		dm.curNode, err = dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
215 216 217 218
		if err != nil {
			return err
		}

219
		_, err = dm.dagserv.Add(dm.curNode)
220 221 222 223 224 225 226 227 228 229 230
		if err != nil {
			return err
		}
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
231
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
232 233
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
234
func (dm *DagModifier) modifyDag(n node.Node, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
235 236 237 238 239 240 241 242
	// If we've reached a leaf node.
	if len(n.Links()) == 0 {
		switch nd0 := n.(type) {
		case *mdag.ProtoNode:
			f, err := ft.FromBytes(nd0.Data())
			if err != nil {
				return nil, false, err
			}
243

244 245 246 247
			n, err := data.Read(f.Data[offset:])
			if err != nil && err != io.EOF {
				return nil, false, err
			}
248

249 250 251 252 253
			// Update newly written node..
			b, err := proto.Marshal(f)
			if err != nil {
				return nil, false, err
			}
254

255 256
			nd := new(mdag.ProtoNode)
			nd.SetData(b)
257
			nd.SetPrefix(&nd0.Prefix)
258 259 260 261
			k, err := dm.dagserv.Add(nd)
			if err != nil {
				return nil, false, err
			}
262

263 264 265 266 267 268 269 270 271 272
			// Hey look! we're done!
			var done bool
			if n < len(f.Data[offset:]) {
				done = true
			}

			return k, done, nil
		case *mdag.RawNode:
			origData := nd0.RawData()
			bytes := make([]byte, len(origData))
273

274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
			// copy orig data up to offset
			copy(bytes, origData[:offset])

			// copy in new data
			n, err := data.Read(bytes[offset:])
			if err != nil && err != io.EOF {
				return nil, false, err
			}

			// copy remaining data
			offsetPlusN := int(offset) + n
			if offsetPlusN < len(origData) {
				copy(bytes[offsetPlusN:], origData[offsetPlusN:])
			}

			nd, err := mdag.NewRawNodeWPrefix(bytes, nd0.Cid().Prefix())
			if err != nil {
				return nil, false, err
			}
			k, err := dm.dagserv.Add(nd)
			if err != nil {
				return nil, false, err
			}

			// Hey look! we're done!
			var done bool
			if n < len(bytes[offset:]) {
				done = true
			}

			return k, done, nil
305
		}
306
	}
307

308 309 310 311 312 313 314 315
	node, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, false, ErrNotUnixfs
	}

	f, err := ft.FromBytes(node.Data())
	if err != nil {
		return nil, false, err
316 317 318 319 320
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
321
		// We found the correct child to write into
322
		if cur+bs > offset {
323
			child, err := node.Links()[i].GetNode(dm.ctx, dm.dagserv)
324
			if err != nil {
Jeromy's avatar
Jeromy committed
325
				return nil, false, err
326
			}
327

328
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
329
			if err != nil {
Jeromy's avatar
Jeromy committed
330
				return nil, false, err
331 332
			}

333
			node.Links()[i].Cid = k
334

335
			// Recache serialized node
336
			_, err = node.EncodeProtobuf(true)
337
			if err != nil {
Jeromy's avatar
Jeromy committed
338
				return nil, false, err
339 340
			}

341
			if sdone {
342
				// No more bytes to write!
343 344 345
				done = true
				break
			}
346
			offset = cur + bs
347 348 349 350 351
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
352
	return k, done, err
353 354
}

Jeromy's avatar
Jeromy committed
355
// appendData appends the blocks from the given chan to the end of this dag
356 357
func (dm *DagModifier) appendData(nd node.Node, spl chunk.Splitter) (node.Node, error) {
	switch nd := nd.(type) {
358
	case *mdag.ProtoNode, *mdag.RawNode:
359
		dbp := &help.DagBuilderParams{
360 361
			Dagserv:   dm.dagserv,
			Maxlinks:  help.DefaultLinksPerBlock,
362
			Prefix:    &dm.Prefix,
363
			RawLeaves: dm.RawLeaves,
364 365
		}
		return trickle.TrickleAppend(dm.ctx, nd, dbp.New(spl))
366 367 368
	default:
		return nil, ErrNotUnixfs
	}
369 370
}

Jeromy's avatar
Jeromy committed
371
// Read data from this dag starting at the current offset
372
func (dm *DagModifier) Read(b []byte) (int, error) {
373
	err := dm.readPrep()
374 375 376 377
	if err != nil {
		return 0, err
	}

378 379 380 381 382 383 384 385 386 387 388
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

389
	if dm.read == nil {
390 391
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
392
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
393
			cancel()
394
			return err
395 396
		}

397
		i, err := dr.Seek(int64(dm.curWrOff), io.SeekStart)
398
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
399
			cancel()
400
			return err
401 402 403
		}

		if i != int64(dm.curWrOff) {
Jakub Sztandera's avatar
Jakub Sztandera committed
404
			cancel()
405
			return ErrSeekFail
406 407
		}

408
		dm.readCancel = cancel
409 410 411
		dm.read = dr
	}

412 413 414 415 416 417 418 419 420 421 422
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
423 424 425 426 427
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
428
func (dm *DagModifier) GetNode() (node.Node, error) {
429
	err := dm.Sync()
430 431 432
	if err != nil {
		return nil, err
	}
433
	return dm.curNode.Copy(), nil
434 435
}

Jeromy's avatar
Jeromy committed
436
// HasChanges returned whether or not there are unflushed changes to this dag
437 438 439 440 441
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
442
	err := dm.Sync()
443 444 445 446
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
447 448 449 450 451 452
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
453
	switch whence {
454
	case io.SeekCurrent:
Jeromy's avatar
Jeromy committed
455
		newoffset = dm.curWrOff + uint64(offset)
456
	case io.SeekStart:
Jeromy's avatar
Jeromy committed
457
		newoffset = uint64(offset)
458
	case io.SeekEnd:
459
		newoffset = uint64(fisize) - uint64(offset)
460
	default:
Jeromy's avatar
Jeromy committed
461
		return 0, ErrUnrecognizedWhence
462 463
	}

464 465
	if int64(newoffset) > fisize {
		if err := dm.expandSparse(int64(newoffset) - fisize); err != nil {
Jeromy's avatar
Jeromy committed
466 467 468 469 470 471
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

472 473 474 475 476 477 478 479 480 481 482
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
483
	err := dm.Sync()
484 485 486 487 488 489 490 491 492
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
493
	// Truncate can also be used to expand the file
494
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
495
		return dm.expandSparse(int64(size) - realSize)
496 497
	}

498
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
499 500 501 502 503 504 505 506 507 508 509 510 511
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
512
// dagTruncate truncates the given node to 'size' and returns the modified Node
513 514 515 516 517 518 519 520 521 522 523 524 525
func dagTruncate(ctx context.Context, n node.Node, size uint64, ds mdag.DAGService) (node.Node, error) {
	if len(n.Links()) == 0 {
		switch nd := n.(type) {
		case *mdag.ProtoNode:
			// TODO: this can likely be done without marshaling and remarshaling
			pbn, err := ft.FromBytes(nd.Data())
			if err != nil {
				return nil, err
			}
			nd.SetData(ft.WrapData(pbn.Data[:size]))
			return nd, nil
		case *mdag.RawNode:
			return mdag.NewRawNodeWPrefix(nd.RawData()[:size], nd.Cid().Prefix())
526
		}
527
	}
528

529 530 531
	nd, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, ErrNotUnixfs
532 533 534 535
	}

	var cur uint64
	end := 0
536
	var modified node.Node
537
	ndata := new(ft.FSNode)
538
	for i, lnk := range nd.Links() {
Jeromy's avatar
Jeromy committed
539
		child, err := lnk.GetNode(ctx, ds)
540 541 542 543
		if err != nil {
			return nil, err
		}

544
		childsize, err := fileSize(child)
545 546 547 548
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
549
		// found the child we want to cut
550
		if size < cur+childsize {
551
			nchild, err := dagTruncate(ctx, child, size-cur, ds)
552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

571
	nd.SetLinks(nd.Links()[:end])
572 573 574 575 576 577 578 579 580 581
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

582
	nd.SetData(d)
583

584
	// invalidate cache and recompute serialized data
585
	_, err = nd.EncodeProtobuf(true)
586 587 588 589
	if err != nil {
		return nil, err
	}

590 591
	return nd, nil
}