dagmodifier.go 11.3 KB
Newer Older
1 2 3 4
package mod

import (
	"bytes"
5
	"context"
6
	"errors"
7
	"fmt"
8 9
	"io"

10 11 12 13 14 15
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
16

17 18
	cid "gx/ipfs/QmNw61A6sJoXMeP37mJRtQZdNhj5e3FdjoTN3v4FyE96Gk/go-cid"
	node "gx/ipfs/QmUBtPvHKFAX43XMsyxsYpMi3U5VwZ4jYFTo4kFhvAR33G/go-ipld-format"
19
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
20 21
)

Jeromy's avatar
Jeromy committed
22 23 24
// Sentinel errors produced while positioning within the dag file.
var (
	// ErrSeekFail is returned when the underlying reader ends up at a
	// different offset than the one that was requested.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrUnrecognizedWhence is returned by Seek for an unknown whence value.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

25 26 27 28 29 30 31 32
// writebufferSize is the amount of buffered data (2 MiB) above which Write
// flushes the pending bytes into the dag.
var writebufferSize = 2 << 20

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
33
	curNode node.Node
34

35
	splitter   chunk.SplitterGen
36 37 38 39 40 41 42
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

43
	read uio.DagReader
44 45
}

46 47
// ErrNotUnixfs is returned when a node is neither a protobuf node nor a raw
// node and therefore cannot be handled by the DagModifier.
var ErrNotUnixfs = errors.New("dagmodifier only supports unixfs nodes (proto or raw)")

48
func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
49 50 51 52 53
	switch from.(type) {
	case *mdag.ProtoNode, *mdag.RawNode:
		// ok
	default:
		return nil, ErrNotUnixfs
54 55
	}

56
	return &DagModifier{
57
		curNode:  from.Copy(),
58 59 60 61 62 63 64 65 66
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
67 68
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
69 70 71 72 73 74
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
75 76 77 78 79 80 81 82 83 84 85
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

86
		err = dm.Sync()
87 88 89 90 91 92 93 94 95 96 97 98 99
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader that yields an endless stream of zero bytes.
type zeroReader struct{}

// Read fills b entirely with zeros and reports len(b) bytes read. It never
// returns an error.
func (zr zeroReader) Read(b []byte) (int, error) {
	for i := range b {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
106 107
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
108 109
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
110
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
111
	nnode, err := dm.appendData(dm.curNode, spl)
112 113 114 115 116 117 118
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
119 120 121 122 123 124 125

	pbnnode, ok := nnode.(*mdag.ProtoNode)
	if !ok {
		return mdag.ErrNotProtobuf
	}

	dm.curNode = pbnnode
126 127 128
	return nil
}

Jeromy's avatar
Jeromy committed
129
// Write continues writing to the dag at the current offset
130 131 132 133 134 135 136
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
137

138 139 140 141 142 143
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
144
		err := dm.Sync()
145 146 147 148 149 150 151
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

152 153
// ErrNoRawYet is returned by operations that are only implemented for
// protobuf nodes when they encounter another node type.
var ErrNoRawYet = errors.New("currently only fully support protonodes in the dagmodifier")

154
// Size returns the Filesize of the node
155
func (dm *DagModifier) Size() (int64, error) {
156 157 158 159 160 161 162
	switch nd := dm.curNode.(type) {
	case *mdag.ProtoNode:
		pbn, err := ft.FromBytes(nd.Data())
		if err != nil {
			return 0, err
		}
		if dm.wrBuf != nil && uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
163 164
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
165 166 167 168 169 170 171 172 173
		return int64(pbn.GetFilesize()), nil
	case *mdag.RawNode:
		if dm.wrBuf != nil {
			return 0, ErrNoRawYet
		}
		sz, err := nd.Size()
		return int64(sz), err
	default:
		return 0, ErrNotUnixfs
174 175 176
	}
}

177 178
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
179
	// No buffer? Nothing to do
180 181 182 183 184 185 186 187 188 189
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
190
	// Number of bytes we're going to write
191 192
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
193
	// overwrite existing dag nodes
Jeromy's avatar
Jeromy committed
194
	thisc, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
195 196 197 198
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
199
	nd, err := dm.dagserv.Get(dm.ctx, thisc)
200 201 202 203
	if err != nil {
		return err
	}

204 205 206 207 208 209
	pbnd, ok := nd.(*mdag.ProtoNode)
	if !ok {
		return mdag.ErrNotProtobuf
	}

	dm.curNode = pbnd
210

Jeromy's avatar
Jeromy committed
211
	// need to write past end of current dag
212
	if !done {
213
		nd, err := dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
214 215 216 217
		if err != nil {
			return err
		}

Jeromy's avatar
Jeromy committed
218
		_, err = dm.dagserv.Add(nd)
219 220 221 222
		if err != nil {
			return err
		}

223 224 225 226 227 228
		pbnode, ok := nd.(*mdag.ProtoNode)
		if !ok {
			return mdag.ErrNotProtobuf
		}

		dm.curNode = pbnode
229 230 231 232 233 234 235 236
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
237
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
238 239
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
240 241 242 243 244 245
func (dm *DagModifier) modifyDag(n node.Node, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
	node, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, false, ErrNoRawYet
	}

246
	f, err := ft.FromBytes(node.Data())
247
	if err != nil {
Jeromy's avatar
Jeromy committed
248
		return nil, false, err
249 250
	}

Jeromy's avatar
Jeromy committed
251
	// If we've reached a leaf node.
252
	if len(node.Links()) == 0 {
253 254
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
255
			return nil, false, err
256 257 258 259 260
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
261
			return nil, false, err
262 263
		}

264
		nd := new(mdag.ProtoNode)
265
		nd.SetData(b)
266 267
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
268
			return nil, false, err
269 270 271 272
		}

		// Hey look! we're done!
		var done bool
273
		if n < len(f.Data[offset:]) {
274 275 276
			done = true
		}

Jeromy's avatar
Jeromy committed
277
		return k, done, nil
278 279 280 281 282
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
283
		// We found the correct child to write into
284
		if cur+bs > offset {
285
			child, err := node.Links()[i].GetNode(dm.ctx, dm.dagserv)
286
			if err != nil {
Jeromy's avatar
Jeromy committed
287
				return nil, false, err
288
			}
289 290 291 292 293 294 295

			childpb, ok := child.(*mdag.ProtoNode)
			if !ok {
				return nil, false, mdag.ErrNotProtobuf
			}

			k, sdone, err := dm.modifyDag(childpb, offset-cur, data)
296
			if err != nil {
Jeromy's avatar
Jeromy committed
297
				return nil, false, err
298 299 300
			}

			offset += bs
301
			node.Links()[i].Cid = k
302

303
			// Recache serialized node
304
			_, err = node.EncodeProtobuf(true)
305
			if err != nil {
Jeromy's avatar
Jeromy committed
306
				return nil, false, err
307 308
			}

309
			if sdone {
310
				// No more bytes to write!
311 312 313
				done = true
				break
			}
314
			offset = cur + bs
315 316 317 318 319
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
320
	return k, done, err
321 322
}

Jeromy's avatar
Jeromy committed
323
// appendData appends the blocks from the given chan to the end of this dag
324 325 326
func (dm *DagModifier) appendData(nd node.Node, spl chunk.Splitter) (node.Node, error) {
	switch nd := nd.(type) {
	case *mdag.ProtoNode:
327 328 329 330 331
		dbp := &help.DagBuilderParams{
			Dagserv:  dm.dagserv,
			Maxlinks: help.DefaultLinksPerBlock,
		}
		return trickle.TrickleAppend(dm.ctx, nd, dbp.New(spl))
332 333 334 335 336
	case *mdag.RawNode:
		return nil, fmt.Errorf("appending to raw node types not yet supported")
	default:
		return nil, ErrNotUnixfs
	}
337 338
}

Jeromy's avatar
Jeromy committed
339
// Read data from this dag starting at the current offset
340
func (dm *DagModifier) Read(b []byte) (int, error) {
341
	err := dm.readPrep()
342 343 344 345
	if err != nil {
		return 0, err
	}

346 347 348 349 350 351 352 353 354 355 356
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

357
	if dm.read == nil {
358 359
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
360
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
361
			cancel()
362
			return err
363 364
		}

365
		i, err := dr.Seek(int64(dm.curWrOff), io.SeekStart)
366
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
367
			cancel()
368
			return err
369 370 371
		}

		if i != int64(dm.curWrOff) {
Jakub Sztandera's avatar
Jakub Sztandera committed
372
			cancel()
373
			return ErrSeekFail
374 375
		}

376
		dm.readCancel = cancel
377 378 379
		dm.read = dr
	}

380 381 382 383 384 385 386 387 388 389 390
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
391 392 393 394 395
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
396
func (dm *DagModifier) GetNode() (node.Node, error) {
397
	err := dm.Sync()
398 399 400
	if err != nil {
		return nil, err
	}
401
	return dm.curNode.Copy(), nil
402 403
}

Jeromy's avatar
Jeromy committed
404
// HasChanges returned whether or not there are unflushed changes to this dag
405 406 407 408 409
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
410
	err := dm.Sync()
411 412 413 414
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
415 416 417 418 419 420
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
421
	switch whence {
422
	case io.SeekCurrent:
Jeromy's avatar
Jeromy committed
423
		newoffset = dm.curWrOff + uint64(offset)
424
	case io.SeekStart:
Jeromy's avatar
Jeromy committed
425
		newoffset = uint64(offset)
426
	case io.SeekEnd:
427
		newoffset = uint64(fisize) - uint64(offset)
428
	default:
Jeromy's avatar
Jeromy committed
429
		return 0, ErrUnrecognizedWhence
430 431
	}

432 433
	if int64(newoffset) > fisize {
		if err := dm.expandSparse(int64(newoffset) - fisize); err != nil {
Jeromy's avatar
Jeromy committed
434 435 436 437 438 439
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

440 441 442 443 444 445 446 447 448 449 450
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
451
	err := dm.Sync()
452 453 454 455 456 457 458 459 460
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
461
	// Truncate can also be used to expand the file
462
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
463
		return dm.expandSparse(int64(size) - realSize)
464 465
	}

466
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
467 468 469 470 471 472 473 474 475 476 477 478 479
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
480
// dagTruncate truncates the given node to 'size' and returns the modified Node
481 482 483 484 485 486
func dagTruncate(ctx context.Context, n node.Node, size uint64, ds mdag.DAGService) (*mdag.ProtoNode, error) {
	nd, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, ErrNoRawYet
	}

487
	if len(nd.Links()) == 0 {
488
		// TODO: this can likely be done without marshaling and remarshaling
489
		pbn, err := ft.FromBytes(nd.Data())
490 491 492 493
		if err != nil {
			return nil, err
		}

494
		nd.SetData(ft.WrapData(pbn.Data[:size]))
495 496 497 498 499
		return nd, nil
	}

	var cur uint64
	end := 0
500
	var modified *mdag.ProtoNode
501
	ndata := new(ft.FSNode)
502
	for i, lnk := range nd.Links() {
Jeromy's avatar
Jeromy committed
503
		child, err := lnk.GetNode(ctx, ds)
504 505 506 507
		if err != nil {
			return nil, err
		}

508 509 510 511 512 513
		childpb, ok := child.(*mdag.ProtoNode)
		if !ok {
			return nil, err
		}

		childsize, err := ft.DataSize(childpb.Data())
514 515 516 517
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
518
		// found the child we want to cut
519
		if size < cur+childsize {
520
			nchild, err := dagTruncate(ctx, childpb, size-cur, ds)
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

540
	nd.SetLinks(nd.Links()[:end])
541 542 543 544 545 546 547 548 549 550
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

551
	nd.SetData(d)
552

553
	// invalidate cache and recompute serialized data
554
	_, err = nd.EncodeProtobuf(true)
555 556 557 558
	if err != nil {
		return nil, err
	}

559 560
	return nd, nil
}