dagmodifier.go 10.4 KB
Newer Older
1 2 3 4 5 6 7 8
package mod

import (
	"bytes"
	"errors"
	"io"
	"os"

9
	proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
10 11 12
	mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

13
	key "github.com/ipfs/go-ipfs/blocks/key"
14 15 16 17 18 19
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
Jeromy's avatar
Jeromy committed
20
	logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
21 22
)

Jeromy's avatar
Jeromy committed
23 24 25 26
// Sentinel errors reported by DagModifier's seeking logic.
var (
	// ErrSeekFail is returned when the underlying reader ends up at a
	// position other than the one requested.
	ErrSeekFail = errors.New("failed to seek properly")
	// ErrSeekEndNotImpl is returned by Seek for os.SEEK_END.
	ErrSeekEndNotImpl = errors.New("SEEK_END currently not implemented")
	// ErrUnrecognizedWhence is returned by Seek for an unknown whence value.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

27 28 29
// writebufferSize is the write-buffer threshold in bytes (1 << 21 = 2MB).
// Write calls Sync once the buffered data grows beyond this size.
var writebufferSize = 1 << 21

Jeromy's avatar
Jeromy committed
30
// log is the package-level logger, created with the name "dagio".
var log = logging.Logger("dagio")
31 32 33 34 35 36 37 38

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
	curNode *mdag.Node

39
	splitter   chunk.SplitterGen
40 41 42 43 44 45 46 47 48 49
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

	read *uio.DagReader
}

50
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
51 52 53 54 55 56 57 58 59 60 61
	return &DagModifier{
		curNode:  from.Copy(),
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
62 63
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
64 65 66 67 68 69
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
70 71 72 73 74 75 76 77 78 79 80
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

81
		err = dm.Sync()
82 83 84 85 86 87 88 89 90 91 92 93 94
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is a reader that produces an endless stream of zero bytes.
type zeroReader struct{}

// Read overwrites every byte of b with zero and reports len(b) bytes read.
// It never returns an error.
func (zr zeroReader) Read(b []byte) (int, error) {
	for i := 0; i < len(b); i++ {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
101 102
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
103 104
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
105 106 107
	spl := chunk.NewSizeSplitter(r, 4096)
	blks, errs := chunk.Chan(spl)
	nnode, err := dm.appendData(dm.curNode, blks, errs)
108 109 110 111 112 113 114 115 116 117 118
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
119
// Write continues writing to the dag at the current offset
120 121 122 123 124 125 126
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
127

128 129 130 131 132 133
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
134
		err := dm.Sync()
135 136 137 138 139 140 141 142
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

func (dm *DagModifier) Size() (int64, error) {
143
	pbn, err := ft.FromBytes(dm.curNode.Data)
144 145 146 147
	if err != nil {
		return 0, err
	}

148 149 150 151
	if dm.wrBuf != nil {
		if uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
152 153 154 155 156
	}

	return int64(pbn.GetFilesize()), nil
}

157 158
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
159
	// No buffer? Nothing to do
160 161 162 163 164 165 166 167 168 169
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
170
	// Number of bytes we're going to write
171 172
	buflen := dm.wrBuf.Len()

173
	// Grab key for unpinning after mod operation
174
	_, err := dm.curNode.Key()
175 176 177 178
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
179
	// overwrite existing dag nodes
180
	thisk, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
181 182 183 184
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
185
	nd, err := dm.dagserv.Get(dm.ctx, thisk)
186 187 188 189 190 191
	if err != nil {
		return err
	}

	dm.curNode = nd

Jeromy's avatar
Jeromy committed
192
	// need to write past end of current dag
193
	if !done {
194 195
		blks, errs := chunk.Chan(dm.splitter(dm.wrBuf))
		nd, err = dm.appendData(dm.curNode, blks, errs)
196 197 198 199
		if err != nil {
			return err
		}

200
		thisk, err = dm.dagserv.Add(nd)
201 202 203 204 205 206 207 208 209 210 211 212 213
		if err != nil {
			return err
		}

		dm.curNode = nd
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
214
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
215 216
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
217
func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) (key.Key, bool, error) {
218 219
	f, err := ft.FromBytes(node.Data)
	if err != nil {
Jeromy's avatar
Jeromy committed
220
		return "", false, err
221 222
	}

Jeromy's avatar
Jeromy committed
223 224
	// If we've reached a leaf node.
	if len(node.Links) == 0 {
225 226
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
227
			return "", false, err
228 229 230 231 232
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
233
			return "", false, err
234 235 236 237 238
		}

		nd := &mdag.Node{Data: b}
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
239
			return "", false, err
240 241 242 243
		}

		// Hey look! we're done!
		var done bool
244
		if n < len(f.Data[offset:]) {
245 246 247
			done = true
		}

Jeromy's avatar
Jeromy committed
248
		return k, done, nil
249 250 251 252 253
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
254
		// We found the correct child to write into
255
		if cur+bs > offset {
Jeromy's avatar
Jeromy committed
256
			child, err := node.Links[i].GetNode(dm.ctx, dm.dagserv)
257
			if err != nil {
Jeromy's avatar
Jeromy committed
258
				return "", false, err
259
			}
Jeromy's avatar
Jeromy committed
260
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
261
			if err != nil {
Jeromy's avatar
Jeromy committed
262
				return "", false, err
263 264 265 266 267
			}

			offset += bs
			node.Links[i].Hash = mh.Multihash(k)

268 269 270 271 272 273
			// Recache serialized node
			_, err = node.Encoded(true)
			if err != nil {
				return "", false, err
			}

274
			if sdone {
275
				// No more bytes to write!
276 277 278
				done = true
				break
			}
279
			offset = cur + bs
280 281 282 283 284
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
285
	return k, done, err
286 287
}

Jeromy's avatar
Jeromy committed
288
// appendData appends the blocks from the given chan to the end of this dag
289
func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte, errs <-chan error) (*mdag.Node, error) {
290 291 292 293 294
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
	}

295
	return trickle.TrickleAppend(dm.ctx, node, dbp.New(blks, errs))
296 297
}

Jeromy's avatar
Jeromy committed
298
// Read data from this dag starting at the current offset
299
func (dm *DagModifier) Read(b []byte) (int, error) {
300
	err := dm.readPrep()
301 302 303 304
	if err != nil {
		return 0, err
	}

305 306 307 308 309 310 311 312 313 314 315
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

316
	if dm.read == nil {
317 318
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
319
		if err != nil {
320
			return err
321 322 323 324
		}

		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
		if err != nil {
325
			return err
326 327 328
		}

		if i != int64(dm.curWrOff) {
329
			return ErrSeekFail
330 331
		}

332
		dm.readCancel = cancel
333 334 335
		dm.read = dr
	}

336 337 338 339 340 341 342 343 344 345 346
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
347 348 349 350 351 352
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
353
	err := dm.Sync()
354 355 356 357 358 359
	if err != nil {
		return nil, err
	}
	return dm.curNode.Copy(), nil
}

Jeromy's avatar
Jeromy committed
360
// HasChanges returned whether or not there are unflushed changes to this dag
361 362 363 364 365
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
366
	err := dm.Sync()
367 368 369 370
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
371 372 373 374 375 376
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
377 378
	switch whence {
	case os.SEEK_CUR:
Jeromy's avatar
Jeromy committed
379
		newoffset = dm.curWrOff + uint64(offset)
380
	case os.SEEK_SET:
Jeromy's avatar
Jeromy committed
381
		newoffset = uint64(offset)
382
	case os.SEEK_END:
Jeromy's avatar
Jeromy committed
383
		return 0, ErrSeekEndNotImpl
384
	default:
Jeromy's avatar
Jeromy committed
385
		return 0, ErrUnrecognizedWhence
386 387
	}

Jeromy's avatar
Jeromy committed
388 389 390 391 392 393 394 395
	if offset > fisize {
		if err := dm.expandSparse(offset - fisize); err != nil {
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

396 397 398 399 400 401 402 403 404 405 406
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
407
	err := dm.Sync()
408 409 410 411 412 413 414 415 416
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
417
	// Truncate can also be used to expand the file
418
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
419
		return dm.expandSparse(int64(size) - realSize)
420 421
	}

422
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
423 424 425 426 427 428 429 430 431 432 433 434 435
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
436
// dagTruncate truncates the given node to 'size' and returns the modified Node
437
func dagTruncate(ctx context.Context, nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, error) {
438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
	if len(nd.Links) == 0 {
		// TODO: this can likely be done without marshaling and remarshaling
		pbn, err := ft.FromBytes(nd.Data)
		if err != nil {
			return nil, err
		}

		nd.Data = ft.WrapData(pbn.Data[:size])
		return nd, nil
	}

	var cur uint64
	end := 0
	var modified *mdag.Node
	ndata := new(ft.FSNode)
	for i, lnk := range nd.Links {
Jeromy's avatar
Jeromy committed
454
		child, err := lnk.GetNode(ctx, ds)
455 456 457 458 459 460 461 462 463
		if err != nil {
			return nil, err
		}

		childsize, err := ft.DataSize(child.Data)
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
464
		// found the child we want to cut
465
		if size < cur+childsize {
466
			nchild, err := dagTruncate(ctx, child, size-cur, ds)
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

	nd.Links = nd.Links[:end]
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

	nd.Data = d

499 500 501 502 503 504
	// invalidate cache and recompute serialized data
	_, err = nd.Encoded(true)
	if err != nil {
		return nil, err
	}

505 506
	return nd, nil
}