dagmodifier.go 11.5 KB
Newer Older
1 2 3 4
package mod

import (
	"bytes"
5
	"context"
6
	"errors"
7
	"fmt"
8 9
	"io"

10 11 12 13 14 15
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
16

17
	cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
18
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
19
	node "gx/ipfs/Qmb3Hm9QDFmfYuET4pu7Kyg8JV78jFa1nvZx5vnCZsK4ck/go-ipld-format"
20 21
)

Jeromy's avatar
Jeromy committed
22 23 24
// ErrSeekFail is returned when a seek cannot be carried out properly, e.g.
// when the underlying reader ends up at a different offset than requested.
var ErrSeekFail = errors.New("failed to seek properly")
// ErrUnrecognizedWhence is returned by Seek when 'whence' is not one of
// io.SeekStart, io.SeekCurrent or io.SeekEnd.
var ErrUnrecognizedWhence = errors.New("unrecognized whence")

25 26 27 28 29 30 31 32
// writebufferSize is the number of buffered bytes (2MB) above which Write
// flushes the write buffer by calling Sync.
var writebufferSize = 1 << 21

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
33
	curNode node.Node
34

35
	splitter   chunk.SplitterGen
36 37 38 39 40 41 42
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

43
	read uio.DagReader
44 45
}

46 47
// ErrNotUnixfs is returned when a node is neither a *mdag.ProtoNode nor a
// *mdag.RawNode and therefore cannot be handled by the DagModifier.
var ErrNotUnixfs = fmt.Errorf("dagmodifier only supports unixfs nodes (proto or raw)")

48
func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
49 50 51 52 53
	switch from.(type) {
	case *mdag.ProtoNode, *mdag.RawNode:
		// ok
	default:
		return nil, ErrNotUnixfs
54 55
	}

56
	return &DagModifier{
57
		curNode:  from.Copy(),
58 59 60 61 62 63 64 65 66
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
67 68
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
69 70 71 72 73 74
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
75 76 77 78 79 80 81 82 83 84 85
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

86
		err = dm.Sync()
87 88 89 90 91 92 93 94 95 96 97 98 99
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is a reader that just returns zeros, without end.
type zeroReader struct{}

// Read overwrites every byte of b with zero and reports len(b) bytes read.
// It never returns an error, so callers must bound it themselves (for
// example with io.LimitReader).
func (zr zeroReader) Read(b []byte) (int, error) {
	for i := range b {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
106 107
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
108 109
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
110
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
111
	nnode, err := dm.appendData(dm.curNode, spl)
112 113 114 115 116 117 118
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
119 120 121 122 123 124 125

	pbnnode, ok := nnode.(*mdag.ProtoNode)
	if !ok {
		return mdag.ErrNotProtobuf
	}

	dm.curNode = pbnnode
126 127 128
	return nil
}

Jeromy's avatar
Jeromy committed
129
// Write continues writing to the dag at the current offset
130 131 132 133 134 135 136
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
137

138 139 140 141 142 143
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
144
		err := dm.Sync()
145 146 147 148 149 150 151
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

152 153
// ErrNoRawYet is returned by operations that are not implemented for raw
// nodes: Size while writes are buffered, modifyDag and dagTruncate.
var ErrNoRawYet = fmt.Errorf("currently only fully support protonodes in the dagmodifier")

154
func (dm *DagModifier) Size() (int64, error) {
155 156 157 158 159 160 161
	switch nd := dm.curNode.(type) {
	case *mdag.ProtoNode:
		pbn, err := ft.FromBytes(nd.Data())
		if err != nil {
			return 0, err
		}
		if dm.wrBuf != nil && uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
162 163
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
164 165 166 167 168 169 170 171 172
		return int64(pbn.GetFilesize()), nil
	case *mdag.RawNode:
		if dm.wrBuf != nil {
			return 0, ErrNoRawYet
		}
		sz, err := nd.Size()
		return int64(sz), err
	default:
		return 0, ErrNotUnixfs
173 174 175
	}
}

176 177
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
178
	// No buffer? Nothing to do
179 180 181 182 183 184 185 186 187 188
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
189
	// Number of bytes we're going to write
190 191
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
192
	// overwrite existing dag nodes
Jeromy's avatar
Jeromy committed
193
	thisc, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
194 195 196 197
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
198
	nd, err := dm.dagserv.Get(dm.ctx, thisc)
199 200 201 202
	if err != nil {
		return err
	}

203 204 205 206 207 208
	pbnd, ok := nd.(*mdag.ProtoNode)
	if !ok {
		return mdag.ErrNotProtobuf
	}

	dm.curNode = pbnd
209

Jeromy's avatar
Jeromy committed
210
	// need to write past end of current dag
211
	if !done {
212
		nd, err := dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
213 214 215 216
		if err != nil {
			return err
		}

Jeromy's avatar
Jeromy committed
217
		_, err = dm.dagserv.Add(nd)
218 219 220 221
		if err != nil {
			return err
		}

222 223 224 225 226 227
		pbnode, ok := nd.(*mdag.ProtoNode)
		if !ok {
			return mdag.ErrNotProtobuf
		}

		dm.curNode = pbnode
228 229 230 231 232 233 234 235
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
236
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
237 238
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
239 240 241 242 243 244
func (dm *DagModifier) modifyDag(n node.Node, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
	node, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, false, ErrNoRawYet
	}

245
	f, err := ft.FromBytes(node.Data())
246
	if err != nil {
Jeromy's avatar
Jeromy committed
247
		return nil, false, err
248 249
	}

Jeromy's avatar
Jeromy committed
250
	// If we've reached a leaf node.
251
	if len(node.Links()) == 0 {
252 253
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
254
			return nil, false, err
255 256 257 258 259
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
260
			return nil, false, err
261 262
		}

263
		nd := new(mdag.ProtoNode)
264
		nd.SetData(b)
265 266
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
267
			return nil, false, err
268 269 270 271
		}

		// Hey look! we're done!
		var done bool
272
		if n < len(f.Data[offset:]) {
273 274 275
			done = true
		}

Jeromy's avatar
Jeromy committed
276
		return k, done, nil
277 278 279 280 281
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
282
		// We found the correct child to write into
283
		if cur+bs > offset {
284
			child, err := node.Links()[i].GetNode(dm.ctx, dm.dagserv)
285
			if err != nil {
Jeromy's avatar
Jeromy committed
286
				return nil, false, err
287
			}
288 289 290 291 292 293 294

			childpb, ok := child.(*mdag.ProtoNode)
			if !ok {
				return nil, false, mdag.ErrNotProtobuf
			}

			k, sdone, err := dm.modifyDag(childpb, offset-cur, data)
295
			if err != nil {
Jeromy's avatar
Jeromy committed
296
				return nil, false, err
297 298 299
			}

			offset += bs
300
			node.Links()[i].Cid = k
301

302
			// Recache serialized node
303
			_, err = node.EncodeProtobuf(true)
304
			if err != nil {
Jeromy's avatar
Jeromy committed
305
				return nil, false, err
306 307
			}

308
			if sdone {
309
				// No more bytes to write!
310 311 312
				done = true
				break
			}
313
			offset = cur + bs
314 315 316 317 318
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
319
	return k, done, err
320 321
}

Jeromy's avatar
Jeromy committed
322
// appendData appends the blocks from the given chan to the end of this dag
323 324 325 326 327 328 329 330 331 332 333 334 335 336
func (dm *DagModifier) appendData(nd node.Node, spl chunk.Splitter) (node.Node, error) {

	var root *mdag.ProtoNode
	switch nd := nd.(type) {
	case *mdag.ProtoNode:
		root = nd
	case *mdag.RawNode:
		// TODO: be able to append to rawnodes. Probably requires making this
		// node a child of a unxifs intermediate node and passing it down
		return nil, fmt.Errorf("appending to raw node types not yet supported")
	default:
		return nil, ErrNotUnixfs
	}

337 338 339 340 341
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
	}

342
	return trickle.TrickleAppend(dm.ctx, root, dbp.New(spl))
343 344
}

Jeromy's avatar
Jeromy committed
345
// Read data from this dag starting at the current offset
346
func (dm *DagModifier) Read(b []byte) (int, error) {
347
	err := dm.readPrep()
348 349 350 351
	if err != nil {
		return 0, err
	}

352 353 354 355 356 357 358 359 360 361 362
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

363
	if dm.read == nil {
364 365
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
366
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
367
			cancel()
368
			return err
369 370
		}

371
		i, err := dr.Seek(int64(dm.curWrOff), io.SeekStart)
372
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
373
			cancel()
374
			return err
375 376 377
		}

		if i != int64(dm.curWrOff) {
Jakub Sztandera's avatar
Jakub Sztandera committed
378
			cancel()
379
			return ErrSeekFail
380 381
		}

382
		dm.readCancel = cancel
383 384 385
		dm.read = dr
	}

386 387 388 389 390 391 392 393 394 395 396
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
397 398 399 400 401
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
402
func (dm *DagModifier) GetNode() (node.Node, error) {
403
	err := dm.Sync()
404 405 406
	if err != nil {
		return nil, err
	}
407
	return dm.curNode.Copy(), nil
408 409
}

Jeromy's avatar
Jeromy committed
410
// HasChanges returned whether or not there are unflushed changes to this dag
411 412 413 414 415
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
416
	err := dm.Sync()
417 418 419 420
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
421 422 423 424 425 426
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
427
	switch whence {
428
	case io.SeekCurrent:
Jeromy's avatar
Jeromy committed
429
		newoffset = dm.curWrOff + uint64(offset)
430
	case io.SeekStart:
Jeromy's avatar
Jeromy committed
431
		newoffset = uint64(offset)
432
	case io.SeekEnd:
433
		newoffset = uint64(fisize) - uint64(offset)
434
	default:
Jeromy's avatar
Jeromy committed
435
		return 0, ErrUnrecognizedWhence
436 437
	}

438 439
	if int64(newoffset) > fisize {
		if err := dm.expandSparse(int64(newoffset) - fisize); err != nil {
Jeromy's avatar
Jeromy committed
440 441 442 443 444 445
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

446 447 448 449 450 451 452 453 454 455 456
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
457
	err := dm.Sync()
458 459 460 461 462 463 464 465 466
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
467
	// Truncate can also be used to expand the file
468
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
469
		return dm.expandSparse(int64(size) - realSize)
470 471
	}

472
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
473 474 475 476 477 478 479 480 481 482 483 484 485
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
486
// dagTruncate truncates the given node to 'size' and returns the modified Node
487 488 489 490 491 492
func dagTruncate(ctx context.Context, n node.Node, size uint64, ds mdag.DAGService) (*mdag.ProtoNode, error) {
	nd, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, ErrNoRawYet
	}

493
	if len(nd.Links()) == 0 {
494
		// TODO: this can likely be done without marshaling and remarshaling
495
		pbn, err := ft.FromBytes(nd.Data())
496 497 498 499
		if err != nil {
			return nil, err
		}

500
		nd.SetData(ft.WrapData(pbn.Data[:size]))
501 502 503 504 505
		return nd, nil
	}

	var cur uint64
	end := 0
506
	var modified *mdag.ProtoNode
507
	ndata := new(ft.FSNode)
508
	for i, lnk := range nd.Links() {
Jeromy's avatar
Jeromy committed
509
		child, err := lnk.GetNode(ctx, ds)
510 511 512 513
		if err != nil {
			return nil, err
		}

514 515 516 517 518 519
		childpb, ok := child.(*mdag.ProtoNode)
		if !ok {
			return nil, err
		}

		childsize, err := ft.DataSize(childpb.Data())
520 521 522 523
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
524
		// found the child we want to cut
525
		if size < cur+childsize {
526
			nchild, err := dagTruncate(ctx, childpb, size-cur, ds)
527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

546
	nd.SetLinks(nd.Links()[:end])
547 548 549 550 551 552 553 554 555 556
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

557
	nd.SetData(d)
558

559
	// invalidate cache and recompute serialized data
560
	_, err = nd.EncodeProtobuf(true)
561 562 563 564
	if err != nil {
		return nil, err
	}

565 566
	return nd, nil
}