dagmodifier.go 10.5 KB
Newer Older
1 2 3 4 5 6 7
package mod

import (
	"bytes"
	"errors"
	"io"
	"os"
Jeromy's avatar
Jeromy committed
8
	"time"
9

10
	proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
11 12 13 14 15 16 17 18 19 20 21
	mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	pin "github.com/ipfs/go-ipfs/pin"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
	u "github.com/ipfs/go-ipfs/util"
22 23
)

Jeromy's avatar
Jeromy committed
24 25 26 27
// Sentinel errors returned by DagModifier.
var (
	// ErrSeekFail is returned when the reader ends up at a position other
	// than the one that was requested.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrSeekEndNotImpl is returned by Seek when called with os.SEEK_END.
	ErrSeekEndNotImpl = errors.New("SEEK_END currently not implemented")

	// ErrUnrecognizedWhence is returned by Seek for an unknown whence value.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
// writebufferSize is the number of buffered bytes (2 MiB) above which
// Write flushes the pending data by calling Sync.
var writebufferSize = 2 * 1024 * 1024

// log is the package-level logger, obtained from u.Logger under the name "dagio".
var log = u.Logger("dagio")

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
//
// Writes are collected in wrBuf and only applied to the DAG when Sync runs;
// reads go through a lazily created DagReader positioned at curWrOff.
type DagModifier struct {
	// dagserv is the DAG service that nodes are added to and fetched from.
	dagserv mdag.DAGService
	// curNode is the current root node of the file being modified.
	curNode *mdag.Node
	// mp is the pinner whose pins are updated as nodes are rewritten.
	mp      pin.ManualPinner

	// splitter chunks the data that Sync appends past the end of the DAG.
	splitter   chunk.BlockSplitter
	// ctx is used for DAG fetches and as the parent of each reader's context.
	ctx        context.Context
	// readCancel cancels the context of the reader stored in read.
	readCancel func()

	// writeStart is the file offset at which the contents of wrBuf begin.
	writeStart uint64
	// curWrOff is the current offset; it is advanced by both writes and reads.
	curWrOff   uint64
	// wrBuf holds writes that have not been flushed yet; nil means none pending.
	wrBuf      *bytes.Buffer

	// read is the active reader, or nil when none has been created.
	read *uio.DagReader
}

// NewDagModifier builds a DagModifier that operates on a copy of 'from'.
// The supplied context, DAG service, pinner and splitter are stored as-is.
// The returned error is always nil.
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*DagModifier, error) {
	dm := new(DagModifier)
	dm.ctx = ctx
	dm.curNode = from.Copy()
	dm.dagserv = serv
	dm.mp = mp
	dm.splitter = spl
	return dm, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
65 66
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
67 68 69 70 71 72
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
73 74 75 76 77 78 79 80 81 82 83
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

84
		err = dm.Sync()
85 86 87 88 89 90 91 92 93 94 95 96 97
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is a reader that produces an endless stream of zero bytes.
type zeroReader struct{}

// Read overwrites every byte of b with zero and reports len(b) bytes read.
// It never returns an error.
func (zr zeroReader) Read(b []byte) (int, error) {
	n := len(b)
	for i := 0; i < n; i++ {
		b[i] = 0
	}
	return n, nil
}

Jeromy's avatar
Jeromy committed
104 105
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
func (dm *DagModifier) expandSparse(size int64) error {
	spl := chunk.SizeSplitter{4096}
	r := io.LimitReader(zeroReader{}, size)
	blks := spl.Split(r)
	nnode, err := dm.appendData(dm.curNode, blks)
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
122
// Write continues writing to the dag at the current offset
123 124 125 126 127 128 129
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
130

131 132 133 134 135 136
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
137
		err := dm.Sync()
138 139 140 141 142 143 144 145
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

func (dm *DagModifier) Size() (int64, error) {
146
	pbn, err := ft.FromBytes(dm.curNode.Data)
147 148 149 150
	if err != nil {
		return 0, err
	}

151 152 153 154
	if dm.wrBuf != nil {
		if uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
155 156 157 158 159
	}

	return int64(pbn.GetFilesize()), nil
}

160 161
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
162
	// No buffer? Nothing to do
163 164 165 166 167 168 169 170 171 172
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
173
	// Number of bytes we're going to write
174 175
	buflen := dm.wrBuf.Len()

176 177 178 179 180 181
	// Grab key for unpinning after mod operation
	curk, err := dm.curNode.Key()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
182
	// overwrite existing dag nodes
183
	thisk, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
184 185 186 187
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
188
	nd, err := dm.dagserv.Get(dm.ctx, thisk)
189 190 191 192 193 194
	if err != nil {
		return err
	}

	dm.curNode = nd

Jeromy's avatar
Jeromy committed
195
	// need to write past end of current dag
196 197 198 199 200 201 202
	if !done {
		blks := dm.splitter.Split(dm.wrBuf)
		nd, err = dm.appendData(dm.curNode, blks)
		if err != nil {
			return err
		}

203
		thisk, err = dm.dagserv.Add(nd)
204 205 206 207 208 209 210
		if err != nil {
			return err
		}

		dm.curNode = nd
	}

211 212 213 214 215 216 217 218
	// Finalize correct pinning, and flush pinner
	dm.mp.PinWithMode(thisk, pin.Recursive)
	dm.mp.RemovePinWithMode(curk, pin.Recursive)
	err = dm.mp.Flush()
	if err != nil {
		return err
	}

219 220 221 222 223 224
	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
225
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
226 227
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
Jeromy's avatar
Jeromy committed
228
func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) (u.Key, bool, error) {
229 230
	f, err := ft.FromBytes(node.Data)
	if err != nil {
Jeromy's avatar
Jeromy committed
231
		return "", false, err
232 233
	}

Jeromy's avatar
Jeromy committed
234 235
	// If we've reached a leaf node.
	if len(node.Links) == 0 {
236 237
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
238
			return "", false, err
239 240 241 242 243
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
244
			return "", false, err
245 246 247 248 249
		}

		nd := &mdag.Node{Data: b}
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
250
			return "", false, err
251 252 253 254
		}

		// Hey look! we're done!
		var done bool
255
		if n < len(f.Data[offset:]) {
256 257 258
			done = true
		}

Jeromy's avatar
Jeromy committed
259
		return k, done, nil
260 261 262 263 264
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
265
		// We found the correct child to write into
266
		if cur+bs > offset {
267 268 269 270
			// Unpin block
			ckey := u.Key(node.Links[i].Hash)
			dm.mp.RemovePinWithMode(ckey, pin.Indirect)

Jeromy's avatar
Jeromy committed
271
			child, err := node.Links[i].GetNode(dm.ctx, dm.dagserv)
272
			if err != nil {
Jeromy's avatar
Jeromy committed
273
				return "", false, err
274
			}
Jeromy's avatar
Jeromy committed
275
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
276
			if err != nil {
Jeromy's avatar
Jeromy committed
277
				return "", false, err
278 279
			}

280 281 282
			// pin the new node
			dm.mp.PinWithMode(k, pin.Indirect)

283 284 285
			offset += bs
			node.Links[i].Hash = mh.Multihash(k)

286 287 288 289 290 291
			// Recache serialized node
			_, err = node.Encoded(true)
			if err != nil {
				return "", false, err
			}

292
			if sdone {
293
				// No more bytes to write!
294 295 296
				done = true
				break
			}
297
			offset = cur + bs
298 299 300 301 302
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
303
	return k, done, err
304 305
}

Jeromy's avatar
Jeromy committed
306
// appendData appends the blocks from the given chan to the end of this dag
307 308 309 310 311 312 313 314 315 316
func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.Node, error) {
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
		Pinner:   dm.mp,
	}

	return trickle.TrickleAppend(node, dbp.New(blks))
}

Jeromy's avatar
Jeromy committed
317
// Read data from this dag starting at the current offset
318
func (dm *DagModifier) Read(b []byte) (int, error) {
319
	err := dm.readPrep()
320 321 322 323
	if err != nil {
		return 0, err
	}

324 325 326 327 328 329 330 331 332 333 334
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

335
	if dm.read == nil {
336 337
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
338
		if err != nil {
339
			return err
340 341 342 343
		}

		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
		if err != nil {
344
			return err
345 346 347
		}

		if i != int64(dm.curWrOff) {
348
			return ErrSeekFail
349 350
		}

351
		dm.readCancel = cancel
352 353 354
		dm.read = dr
	}

355 356 357 358 359 360 361 362 363 364 365
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
366 367 368 369 370 371
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
372
	err := dm.Sync()
373 374 375 376 377 378
	if err != nil {
		return nil, err
	}
	return dm.curNode.Copy(), nil
}

Jeromy's avatar
Jeromy committed
379
// HasChanges returned whether or not there are unflushed changes to this dag
380 381 382 383 384
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
385
	err := dm.Sync()
386 387 388 389 390 391 392 393 394 395 396 397
	if err != nil {
		return 0, err
	}

	switch whence {
	case os.SEEK_CUR:
		dm.curWrOff += uint64(offset)
		dm.writeStart = dm.curWrOff
	case os.SEEK_SET:
		dm.curWrOff = uint64(offset)
		dm.writeStart = uint64(offset)
	case os.SEEK_END:
Jeromy's avatar
Jeromy committed
398
		return 0, ErrSeekEndNotImpl
399
	default:
Jeromy's avatar
Jeromy committed
400
		return 0, ErrUnrecognizedWhence
401 402 403 404 405 406 407 408 409 410 411 412 413
	}

	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
414
	err := dm.Sync()
415 416 417 418 419 420 421 422 423
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
424
	// Truncate can also be used to expand the file
425
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
426
		return dm.expandSparse(int64(size) - realSize)
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
	}

	nnode, err := dagTruncate(dm.curNode, uint64(size), dm.dagserv)
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
443
// dagTruncate truncates the given node to 'size' and returns the modified Node
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460
func dagTruncate(nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, error) {
	if len(nd.Links) == 0 {
		// TODO: this can likely be done without marshaling and remarshaling
		pbn, err := ft.FromBytes(nd.Data)
		if err != nil {
			return nil, err
		}

		nd.Data = ft.WrapData(pbn.Data[:size])
		return nd, nil
	}

	var cur uint64
	end := 0
	var modified *mdag.Node
	ndata := new(ft.FSNode)
	for i, lnk := range nd.Links {
Jeromy's avatar
Jeromy committed
461 462 463 464
		ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
		defer cancel()

		child, err := lnk.GetNode(ctx, ds)
465 466 467 468 469 470 471 472 473
		if err != nil {
			return nil, err
		}

		childsize, err := ft.DataSize(child.Data)
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
474
		// found the child we want to cut
475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
		if size < cur+childsize {
			nchild, err := dagTruncate(child, size-cur, ds)
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

	nd.Links = nd.Links[:end]
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

	nd.Data = d

509 510 511 512 513 514
	// invalidate cache and recompute serialized data
	_, err = nd.Encoded(true)
	if err != nil {
		return nil, err
	}

515 516
	return nd, nil
}