dagmodifier.go 10.3 KB
Newer Older
1 2 3 4 5 6 7 8
package mod

import (
	"bytes"
	"errors"
	"io"
	"os"

9
	proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
10 11 12
	mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

13
	key "github.com/ipfs/go-ipfs/blocks/key"
14 15 16 17 18 19
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
Jeromy's avatar
Jeromy committed
20
	logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
21 22
)

var (
	// ErrSeekFail reports that a seek could not reach the requested offset.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrSeekEndNotImpl is returned by Seek for whence == os.SEEK_END.
	ErrSeekEndNotImpl = errors.New("SEEK_END currently not implemented")

	// ErrUnrecognizedWhence is returned by Seek for an unknown whence value.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

// writebufferSize is the amount of pending write data (2 MiB) beyond which
// Write flushes the buffer into the DAG via Sync.
var writebufferSize = 1 << 21

Jeromy's avatar
Jeromy committed
30
// log is this package's logger, created with the name "dagio".
// NOTE(review): it is not referenced anywhere in the code visible in this file.
var log = logging.Logger("dagio")
31 32 33 34 35 36 37 38

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
	curNode *mdag.Node

39
	splitter   chunk.SplitterGen
40 41 42 43 44 45 46 47 48 49
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

	read *uio.DagReader
}

50
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
51 52 53 54 55 56 57 58 59 60 61
	return &DagModifier{
		curNode:  from.Copy(),
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
62 63
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
64 65 66 67 68 69
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
70 71 72 73 74 75 76 77 78 79 80
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

81
		err = dm.Sync()
82 83 84 85 86 87 88 89 90 91 92 93 94
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader that produces an endless stream of zero bytes.
type zeroReader struct{}

// Read zero-fills all of b and reports len(b) bytes read. It never fails.
func (zeroReader) Read(b []byte) (int, error) {
	for i := 0; i < len(b); i++ {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
101 102
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
103 104
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
105
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
106
	nnode, err := dm.appendData(dm.curNode, spl)
107 108 109 110 111 112 113 114 115 116 117
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
118
// Write continues writing to the dag at the current offset
119 120 121 122 123 124 125
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
126

127 128 129 130 131 132
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
133
		err := dm.Sync()
134 135 136 137 138 139 140 141
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

func (dm *DagModifier) Size() (int64, error) {
142
	pbn, err := ft.FromBytes(dm.curNode.Data)
143 144 145 146
	if err != nil {
		return 0, err
	}

147 148 149 150
	if dm.wrBuf != nil {
		if uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
151 152 153 154 155
	}

	return int64(pbn.GetFilesize()), nil
}

156 157
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
158
	// No buffer? Nothing to do
159 160 161 162 163 164 165 166 167 168
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
169
	// Number of bytes we're going to write
170 171
	buflen := dm.wrBuf.Len()

172
	// Grab key for unpinning after mod operation
173
	_, err := dm.curNode.Key()
174 175 176 177
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
178
	// overwrite existing dag nodes
179
	thisk, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
180 181 182 183
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
184
	nd, err := dm.dagserv.Get(dm.ctx, thisk)
185 186 187 188 189 190
	if err != nil {
		return err
	}

	dm.curNode = nd

Jeromy's avatar
Jeromy committed
191
	// need to write past end of current dag
192
	if !done {
rht's avatar
rht committed
193
		nd, err = dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
194 195 196 197
		if err != nil {
			return err
		}

198
		thisk, err = dm.dagserv.Add(nd)
199 200 201 202 203 204 205 206 207 208 209 210 211
		if err != nil {
			return err
		}

		dm.curNode = nd
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
212
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
213 214
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
215
func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) (key.Key, bool, error) {
216 217
	f, err := ft.FromBytes(node.Data)
	if err != nil {
Jeromy's avatar
Jeromy committed
218
		return "", false, err
219 220
	}

Jeromy's avatar
Jeromy committed
221 222
	// If we've reached a leaf node.
	if len(node.Links) == 0 {
223 224
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
225
			return "", false, err
226 227 228 229 230
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
231
			return "", false, err
232 233 234 235 236
		}

		nd := &mdag.Node{Data: b}
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
237
			return "", false, err
238 239 240 241
		}

		// Hey look! we're done!
		var done bool
242
		if n < len(f.Data[offset:]) {
243 244 245
			done = true
		}

Jeromy's avatar
Jeromy committed
246
		return k, done, nil
247 248 249 250 251
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
252
		// We found the correct child to write into
253
		if cur+bs > offset {
Jeromy's avatar
Jeromy committed
254
			child, err := node.Links[i].GetNode(dm.ctx, dm.dagserv)
255
			if err != nil {
Jeromy's avatar
Jeromy committed
256
				return "", false, err
257
			}
Jeromy's avatar
Jeromy committed
258
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
259
			if err != nil {
Jeromy's avatar
Jeromy committed
260
				return "", false, err
261 262 263 264 265
			}

			offset += bs
			node.Links[i].Hash = mh.Multihash(k)

266 267 268 269 270 271
			// Recache serialized node
			_, err = node.Encoded(true)
			if err != nil {
				return "", false, err
			}

272
			if sdone {
273
				// No more bytes to write!
274 275 276
				done = true
				break
			}
277
			offset = cur + bs
278 279 280 281 282
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
283
	return k, done, err
284 285
}

Jeromy's avatar
Jeromy committed
286
// appendData appends the blocks from the given chan to the end of this dag
rht's avatar
rht committed
287
func (dm *DagModifier) appendData(node *mdag.Node, spl chunk.Splitter) (*mdag.Node, error) {
288 289 290 291 292
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
	}

rht's avatar
rht committed
293
	return trickle.TrickleAppend(dm.ctx, node, dbp.New(spl))
294 295
}

Jeromy's avatar
Jeromy committed
296
// Read data from this dag starting at the current offset
297
func (dm *DagModifier) Read(b []byte) (int, error) {
298
	err := dm.readPrep()
299 300 301 302
	if err != nil {
		return 0, err
	}

303 304 305 306 307 308 309 310 311 312 313
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

314
	if dm.read == nil {
315 316
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
317
		if err != nil {
318
			return err
319 320 321 322
		}

		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
		if err != nil {
323
			return err
324 325 326
		}

		if i != int64(dm.curWrOff) {
327
			return ErrSeekFail
328 329
		}

330
		dm.readCancel = cancel
331 332 333
		dm.read = dr
	}

334 335 336 337 338 339 340 341 342 343 344
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
345 346 347 348 349 350
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
351
	err := dm.Sync()
352 353 354 355 356 357
	if err != nil {
		return nil, err
	}
	return dm.curNode.Copy(), nil
}

Jeromy's avatar
Jeromy committed
358
// HasChanges returned whether or not there are unflushed changes to this dag
359 360 361 362 363
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
364
	err := dm.Sync()
365 366 367 368
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
369 370 371 372 373 374
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
375 376
	switch whence {
	case os.SEEK_CUR:
Jeromy's avatar
Jeromy committed
377
		newoffset = dm.curWrOff + uint64(offset)
378
	case os.SEEK_SET:
Jeromy's avatar
Jeromy committed
379
		newoffset = uint64(offset)
380
	case os.SEEK_END:
Jeromy's avatar
Jeromy committed
381
		return 0, ErrSeekEndNotImpl
382
	default:
Jeromy's avatar
Jeromy committed
383
		return 0, ErrUnrecognizedWhence
384 385
	}

Jeromy's avatar
Jeromy committed
386 387 388 389 390 391 392 393
	if offset > fisize {
		if err := dm.expandSparse(offset - fisize); err != nil {
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

394 395 396 397 398 399 400 401 402 403 404
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
405
	err := dm.Sync()
406 407 408 409 410 411 412 413 414
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
415
	// Truncate can also be used to expand the file
416
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
417
		return dm.expandSparse(int64(size) - realSize)
418 419
	}

420
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
421 422 423 424 425 426 427 428 429 430 431 432 433
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
434
// dagTruncate truncates the given node to 'size' and returns the modified Node
435
func dagTruncate(ctx context.Context, nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, error) {
436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451
	if len(nd.Links) == 0 {
		// TODO: this can likely be done without marshaling and remarshaling
		pbn, err := ft.FromBytes(nd.Data)
		if err != nil {
			return nil, err
		}

		nd.Data = ft.WrapData(pbn.Data[:size])
		return nd, nil
	}

	var cur uint64
	end := 0
	var modified *mdag.Node
	ndata := new(ft.FSNode)
	for i, lnk := range nd.Links {
Jeromy's avatar
Jeromy committed
452
		child, err := lnk.GetNode(ctx, ds)
453 454 455 456 457 458 459 460 461
		if err != nil {
			return nil, err
		}

		childsize, err := ft.DataSize(child.Data)
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
462
		// found the child we want to cut
463
		if size < cur+childsize {
464
			nchild, err := dagTruncate(ctx, child, size-cur, ds)
465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

	nd.Links = nd.Links[:end]
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

	nd.Data = d

497 498 499 500 501 502
	// invalidate cache and recompute serialized data
	_, err = nd.Encoded(true)
	if err != nil {
		return nil, err
	}

503 504
	return nd, nil
}