dagmodifier.go 10.7 KB
Newer Older
1 2 3 4 5 6 7
package mod

import (
	"bytes"
	"errors"
	"io"
	"os"
Jeromy's avatar
Jeromy committed
8
	"time"
9

10
	proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
11 12 13
	mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

14
	key "github.com/ipfs/go-ipfs/blocks/key"
15
	imp "github.com/ipfs/go-ipfs/importer"
16 17 18 19 20 21 22 23
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	pin "github.com/ipfs/go-ipfs/pin"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
	u "github.com/ipfs/go-ipfs/util"
24 25
)

Jeromy's avatar
Jeromy committed
26 27 28 29
// Sentinel errors returned by DagModifier.
var (
	// ErrSeekFail is returned when the underlying reader ends up at a
	// different position than the one that was requested.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrSeekEndNotImpl is returned by Seek when whence is os.SEEK_END.
	ErrSeekEndNotImpl = errors.New("SEEK_END currently not implemented")

	// ErrUnrecognizedWhence is returned by Seek for an unknown whence value.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
// writebufferSize is the number of buffered bytes (2 MiB) above which Write
// flushes the pending data into the DAG.
var writebufferSize = 2 * 1024 * 1024

// log is the package-level logger, created with the name "dagio".
var log = u.Logger("dagio")

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'.
//
// It keeps a single cursor over the file and buffers writes in memory;
// the buffer is merged into the DAG by Sync.
// TODO: rename this to something more pleasant.
type DagModifier struct {
	// dagserv is used to fetch existing nodes and store rewritten ones.
	dagserv mdag.DAGService
	// curNode is the current root node of the file being modified.
	curNode *mdag.Node
	// mp is the pinner updated as nodes are replaced.
	mp      pin.ManualPinner

	// splitter chunks buffered data that has to be appended past the
	// current end of the DAG.
	splitter   chunk.BlockSplitter
	// ctx is used for DAG lookups and is the parent of reader contexts.
	ctx        context.Context
	// readCancel cancels the context of the active reader, if any.
	readCancel func()

	// writeStart is the file offset at which the buffered data begins.
	writeStart uint64
	// curWrOff is the current cursor position, advanced by reads and writes.
	curWrOff   uint64
	// wrBuf holds written data that has not been synced yet; nil when
	// there is nothing pending.
	wrBuf      *bytes.Buffer

	// read is the active reader over curNode; nil when none is open.
	read *uio.DagReader
}

// NewDagModifier returns a DagModifier that works on a copy of 'from', so the
// node handed in by the caller is not mutated. 'serv' stores and fetches
// nodes, 'mp' tracks pins, and 'spl' chunks data appended past the end of the
// file. The returned error is currently always nil.
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*DagModifier, error) {
	dm := new(DagModifier)
	dm.ctx = ctx
	dm.curNode = from.Copy()
	dm.dagserv = serv
	dm.mp = mp
	dm.splitter = spl
	return dm, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
67 68
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
69 70 71 72 73 74
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
75 76 77 78 79 80 81 82 83 84 85
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

86
		err = dm.Sync()
87 88 89 90 91 92 93 94 95 96 97 98 99
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader that produces an endless stream of zero bytes.
type zeroReader struct{}

// Read overwrites every byte of b with zero and reports len(b) bytes read.
// It never returns an error.
func (zr zeroReader) Read(b []byte) (int, error) {
	for i := 0; i < len(b); i++ {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
106 107
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
func (dm *DagModifier) expandSparse(size int64) error {
	spl := chunk.SizeSplitter{4096}
	r := io.LimitReader(zeroReader{}, size)
	blks := spl.Split(r)
	nnode, err := dm.appendData(dm.curNode, blks)
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
124
// Write continues writing to the dag at the current offset
125 126 127 128 129 130 131
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
132

133 134 135 136 137 138
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
139
		err := dm.Sync()
140 141 142 143 144 145 146 147
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

func (dm *DagModifier) Size() (int64, error) {
148
	pbn, err := ft.FromBytes(dm.curNode.Data)
149 150 151 152
	if err != nil {
		return 0, err
	}

153 154 155 156
	if dm.wrBuf != nil {
		if uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
157 158 159 160 161
	}

	return int64(pbn.GetFilesize()), nil
}

162 163
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
164
	// No buffer? Nothing to do
165 166 167 168 169 170 171 172 173 174
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
175
	// Number of bytes we're going to write
176 177
	buflen := dm.wrBuf.Len()

178 179 180 181 182 183
	// Grab key for unpinning after mod operation
	curk, err := dm.curNode.Key()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
184
	// overwrite existing dag nodes
185
	thisk, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
186 187 188 189
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
190
	nd, err := dm.dagserv.Get(dm.ctx, thisk)
191 192 193 194 195 196
	if err != nil {
		return err
	}

	dm.curNode = nd

Jeromy's avatar
Jeromy committed
197
	// need to write past end of current dag
198 199 200 201 202 203 204
	if !done {
		blks := dm.splitter.Split(dm.wrBuf)
		nd, err = dm.appendData(dm.curNode, blks)
		if err != nil {
			return err
		}

205
		thisk, err = dm.dagserv.Add(nd)
206 207 208 209 210 211 212
		if err != nil {
			return err
		}

		dm.curNode = nd
	}

213 214 215 216 217 218 219 220
	// Finalize correct pinning, and flush pinner
	dm.mp.PinWithMode(thisk, pin.Recursive)
	dm.mp.RemovePinWithMode(curk, pin.Recursive)
	err = dm.mp.Flush()
	if err != nil {
		return err
	}

221 222 223 224 225 226
	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
227
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
228 229
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
230
func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) (key.Key, bool, error) {
231 232
	f, err := ft.FromBytes(node.Data)
	if err != nil {
Jeromy's avatar
Jeromy committed
233
		return "", false, err
234 235
	}

Jeromy's avatar
Jeromy committed
236 237
	// If we've reached a leaf node.
	if len(node.Links) == 0 {
238 239
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
240
			return "", false, err
241 242 243 244 245
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
246
			return "", false, err
247 248 249 250 251
		}

		nd := &mdag.Node{Data: b}
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
252
			return "", false, err
253 254 255 256
		}

		// Hey look! we're done!
		var done bool
257
		if n < len(f.Data[offset:]) {
258 259 260
			done = true
		}

Jeromy's avatar
Jeromy committed
261
		return k, done, nil
262 263 264 265 266
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
267
		// We found the correct child to write into
268
		if cur+bs > offset {
269
			// Unpin block
270
			ckey := key.Key(node.Links[i].Hash)
271 272
			dm.mp.RemovePinWithMode(ckey, pin.Indirect)

Jeromy's avatar
Jeromy committed
273
			child, err := node.Links[i].GetNode(dm.ctx, dm.dagserv)
274
			if err != nil {
Jeromy's avatar
Jeromy committed
275
				return "", false, err
276
			}
Jeromy's avatar
Jeromy committed
277
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
278
			if err != nil {
Jeromy's avatar
Jeromy committed
279
				return "", false, err
280 281
			}

282 283 284
			// pin the new node
			dm.mp.PinWithMode(k, pin.Indirect)

285 286 287
			offset += bs
			node.Links[i].Hash = mh.Multihash(k)

288 289 290 291 292 293
			// Recache serialized node
			_, err = node.Encoded(true)
			if err != nil {
				return "", false, err
			}

294
			if sdone {
295
				// No more bytes to write!
296 297 298
				done = true
				break
			}
299
			offset = cur + bs
300 301 302 303 304
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
305
	return k, done, err
306 307
}

Jeromy's avatar
Jeromy committed
308
// appendData appends the blocks from the given chan to the end of this dag
309 310 311 312
func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.Node, error) {
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
313
		NodeCB:   imp.BasicPinnerCB(dm.mp),
314 315 316 317 318
	}

	return trickle.TrickleAppend(node, dbp.New(blks))
}

Jeromy's avatar
Jeromy committed
319
// Read data from this dag starting at the current offset
320
func (dm *DagModifier) Read(b []byte) (int, error) {
321
	err := dm.readPrep()
322 323 324 325
	if err != nil {
		return 0, err
	}

326 327 328 329 330 331 332 333 334 335 336
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

337
	if dm.read == nil {
338 339
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
340
		if err != nil {
341
			return err
342 343 344 345
		}

		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
		if err != nil {
346
			return err
347 348 349
		}

		if i != int64(dm.curWrOff) {
350
			return ErrSeekFail
351 352
		}

353
		dm.readCancel = cancel
354 355 356
		dm.read = dr
	}

357 358 359 360 361 362 363 364 365 366 367
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
368 369 370 371 372 373
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
374
	err := dm.Sync()
375 376 377 378 379 380
	if err != nil {
		return nil, err
	}
	return dm.curNode.Copy(), nil
}

Jeromy's avatar
Jeromy committed
381
// HasChanges returned whether or not there are unflushed changes to this dag
382 383 384 385 386
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
387
	err := dm.Sync()
388 389 390 391 392 393 394 395 396 397 398 399
	if err != nil {
		return 0, err
	}

	switch whence {
	case os.SEEK_CUR:
		dm.curWrOff += uint64(offset)
		dm.writeStart = dm.curWrOff
	case os.SEEK_SET:
		dm.curWrOff = uint64(offset)
		dm.writeStart = uint64(offset)
	case os.SEEK_END:
Jeromy's avatar
Jeromy committed
400
		return 0, ErrSeekEndNotImpl
401
	default:
Jeromy's avatar
Jeromy committed
402
		return 0, ErrUnrecognizedWhence
403 404 405 406 407 408 409 410 411 412 413 414 415
	}

	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
416
	err := dm.Sync()
417 418 419 420 421 422 423 424 425
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
426
	// Truncate can also be used to expand the file
427
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
428
		return dm.expandSparse(int64(size) - realSize)
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444
	}

	nnode, err := dagTruncate(dm.curNode, uint64(size), dm.dagserv)
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
445
// dagTruncate truncates the given node to 'size' and returns the modified Node
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
func dagTruncate(nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, error) {
	if len(nd.Links) == 0 {
		// TODO: this can likely be done without marshaling and remarshaling
		pbn, err := ft.FromBytes(nd.Data)
		if err != nil {
			return nil, err
		}

		nd.Data = ft.WrapData(pbn.Data[:size])
		return nd, nil
	}

	var cur uint64
	end := 0
	var modified *mdag.Node
	ndata := new(ft.FSNode)
	for i, lnk := range nd.Links {
Jeromy's avatar
Jeromy committed
463 464 465 466
		ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
		defer cancel()

		child, err := lnk.GetNode(ctx, ds)
467 468 469 470 471 472 473 474 475
		if err != nil {
			return nil, err
		}

		childsize, err := ft.DataSize(child.Data)
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
476
		// found the child we want to cut
477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510
		if size < cur+childsize {
			nchild, err := dagTruncate(child, size-cur, ds)
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

	nd.Links = nd.Links[:end]
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

	nd.Data = d

511 512 513 514 515 516
	// invalidate cache and recompute serialized data
	_, err = nd.Encoded(true)
	if err != nil {
		return nil, err
	}

517 518
	return nd, nil
}