// Package mod provides DagModifier, which edits the contents of a unixfs file DAG in place.
package mod

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"

	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"

	cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
	node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
)

// Sentinel errors reported by the positioning logic of DagModifier.
var (
	// ErrSeekFail is returned when a seek does not end up at the requested
	// position.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrUnrecognizedWhence is returned by Seek for a whence value other than
	// io.SeekStart, io.SeekCurrent or io.SeekEnd.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

// writebufferSize is the number of buffered bytes (2 MiB) beyond which Write
// flushes the pending data into the DAG with Sync.
var writebufferSize = 2 << 20

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
33
	curNode node.Node
34

35
	splitter   chunk.SplitterGen
36 37 38 39 40 41 42
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

43 44
	RawLeaves bool

45
	read uio.DagReader
46 47
}

// ErrNotUnixfs is returned when a node is neither a *mdag.ProtoNode nor a
// *mdag.RawNode, the only node kinds DagModifier knows how to edit.
var ErrNotUnixfs = fmt.Errorf("dagmodifier only supports unixfs nodes (proto or raw)")

50
func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
51 52 53 54 55
	switch from.(type) {
	case *mdag.ProtoNode, *mdag.RawNode:
		// ok
	default:
		return nil, ErrNotUnixfs
56 57
	}

58
	return &DagModifier{
59
		curNode:  from.Copy(),
60 61 62 63 64 65 66 67 68
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
69 70
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
71 72 73 74 75 76
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
77 78 79 80 81 82 83 84 85 86 87
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

88
		err = dm.Sync()
89 90 91 92 93 94 95 96 97 98 99 100 101
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader that yields an endless stream of zero bytes.
type zeroReader struct{}

// Read zero-fills all of b and never reports an error.
func (zeroReader) Read(b []byte) (int, error) {
	for i := 0; i < len(b); i++ {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
108 109
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
110 111
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
112
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
113
	nnode, err := dm.appendData(dm.curNode, spl)
114 115 116 117
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
118
	return err
119 120
}

Jeromy's avatar
Jeromy committed
121
// Write continues writing to the dag at the current offset
122 123 124 125 126 127 128
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
129

130 131 132 133 134 135
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
136
		err := dm.Sync()
137 138 139 140 141 142 143
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

144
// Size returns the Filesize of the node
145
func (dm *DagModifier) Size() (int64, error) {
146 147 148 149 150 151 152 153 154 155 156 157
	fileSize, err := fileSize(dm.curNode)
	if err != nil {
		return 0, err
	}
	if dm.wrBuf != nil && int64(dm.wrBuf.Len())+int64(dm.writeStart) > int64(fileSize) {
		return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
	}
	return int64(fileSize), nil
}

func fileSize(n node.Node) (uint64, error) {
	switch nd := n.(type) {
158
	case *mdag.ProtoNode:
159
		f, err := ft.FromBytes(nd.Data())
160 161 162
		if err != nil {
			return 0, err
		}
163
		return f.GetFilesize(), nil
164
	case *mdag.RawNode:
165
		return uint64(len(nd.RawData())), nil
166 167
	default:
		return 0, ErrNotUnixfs
168 169 170
	}
}

171 172
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
173
	// No buffer? Nothing to do
174 175 176 177 178 179 180 181 182 183
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
184
	// Number of bytes we're going to write
185 186
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
187
	// overwrite existing dag nodes
Jeromy's avatar
Jeromy committed
188
	thisc, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
189 190 191 192
	if err != nil {
		return err
	}

193
	dm.curNode, err = dm.dagserv.Get(dm.ctx, thisc)
194 195 196 197
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
198
	// need to write past end of current dag
199
	if !done {
200
		dm.curNode, err = dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
201 202 203 204
		if err != nil {
			return err
		}

205
		_, err = dm.dagserv.Add(dm.curNode)
206 207 208 209 210 211 212 213 214 215 216
		if err != nil {
			return err
		}
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
217
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
218 219
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
220
func (dm *DagModifier) modifyDag(n node.Node, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
221 222 223 224 225 226 227 228
	// If we've reached a leaf node.
	if len(n.Links()) == 0 {
		switch nd0 := n.(type) {
		case *mdag.ProtoNode:
			f, err := ft.FromBytes(nd0.Data())
			if err != nil {
				return nil, false, err
			}
229

230 231 232 233
			n, err := data.Read(f.Data[offset:])
			if err != nil && err != io.EOF {
				return nil, false, err
			}
234

235 236 237 238 239
			// Update newly written node..
			b, err := proto.Marshal(f)
			if err != nil {
				return nil, false, err
			}
240

241 242 243 244 245 246
			nd := new(mdag.ProtoNode)
			nd.SetData(b)
			k, err := dm.dagserv.Add(nd)
			if err != nil {
				return nil, false, err
			}
247

248 249 250 251 252 253 254 255 256 257
			// Hey look! we're done!
			var done bool
			if n < len(f.Data[offset:]) {
				done = true
			}

			return k, done, nil
		case *mdag.RawNode:
			origData := nd0.RawData()
			bytes := make([]byte, len(origData))
258

259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
			// copy orig data up to offset
			copy(bytes, origData[:offset])

			// copy in new data
			n, err := data.Read(bytes[offset:])
			if err != nil && err != io.EOF {
				return nil, false, err
			}

			// copy remaining data
			offsetPlusN := int(offset) + n
			if offsetPlusN < len(origData) {
				copy(bytes[offsetPlusN:], origData[offsetPlusN:])
			}

			nd, err := mdag.NewRawNodeWPrefix(bytes, nd0.Cid().Prefix())
			if err != nil {
				return nil, false, err
			}
			k, err := dm.dagserv.Add(nd)
			if err != nil {
				return nil, false, err
			}

			// Hey look! we're done!
			var done bool
			if n < len(bytes[offset:]) {
				done = true
			}

			return k, done, nil
290
		}
291
	}
292

293 294 295 296 297 298 299 300
	node, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, false, ErrNotUnixfs
	}

	f, err := ft.FromBytes(node.Data())
	if err != nil {
		return nil, false, err
301 302 303 304 305
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
306
		// We found the correct child to write into
307
		if cur+bs > offset {
308
			child, err := node.Links()[i].GetNode(dm.ctx, dm.dagserv)
309
			if err != nil {
Jeromy's avatar
Jeromy committed
310
				return nil, false, err
311
			}
312

313
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
314
			if err != nil {
Jeromy's avatar
Jeromy committed
315
				return nil, false, err
316 317 318
			}

			offset += bs
319
			node.Links()[i].Cid = k
320

321
			// Recache serialized node
322
			_, err = node.EncodeProtobuf(true)
323
			if err != nil {
Jeromy's avatar
Jeromy committed
324
				return nil, false, err
325 326
			}

327
			if sdone {
328
				// No more bytes to write!
329 330 331
				done = true
				break
			}
332
			offset = cur + bs
333 334 335 336 337
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
338
	return k, done, err
339 340
}

Jeromy's avatar
Jeromy committed
341
// appendData appends the blocks from the given chan to the end of this dag
342 343
func (dm *DagModifier) appendData(nd node.Node, spl chunk.Splitter) (node.Node, error) {
	switch nd := nd.(type) {
344
	case *mdag.ProtoNode, *mdag.RawNode:
345
		dbp := &help.DagBuilderParams{
346 347 348
			Dagserv:   dm.dagserv,
			Maxlinks:  help.DefaultLinksPerBlock,
			RawLeaves: dm.RawLeaves,
349 350
		}
		return trickle.TrickleAppend(dm.ctx, nd, dbp.New(spl))
351 352 353
	default:
		return nil, ErrNotUnixfs
	}
354 355
}

Jeromy's avatar
Jeromy committed
356
// Read data from this dag starting at the current offset
357
func (dm *DagModifier) Read(b []byte) (int, error) {
358
	err := dm.readPrep()
359 360 361 362
	if err != nil {
		return 0, err
	}

363 364 365 366 367 368 369 370 371 372 373
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

374
	if dm.read == nil {
375 376
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
377
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
378
			cancel()
379
			return err
380 381
		}

382
		i, err := dr.Seek(int64(dm.curWrOff), io.SeekStart)
383
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
384
			cancel()
385
			return err
386 387 388
		}

		if i != int64(dm.curWrOff) {
Jakub Sztandera's avatar
Jakub Sztandera committed
389
			cancel()
390
			return ErrSeekFail
391 392
		}

393
		dm.readCancel = cancel
394 395 396
		dm.read = dr
	}

397 398 399 400 401 402 403 404 405 406 407
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
408 409 410 411 412
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
413
func (dm *DagModifier) GetNode() (node.Node, error) {
414
	err := dm.Sync()
415 416 417
	if err != nil {
		return nil, err
	}
418
	return dm.curNode.Copy(), nil
419 420
}

Jeromy's avatar
Jeromy committed
421
// HasChanges returned whether or not there are unflushed changes to this dag
422 423 424 425 426
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
427
	err := dm.Sync()
428 429 430 431
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
432 433 434 435 436 437
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
438
	switch whence {
439
	case io.SeekCurrent:
Jeromy's avatar
Jeromy committed
440
		newoffset = dm.curWrOff + uint64(offset)
441
	case io.SeekStart:
Jeromy's avatar
Jeromy committed
442
		newoffset = uint64(offset)
443
	case io.SeekEnd:
444
		newoffset = uint64(fisize) - uint64(offset)
445
	default:
Jeromy's avatar
Jeromy committed
446
		return 0, ErrUnrecognizedWhence
447 448
	}

449 450
	if int64(newoffset) > fisize {
		if err := dm.expandSparse(int64(newoffset) - fisize); err != nil {
Jeromy's avatar
Jeromy committed
451 452 453 454 455 456
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

457 458 459 460 461 462 463 464 465 466 467
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
468
	err := dm.Sync()
469 470 471 472 473 474 475 476 477
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
478
	// Truncate can also be used to expand the file
479
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
480
		return dm.expandSparse(int64(size) - realSize)
481 482
	}

483
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
484 485 486 487 488 489 490 491 492 493 494 495 496
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
497
// dagTruncate truncates the given node to 'size' and returns the modified Node
498 499 500 501 502 503 504 505 506 507 508 509 510
func dagTruncate(ctx context.Context, n node.Node, size uint64, ds mdag.DAGService) (node.Node, error) {
	if len(n.Links()) == 0 {
		switch nd := n.(type) {
		case *mdag.ProtoNode:
			// TODO: this can likely be done without marshaling and remarshaling
			pbn, err := ft.FromBytes(nd.Data())
			if err != nil {
				return nil, err
			}
			nd.SetData(ft.WrapData(pbn.Data[:size]))
			return nd, nil
		case *mdag.RawNode:
			return mdag.NewRawNodeWPrefix(nd.RawData()[:size], nd.Cid().Prefix())
511
		}
512
	}
513

514 515 516
	nd, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, ErrNotUnixfs
517 518 519 520
	}

	var cur uint64
	end := 0
521
	var modified node.Node
522
	ndata := new(ft.FSNode)
523
	for i, lnk := range nd.Links() {
Jeromy's avatar
Jeromy committed
524
		child, err := lnk.GetNode(ctx, ds)
525 526 527 528
		if err != nil {
			return nil, err
		}

529
		childsize, err := fileSize(child)
530 531 532 533
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
534
		// found the child we want to cut
535
		if size < cur+childsize {
536
			nchild, err := dagTruncate(ctx, child, size-cur, ds)
537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

556
	nd.SetLinks(nd.Links()[:end])
557 558 559 560 561 562 563 564 565 566
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

567
	nd.SetData(d)
568

569
	// invalidate cache and recompute serialized data
570
	_, err = nd.EncodeProtobuf(true)
571 572 573 574
	if err != nil {
		return nil, err
	}

575 576
	return nd, nil
}