// Source file: dagmodifier.go

package mod

import (
	"bytes"
	"errors"
	"io"
	"os"

9
	mh "gx/ipfs/QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku/go-multihash"
Jeromy's avatar
Jeromy committed
10
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
11
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
12

13
	key "github.com/ipfs/go-ipfs/blocks/key"
14 15 16 17 18 19
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
20
	logging "gx/ipfs/QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR/go-log"
21 22
)

Jeromy's avatar
Jeromy committed
23 24 25 26
// Sentinel errors exposed by this package.
var (
	// ErrSeekFail reports that a reader ended up at a different offset than
	// the one it was asked to seek to.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrSeekEndNotImpl is kept for callers that still compare against it.
	ErrSeekEndNotImpl = errors.New("SEEK_END currently not implemented")

	// ErrUnrecognizedWhence is returned by Seek for an unknown whence value.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

27 28 29
// writebufferSize is the buffered-byte threshold (2 MiB); once the write
// buffer grows past it, Write flushes through Sync.
var writebufferSize = 2 * 1024 * 1024

Jeromy's avatar
Jeromy committed
30
// log is the package-level logger, created under the name "dagio".
var log = logging.Logger("dagio")
31 32 33 34 35 36 37 38

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
	curNode *mdag.Node

39
	splitter   chunk.SplitterGen
40 41 42 43 44 45 46 47 48 49
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

	read *uio.DagReader
}

50
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
51 52 53 54 55 56 57 58 59 60 61
	return &DagModifier{
		curNode:  from.Copy(),
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
62 63
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
64 65 66 67 68 69
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
70 71 72 73 74 75 76 77 78 79 80
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

81
		err = dm.Sync()
82 83 84 85 86 87 88 89 90 91 92 93 94
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is a reader that just returns zeros, without ever ending.
type zeroReader struct{}

// Read overwrites every byte of b with zero and reports len(b) bytes read.
// It never returns an error, so callers must bound it themselves (for
// example with io.LimitReader).
func (zr zeroReader) Read(b []byte) (int, error) {
	for i := range b {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
101 102
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
103 104
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
105
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
106
	nnode, err := dm.appendData(dm.curNode, spl)
107 108 109 110 111 112 113 114 115 116 117
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
118
// Write continues writing to the dag at the current offset
119 120 121 122 123 124 125
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
126

127 128 129 130 131 132
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
133
		err := dm.Sync()
134 135 136 137 138 139 140 141
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

func (dm *DagModifier) Size() (int64, error) {
142
	pbn, err := ft.FromBytes(dm.curNode.Data)
143 144 145 146
	if err != nil {
		return 0, err
	}

147 148 149 150
	if dm.wrBuf != nil {
		if uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
151 152 153 154 155
	}

	return int64(pbn.GetFilesize()), nil
}

156 157
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
158
	// No buffer? Nothing to do
159 160 161 162 163 164 165 166 167 168
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
169
	// Number of bytes we're going to write
170 171
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
172
	// overwrite existing dag nodes
173
	thisk, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
174 175 176 177
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
178
	nd, err := dm.dagserv.Get(dm.ctx, thisk)
179 180 181 182 183 184
	if err != nil {
		return err
	}

	dm.curNode = nd

Jeromy's avatar
Jeromy committed
185
	// need to write past end of current dag
186
	if !done {
rht's avatar
rht committed
187
		nd, err = dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
188 189 190 191
		if err != nil {
			return err
		}

192
		thisk, err = dm.dagserv.Add(nd)
193 194 195 196 197 198 199 200 201 202 203 204 205
		if err != nil {
			return err
		}

		dm.curNode = nd
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
206
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
207 208
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
209
func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) (key.Key, bool, error) {
210 211
	f, err := ft.FromBytes(node.Data)
	if err != nil {
Jeromy's avatar
Jeromy committed
212
		return "", false, err
213 214
	}

Jeromy's avatar
Jeromy committed
215 216
	// If we've reached a leaf node.
	if len(node.Links) == 0 {
217 218
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
219
			return "", false, err
220 221 222 223 224
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
225
			return "", false, err
226 227 228 229 230
		}

		nd := &mdag.Node{Data: b}
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
231
			return "", false, err
232 233 234 235
		}

		// Hey look! we're done!
		var done bool
236
		if n < len(f.Data[offset:]) {
237 238 239
			done = true
		}

Jeromy's avatar
Jeromy committed
240
		return k, done, nil
241 242 243 244 245
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
246
		// We found the correct child to write into
247
		if cur+bs > offset {
Jeromy's avatar
Jeromy committed
248
			child, err := node.Links[i].GetNode(dm.ctx, dm.dagserv)
249
			if err != nil {
Jeromy's avatar
Jeromy committed
250
				return "", false, err
251
			}
Jeromy's avatar
Jeromy committed
252
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
253
			if err != nil {
Jeromy's avatar
Jeromy committed
254
				return "", false, err
255 256 257 258 259
			}

			offset += bs
			node.Links[i].Hash = mh.Multihash(k)

260
			// Recache serialized node
261
			_, err = node.EncodeProtobuf(true)
262 263 264 265
			if err != nil {
				return "", false, err
			}

266
			if sdone {
267
				// No more bytes to write!
268 269 270
				done = true
				break
			}
271
			offset = cur + bs
272 273 274 275 276
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
277
	return k, done, err
278 279
}

Jeromy's avatar
Jeromy committed
280
// appendData appends the blocks from the given chan to the end of this dag
rht's avatar
rht committed
281
func (dm *DagModifier) appendData(node *mdag.Node, spl chunk.Splitter) (*mdag.Node, error) {
282 283 284 285 286
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
	}

rht's avatar
rht committed
287
	return trickle.TrickleAppend(dm.ctx, node, dbp.New(spl))
288 289
}

Jeromy's avatar
Jeromy committed
290
// Read data from this dag starting at the current offset
291
func (dm *DagModifier) Read(b []byte) (int, error) {
292
	err := dm.readPrep()
293 294 295 296
	if err != nil {
		return 0, err
	}

297 298 299 300 301 302 303 304 305 306 307
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

308
	if dm.read == nil {
309 310
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
311
		if err != nil {
312
			return err
313 314 315 316
		}

		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
		if err != nil {
317
			return err
318 319 320
		}

		if i != int64(dm.curWrOff) {
321
			return ErrSeekFail
322 323
		}

324
		dm.readCancel = cancel
325 326 327
		dm.read = dr
	}

328 329 330 331 332 333 334 335 336 337 338
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
339 340 341 342 343 344
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
345
	err := dm.Sync()
346 347 348 349 350 351
	if err != nil {
		return nil, err
	}
	return dm.curNode.Copy(), nil
}

Jeromy's avatar
Jeromy committed
352
// HasChanges returned whether or not there are unflushed changes to this dag
353 354 355 356 357
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
358
	err := dm.Sync()
359 360 361 362
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
363 364 365 366 367 368
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
369 370
	switch whence {
	case os.SEEK_CUR:
Jeromy's avatar
Jeromy committed
371
		newoffset = dm.curWrOff + uint64(offset)
372
	case os.SEEK_SET:
Jeromy's avatar
Jeromy committed
373
		newoffset = uint64(offset)
374
	case os.SEEK_END:
375
		newoffset = uint64(fisize) - uint64(offset)
376
	default:
Jeromy's avatar
Jeromy committed
377
		return 0, ErrUnrecognizedWhence
378 379
	}

Jeromy's avatar
Jeromy committed
380 381 382 383 384 385 386 387
	if offset > fisize {
		if err := dm.expandSparse(offset - fisize); err != nil {
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

388 389 390 391 392 393 394 395 396 397 398
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
399
	err := dm.Sync()
400 401 402 403 404 405 406 407 408
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
409
	// Truncate can also be used to expand the file
410
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
411
		return dm.expandSparse(int64(size) - realSize)
412 413
	}

414
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
415 416 417 418 419 420 421 422 423 424 425 426 427
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
428
// dagTruncate truncates the given node to 'size' and returns the modified Node
429
func dagTruncate(ctx context.Context, nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, error) {
430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445
	if len(nd.Links) == 0 {
		// TODO: this can likely be done without marshaling and remarshaling
		pbn, err := ft.FromBytes(nd.Data)
		if err != nil {
			return nil, err
		}

		nd.Data = ft.WrapData(pbn.Data[:size])
		return nd, nil
	}

	var cur uint64
	end := 0
	var modified *mdag.Node
	ndata := new(ft.FSNode)
	for i, lnk := range nd.Links {
Jeromy's avatar
Jeromy committed
446
		child, err := lnk.GetNode(ctx, ds)
447 448 449 450 451 452 453 454 455
		if err != nil {
			return nil, err
		}

		childsize, err := ft.DataSize(child.Data)
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
456
		// found the child we want to cut
457
		if size < cur+childsize {
458
			nchild, err := dagTruncate(ctx, child, size-cur, ds)
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

	nd.Links = nd.Links[:end]
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

	nd.Data = d

491
	// invalidate cache and recompute serialized data
492
	_, err = nd.EncodeProtobuf(true)
493 494 495 496
	if err != nil {
		return nil, err
	}

497 498
	return nd, nil
}