dagmodifier.go 10.5 KB
Newer Older
1 2 3 4 5 6 7 8
package mod

import (
	"bytes"
	"errors"
	"io"
	"os"

9
	proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
10 11 12
	mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

13
	key "github.com/ipfs/go-ipfs/blocks/key"
14 15 16 17 18 19 20
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	pin "github.com/ipfs/go-ipfs/pin"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
Jeromy's avatar
Jeromy committed
21
	logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
22 23
)

Jeromy's avatar
Jeromy committed
24 25 26 27
// Errors returned by DagModifier's read and seek paths.
var (
	// ErrSeekFail is returned when a freshly created DAG reader could not be
	// positioned at the current offset.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrSeekEndNotImpl is returned by Seek when whence is os.SEEK_END,
	// which is not supported yet.
	ErrSeekEndNotImpl = errors.New("SEEK_END currently not implemented")

	// ErrUnrecognizedWhence is returned by Seek for any whence value other
	// than os.SEEK_SET, os.SEEK_CUR or os.SEEK_END.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

// writebufferSize is the number of buffered bytes (1<<21, i.e. 2 MiB) above
// which Write flushes the pending data into the DAG via Sync.
var writebufferSize = 1 << 21

Jeromy's avatar
Jeromy committed
31
// log is this package's logger, registered under the name "dagio".
// NOTE(review): it is not referenced by any code shown in this file — confirm
// it is still needed.
var log = logging.Logger("dagio")
32 33 34 35 36 37 38

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
	curNode *mdag.Node
39
	mp      pin.Pinner
40

41
	splitter   chunk.SplitterGen
42 43 44 45 46 47 48 49 50 51
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

	read *uio.DagReader
}

52
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.Pinner, spl chunk.SplitterGen) (*DagModifier, error) {
53 54 55 56 57 58 59 60 61 62 63 64
	return &DagModifier{
		curNode:  from.Copy(),
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
		mp:       mp,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
65 66
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
67 68 69 70 71 72
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
73 74 75 76 77 78 79 80 81 82 83
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

84
		err = dm.Sync()
85 86 87 88 89 90 91 92 93 94 95 96 97
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader that produces an endless stream of zero bytes.
type zeroReader struct{}

// Read zero-fills all of p and reports len(p) bytes read. It never fails.
func (zeroReader) Read(p []byte) (int, error) {
	for i := 0; i < len(p); i++ {
		p[i] = 0
	}
	return len(p), nil
}

Jeromy's avatar
Jeromy committed
104 105
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
106 107
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
108 109 110
	spl := chunk.NewSizeSplitter(r, 4096)
	blks, errs := chunk.Chan(spl)
	nnode, err := dm.appendData(dm.curNode, blks, errs)
111 112 113 114 115 116 117 118 119 120 121
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
122
// Write continues writing to the dag at the current offset
123 124 125 126 127 128 129
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
130

131 132 133 134 135 136
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
137
		err := dm.Sync()
138 139 140 141 142 143 144 145
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

func (dm *DagModifier) Size() (int64, error) {
146
	pbn, err := ft.FromBytes(dm.curNode.Data)
147 148 149 150
	if err != nil {
		return 0, err
	}

151 152 153 154
	if dm.wrBuf != nil {
		if uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
155 156 157 158 159
	}

	return int64(pbn.GetFilesize()), nil
}

160 161
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
162
	// No buffer? Nothing to do
163 164 165 166 167 168 169 170 171 172
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
173
	// Number of bytes we're going to write
174 175
	buflen := dm.wrBuf.Len()

176 177 178 179 180 181
	// Grab key for unpinning after mod operation
	curk, err := dm.curNode.Key()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
182
	// overwrite existing dag nodes
183
	thisk, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
184 185 186 187
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
188
	nd, err := dm.dagserv.Get(dm.ctx, thisk)
189 190 191 192 193 194
	if err != nil {
		return err
	}

	dm.curNode = nd

Jeromy's avatar
Jeromy committed
195
	// need to write past end of current dag
196
	if !done {
197 198
		blks, errs := chunk.Chan(dm.splitter(dm.wrBuf))
		nd, err = dm.appendData(dm.curNode, blks, errs)
199 200 201 202
		if err != nil {
			return err
		}

203
		thisk, err = dm.dagserv.Add(nd)
204 205 206 207 208 209 210
		if err != nil {
			return err
		}

		dm.curNode = nd
	}

211 212
	// Finalize correct pinning, and flush pinner.
	// Be careful about the order, as curk might equal thisk.
213
	dm.mp.RemovePinWithMode(curk, pin.Recursive)
214
	dm.mp.PinWithMode(thisk, pin.Recursive)
215 216 217 218 219
	err = dm.mp.Flush()
	if err != nil {
		return err
	}

220 221 222 223 224 225
	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
226
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
227 228
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
229
func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) (key.Key, bool, error) {
230 231
	f, err := ft.FromBytes(node.Data)
	if err != nil {
Jeromy's avatar
Jeromy committed
232
		return "", false, err
233 234
	}

Jeromy's avatar
Jeromy committed
235 236
	// If we've reached a leaf node.
	if len(node.Links) == 0 {
237 238
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
239
			return "", false, err
240 241 242 243 244
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
245
			return "", false, err
246 247 248 249 250
		}

		nd := &mdag.Node{Data: b}
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
251
			return "", false, err
252 253 254 255
		}

		// Hey look! we're done!
		var done bool
256
		if n < len(f.Data[offset:]) {
257 258 259
			done = true
		}

Jeromy's avatar
Jeromy committed
260
		return k, done, nil
261 262 263 264 265
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
266
		// We found the correct child to write into
267
		if cur+bs > offset {
Jeromy's avatar
Jeromy committed
268
			child, err := node.Links[i].GetNode(dm.ctx, dm.dagserv)
269
			if err != nil {
Jeromy's avatar
Jeromy committed
270
				return "", false, err
271
			}
Jeromy's avatar
Jeromy committed
272
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
273
			if err != nil {
Jeromy's avatar
Jeromy committed
274
				return "", false, err
275 276 277 278 279
			}

			offset += bs
			node.Links[i].Hash = mh.Multihash(k)

280 281 282 283 284 285
			// Recache serialized node
			_, err = node.Encoded(true)
			if err != nil {
				return "", false, err
			}

286
			if sdone {
287
				// No more bytes to write!
288 289 290
				done = true
				break
			}
291
			offset = cur + bs
292 293 294 295 296
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
297
	return k, done, err
298 299
}

Jeromy's avatar
Jeromy committed
300
// appendData appends the blocks from the given chan to the end of this dag
301
func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte, errs <-chan error) (*mdag.Node, error) {
302 303 304 305 306
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
	}

307
	return trickle.TrickleAppend(dm.ctx, node, dbp.New(blks, errs))
308 309
}

Jeromy's avatar
Jeromy committed
310
// Read data from this dag starting at the current offset
311
func (dm *DagModifier) Read(b []byte) (int, error) {
312
	err := dm.readPrep()
313 314 315 316
	if err != nil {
		return 0, err
	}

317 318 319 320 321 322 323 324 325 326 327
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

328
	if dm.read == nil {
329 330
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
331
		if err != nil {
332
			return err
333 334 335 336
		}

		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
		if err != nil {
337
			return err
338 339 340
		}

		if i != int64(dm.curWrOff) {
341
			return ErrSeekFail
342 343
		}

344
		dm.readCancel = cancel
345 346 347
		dm.read = dr
	}

348 349 350 351 352 353 354 355 356 357 358
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
359 360 361 362 363 364
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
365
	err := dm.Sync()
366 367 368 369 370 371
	if err != nil {
		return nil, err
	}
	return dm.curNode.Copy(), nil
}

Jeromy's avatar
Jeromy committed
372
// HasChanges returned whether or not there are unflushed changes to this dag
373 374 375 376 377
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
378
	err := dm.Sync()
379 380 381 382 383 384 385 386 387 388 389 390
	if err != nil {
		return 0, err
	}

	switch whence {
	case os.SEEK_CUR:
		dm.curWrOff += uint64(offset)
		dm.writeStart = dm.curWrOff
	case os.SEEK_SET:
		dm.curWrOff = uint64(offset)
		dm.writeStart = uint64(offset)
	case os.SEEK_END:
Jeromy's avatar
Jeromy committed
391
		return 0, ErrSeekEndNotImpl
392
	default:
Jeromy's avatar
Jeromy committed
393
		return 0, ErrUnrecognizedWhence
394 395 396 397 398 399 400 401 402 403 404 405 406
	}

	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
407
	err := dm.Sync()
408 409 410 411 412 413 414 415 416
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
417
	// Truncate can also be used to expand the file
418
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
419
		return dm.expandSparse(int64(size) - realSize)
420 421
	}

422
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
423 424 425 426 427 428 429 430 431 432 433 434 435
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
436
// dagTruncate truncates the given node to 'size' and returns the modified Node
437
func dagTruncate(ctx context.Context, nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, error) {
438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
	if len(nd.Links) == 0 {
		// TODO: this can likely be done without marshaling and remarshaling
		pbn, err := ft.FromBytes(nd.Data)
		if err != nil {
			return nil, err
		}

		nd.Data = ft.WrapData(pbn.Data[:size])
		return nd, nil
	}

	var cur uint64
	end := 0
	var modified *mdag.Node
	ndata := new(ft.FSNode)
	for i, lnk := range nd.Links {
Jeromy's avatar
Jeromy committed
454
		child, err := lnk.GetNode(ctx, ds)
455 456 457 458 459 460 461 462 463
		if err != nil {
			return nil, err
		}

		childsize, err := ft.DataSize(child.Data)
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
464
		// found the child we want to cut
465
		if size < cur+childsize {
466
			nchild, err := dagTruncate(ctx, child, size-cur, ds)
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

	nd.Links = nd.Links[:end]
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

	nd.Data = d

499 500 501 502 503 504
	// invalidate cache and recompute serialized data
	_, err = nd.Encoded(true)
	if err != nil {
		return nil, err
	}

505 506
	return nd, nil
}