dagmodifier.go 11.4 KB
Newer Older
1 2 3 4
package mod

import (
	"bytes"
5
	"context"
6
	"errors"
7
	"fmt"
8 9
	"io"

10 11 12 13 14 15
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
16

17
	cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
18
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
19
	node "gx/ipfs/Qmb3Hm9QDFmfYuET4pu7Kyg8JV78jFa1nvZx5vnCZsK4ck/go-ipld-format"
20 21
)

Jeromy's avatar
Jeromy committed
22 23 24
// Sentinel errors used when repositioning within the file.
var (
	// ErrSeekFail is returned when the underlying reader ends up at an
	// offset other than the one that was requested.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrUnrecognizedWhence is returned by Seek when 'whence' is not one of
	// io.SeekStart, io.SeekCurrent or io.SeekEnd.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

25 26 27 28 29 30 31 32
// writebufferSize is the number of buffered bytes (2 MiB) above which Write
// flushes the pending data by calling Sync.
var writebufferSize = 2 << 20

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
33
	curNode node.Node
34

35
	splitter   chunk.SplitterGen
36 37 38 39 40 41 42
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

43
	read uio.DagReader
44 45
}

46 47
// ErrNotUnixfs is returned when a node that is neither a protobuf node nor a
// raw node is handed to the DagModifier.
var (
	ErrNotUnixfs = fmt.Errorf("dagmodifier only supports unixfs nodes (proto or raw)")
)

48
func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
49 50 51 52 53
	switch from.(type) {
	case *mdag.ProtoNode, *mdag.RawNode:
		// ok
	default:
		return nil, ErrNotUnixfs
54 55
	}

56
	return &DagModifier{
57
		curNode:  from.Copy(),
58 59 60 61 62 63 64 65 66
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
67 68
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
69 70 71 72 73 74
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
75 76 77 78 79 80 81 82 83 84 85
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

86
		err = dm.Sync()
87 88 89 90 91 92 93 94 95 96 97 98 99
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader that yields an endless stream of zero bytes.
type zeroReader struct{}

// Read fills b entirely with zeros and reports len(b) bytes read; it never
// returns an error.
func (zr zeroReader) Read(b []byte) (int, error) {
	for i := range b {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
106 107
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
108 109
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
110
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
111
	nnode, err := dm.appendData(dm.curNode, spl)
112 113 114 115 116 117 118
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
119 120 121 122 123 124 125

	pbnnode, ok := nnode.(*mdag.ProtoNode)
	if !ok {
		return mdag.ErrNotProtobuf
	}

	dm.curNode = pbnnode
126 127 128
	return nil
}

Jeromy's avatar
Jeromy committed
129
// Write continues writing to the dag at the current offset
130 131 132 133 134 135 136
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
137

138 139 140 141 142 143
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
144
		err := dm.Sync()
145 146 147 148 149 150 151
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

152 153
// ErrNoRawYet is returned by operations that currently require the node to be
// a protobuf node and were given something else.
var (
	ErrNoRawYet = fmt.Errorf("currently only fully support protonodes in the dagmodifier")
)

154
func (dm *DagModifier) Size() (int64, error) {
155 156 157 158 159 160
	pbnd, ok := dm.curNode.(*mdag.ProtoNode)
	if !ok {
		return 0, ErrNoRawYet
	}

	pbn, err := ft.FromBytes(pbnd.Data())
161 162 163 164
	if err != nil {
		return 0, err
	}

165 166 167 168
	if dm.wrBuf != nil {
		if uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
169 170 171 172 173
	}

	return int64(pbn.GetFilesize()), nil
}

174 175
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
176
	// No buffer? Nothing to do
177 178 179 180 181 182 183 184 185 186
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
187
	// Number of bytes we're going to write
188 189
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
190
	// overwrite existing dag nodes
Jeromy's avatar
Jeromy committed
191
	thisc, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
192 193 194 195
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
196
	nd, err := dm.dagserv.Get(dm.ctx, thisc)
197 198 199 200
	if err != nil {
		return err
	}

201 202 203 204 205 206
	pbnd, ok := nd.(*mdag.ProtoNode)
	if !ok {
		return mdag.ErrNotProtobuf
	}

	dm.curNode = pbnd
207

Jeromy's avatar
Jeromy committed
208
	// need to write past end of current dag
209
	if !done {
210
		nd, err := dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
211 212 213 214
		if err != nil {
			return err
		}

Jeromy's avatar
Jeromy committed
215
		_, err = dm.dagserv.Add(nd)
216 217 218 219
		if err != nil {
			return err
		}

220 221 222 223 224 225
		pbnode, ok := nd.(*mdag.ProtoNode)
		if !ok {
			return mdag.ErrNotProtobuf
		}

		dm.curNode = pbnode
226 227 228 229 230 231 232 233
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
234
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
235 236
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
237 238 239 240 241 242
func (dm *DagModifier) modifyDag(n node.Node, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
	node, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, false, ErrNoRawYet
	}

243
	f, err := ft.FromBytes(node.Data())
244
	if err != nil {
Jeromy's avatar
Jeromy committed
245
		return nil, false, err
246 247
	}

Jeromy's avatar
Jeromy committed
248
	// If we've reached a leaf node.
249
	if len(node.Links()) == 0 {
250 251
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
252
			return nil, false, err
253 254 255 256 257
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
258
			return nil, false, err
259 260
		}

261
		nd := new(mdag.ProtoNode)
262
		nd.SetData(b)
263 264
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
265
			return nil, false, err
266 267 268 269
		}

		// Hey look! we're done!
		var done bool
270
		if n < len(f.Data[offset:]) {
271 272 273
			done = true
		}

Jeromy's avatar
Jeromy committed
274
		return k, done, nil
275 276 277 278 279
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
280
		// We found the correct child to write into
281
		if cur+bs > offset {
282
			child, err := node.Links()[i].GetNode(dm.ctx, dm.dagserv)
283
			if err != nil {
Jeromy's avatar
Jeromy committed
284
				return nil, false, err
285
			}
286 287 288 289 290 291 292

			childpb, ok := child.(*mdag.ProtoNode)
			if !ok {
				return nil, false, mdag.ErrNotProtobuf
			}

			k, sdone, err := dm.modifyDag(childpb, offset-cur, data)
293
			if err != nil {
Jeromy's avatar
Jeromy committed
294
				return nil, false, err
295 296 297
			}

			offset += bs
298
			node.Links()[i].Cid = k
299

300
			// Recache serialized node
301
			_, err = node.EncodeProtobuf(true)
302
			if err != nil {
Jeromy's avatar
Jeromy committed
303
				return nil, false, err
304 305
			}

306
			if sdone {
307
				// No more bytes to write!
308 309 310
				done = true
				break
			}
311
			offset = cur + bs
312 313 314 315 316
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
317
	return k, done, err
318 319
}

Jeromy's avatar
Jeromy committed
320
// appendData appends the blocks from the given chan to the end of this dag
321 322 323 324 325 326 327 328 329 330 331 332 333 334
func (dm *DagModifier) appendData(nd node.Node, spl chunk.Splitter) (node.Node, error) {

	var root *mdag.ProtoNode
	switch nd := nd.(type) {
	case *mdag.ProtoNode:
		root = nd
	case *mdag.RawNode:
		// TODO: be able to append to rawnodes. Probably requires making this
		// node a child of a unxifs intermediate node and passing it down
		return nil, fmt.Errorf("appending to raw node types not yet supported")
	default:
		return nil, ErrNotUnixfs
	}

335 336 337 338 339
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
	}

340
	return trickle.TrickleAppend(dm.ctx, root, dbp.New(spl))
341 342
}

Jeromy's avatar
Jeromy committed
343
// Read data from this dag starting at the current offset
344
func (dm *DagModifier) Read(b []byte) (int, error) {
345
	err := dm.readPrep()
346 347 348 349
	if err != nil {
		return 0, err
	}

350 351 352 353 354 355 356 357 358 359 360
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

361
	if dm.read == nil {
362 363
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
364
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
365
			cancel()
366
			return err
367 368
		}

369
		i, err := dr.Seek(int64(dm.curWrOff), io.SeekStart)
370
		if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
371
			cancel()
372
			return err
373 374 375
		}

		if i != int64(dm.curWrOff) {
Jakub Sztandera's avatar
Jakub Sztandera committed
376
			cancel()
377
			return ErrSeekFail
378 379
		}

380
		dm.readCancel = cancel
381 382 383
		dm.read = dr
	}

384 385 386 387 388 389 390 391 392 393 394
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
395 396 397 398 399
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
400
func (dm *DagModifier) GetNode() (*mdag.ProtoNode, error) {
401
	err := dm.Sync()
402 403 404
	if err != nil {
		return nil, err
	}
405
	return dm.curNode.Copy().(*mdag.ProtoNode), nil
406 407
}

Jeromy's avatar
Jeromy committed
408
// HasChanges returned whether or not there are unflushed changes to this dag
409 410 411 412 413
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
414
	err := dm.Sync()
415 416 417 418
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
419 420 421 422 423 424
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
425
	switch whence {
426
	case io.SeekCurrent:
Jeromy's avatar
Jeromy committed
427
		newoffset = dm.curWrOff + uint64(offset)
428
	case io.SeekStart:
Jeromy's avatar
Jeromy committed
429
		newoffset = uint64(offset)
430
	case io.SeekEnd:
431
		newoffset = uint64(fisize) - uint64(offset)
432
	default:
Jeromy's avatar
Jeromy committed
433
		return 0, ErrUnrecognizedWhence
434 435
	}

436 437
	if int64(newoffset) > fisize {
		if err := dm.expandSparse(int64(newoffset) - fisize); err != nil {
Jeromy's avatar
Jeromy committed
438 439 440 441 442 443
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

444 445 446 447 448 449 450 451 452 453 454
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
455
	err := dm.Sync()
456 457 458 459 460 461 462 463 464
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
465
	// Truncate can also be used to expand the file
466
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
467
		return dm.expandSparse(int64(size) - realSize)
468 469
	}

470
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
471 472 473 474 475 476 477 478 479 480 481 482 483
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
484
// dagTruncate truncates the given node to 'size' and returns the modified Node
485 486 487 488 489 490
func dagTruncate(ctx context.Context, n node.Node, size uint64, ds mdag.DAGService) (*mdag.ProtoNode, error) {
	nd, ok := n.(*mdag.ProtoNode)
	if !ok {
		return nil, ErrNoRawYet
	}

491
	if len(nd.Links()) == 0 {
492
		// TODO: this can likely be done without marshaling and remarshaling
493
		pbn, err := ft.FromBytes(nd.Data())
494 495 496 497
		if err != nil {
			return nil, err
		}

498
		nd.SetData(ft.WrapData(pbn.Data[:size]))
499 500 501 502 503
		return nd, nil
	}

	var cur uint64
	end := 0
504
	var modified *mdag.ProtoNode
505
	ndata := new(ft.FSNode)
506
	for i, lnk := range nd.Links() {
Jeromy's avatar
Jeromy committed
507
		child, err := lnk.GetNode(ctx, ds)
508 509 510 511
		if err != nil {
			return nil, err
		}

512 513 514 515 516 517
		childpb, ok := child.(*mdag.ProtoNode)
		if !ok {
			return nil, err
		}

		childsize, err := ft.DataSize(childpb.Data())
518 519 520 521
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
522
		// found the child we want to cut
523
		if size < cur+childsize {
524
			nchild, err := dagTruncate(ctx, childpb, size-cur, ds)
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

544
	nd.SetLinks(nd.Links()[:end])
545 546 547 548 549 550 551 552 553 554
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

555
	nd.SetData(d)
556

557
	// invalidate cache and recompute serialized data
558
	_, err = nd.EncodeProtobuf(true)
559 560 561 562
	if err != nil {
		return nil, err
	}

563 564
	return nd, nil
}