dagmodifier.go 10 KB
Newer Older
1 2 3 4 5 6 7 8
package mod

import (
	"bytes"
	"errors"
	"io"
	"os"

9 10 11 12 13 14
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
15

16
	context "context"
Jeromy's avatar
Jeromy committed
17
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
18
	cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
19
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
20 21
)

// Sentinel errors reported by DagModifier.
var (
	// ErrSeekFail is returned when a reader could not be positioned at the
	// requested offset.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrUnrecognizedWhence is returned by Seek for an unsupported whence.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

// writebufferSize is the pending-write threshold (2 MiB); once the write
// buffer grows beyond it, Write flushes the buffer into the DAG.
var writebufferSize = 1 << 21

Jeromy's avatar
Jeromy committed
28
var log = logging.Logger("dagio")
29 30 31 32 33 34 35 36

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
	curNode *mdag.Node

37
	splitter   chunk.SplitterGen
38 39 40 41 42 43 44 45 46 47
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

	read *uio.DagReader
}

48
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
49 50 51 52 53 54 55 56 57 58 59
	return &DagModifier{
		curNode:  from.Copy(),
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
60 61
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
62 63 64 65 66 67
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
68 69 70 71 72 73 74 75 76 77 78
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

79
		err = dm.Sync()
80 81 82 83 84 85 86 87 88 89 90 91 92
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader that produces an endless stream of zero bytes.
type zeroReader struct{}

// Read overwrites every byte of b with zero and reports len(b) bytes read.
// It never returns an error.
func (zr zeroReader) Read(b []byte) (int, error) {
	for i := 0; i < len(b); i++ {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
99 100
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
101 102
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
103
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
104
	nnode, err := dm.appendData(dm.curNode, spl)
105 106 107 108 109 110 111 112 113 114 115
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
116
// Write continues writing to the dag at the current offset
117 118 119 120 121 122 123
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
124

125 126 127 128 129 130
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
131
		err := dm.Sync()
132 133 134 135 136 137 138 139
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

func (dm *DagModifier) Size() (int64, error) {
140
	pbn, err := ft.FromBytes(dm.curNode.Data())
141 142 143 144
	if err != nil {
		return 0, err
	}

145 146 147 148
	if dm.wrBuf != nil {
		if uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
149 150 151 152 153
	}

	return int64(pbn.GetFilesize()), nil
}

154 155
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
156
	// No buffer? Nothing to do
157 158 159 160 161 162 163 164 165 166
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
167
	// Number of bytes we're going to write
168 169
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
170
	// overwrite existing dag nodes
Jeromy's avatar
Jeromy committed
171
	thisc, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
172 173 174 175
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
176
	nd, err := dm.dagserv.Get(dm.ctx, thisc)
177 178 179 180 181 182
	if err != nil {
		return err
	}

	dm.curNode = nd

Jeromy's avatar
Jeromy committed
183
	// need to write past end of current dag
184
	if !done {
rht's avatar
rht committed
185
		nd, err = dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
186 187 188 189
		if err != nil {
			return err
		}

Jeromy's avatar
Jeromy committed
190
		_, err = dm.dagserv.Add(nd)
191 192 193 194 195 196 197 198 199 200 201 202 203
		if err != nil {
			return err
		}

		dm.curNode = nd
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
204
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
205 206
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
Jeromy's avatar
Jeromy committed
207
func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
208
	f, err := ft.FromBytes(node.Data())
209
	if err != nil {
Jeromy's avatar
Jeromy committed
210
		return nil, false, err
211 212
	}

Jeromy's avatar
Jeromy committed
213 214
	// If we've reached a leaf node.
	if len(node.Links) == 0 {
215 216
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
217
			return nil, false, err
218 219 220 221 222
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
223
			return nil, false, err
224 225
		}

226 227
		nd := new(mdag.Node)
		nd.SetData(b)
228 229
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
230
			return nil, false, err
231 232 233 234
		}

		// Hey look! we're done!
		var done bool
235
		if n < len(f.Data[offset:]) {
236 237 238
			done = true
		}

Jeromy's avatar
Jeromy committed
239
		return k, done, nil
240 241 242 243 244
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
245
		// We found the correct child to write into
246
		if cur+bs > offset {
Jeromy's avatar
Jeromy committed
247
			child, err := node.Links[i].GetNode(dm.ctx, dm.dagserv)
248
			if err != nil {
Jeromy's avatar
Jeromy committed
249
				return nil, false, err
250
			}
Jeromy's avatar
Jeromy committed
251
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
252
			if err != nil {
Jeromy's avatar
Jeromy committed
253
				return nil, false, err
254 255 256
			}

			offset += bs
Jeromy's avatar
Jeromy committed
257
			node.Links[i].Hash = k.Hash()
258

259
			// Recache serialized node
260
			_, err = node.EncodeProtobuf(true)
261
			if err != nil {
Jeromy's avatar
Jeromy committed
262
				return nil, false, err
263 264
			}

265
			if sdone {
266
				// No more bytes to write!
267 268 269
				done = true
				break
			}
270
			offset = cur + bs
271 272 273 274 275
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
276
	return k, done, err
277 278
}

Jeromy's avatar
Jeromy committed
279
// appendData appends the blocks from the given chan to the end of this dag
rht's avatar
rht committed
280
func (dm *DagModifier) appendData(node *mdag.Node, spl chunk.Splitter) (*mdag.Node, error) {
281 282 283 284 285
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
	}

rht's avatar
rht committed
286
	return trickle.TrickleAppend(dm.ctx, node, dbp.New(spl))
287 288
}

Jeromy's avatar
Jeromy committed
289
// Read data from this dag starting at the current offset
290
func (dm *DagModifier) Read(b []byte) (int, error) {
291
	err := dm.readPrep()
292 293 294 295
	if err != nil {
		return 0, err
	}

296 297 298 299 300 301 302 303 304 305 306
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

307
	if dm.read == nil {
308 309
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
310
		if err != nil {
311
			return err
312 313 314 315
		}

		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
		if err != nil {
316
			return err
317 318 319
		}

		if i != int64(dm.curWrOff) {
320
			return ErrSeekFail
321 322
		}

323
		dm.readCancel = cancel
324 325 326
		dm.read = dr
	}

327 328 329 330 331 332 333 334 335 336 337
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
338 339 340 341 342 343
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
344
	err := dm.Sync()
345 346 347 348 349 350
	if err != nil {
		return nil, err
	}
	return dm.curNode.Copy(), nil
}

Jeromy's avatar
Jeromy committed
351
// HasChanges returned whether or not there are unflushed changes to this dag
352 353 354 355 356
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
357
	err := dm.Sync()
358 359 360 361
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
362 363 364 365 366 367
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
368 369
	switch whence {
	case os.SEEK_CUR:
Jeromy's avatar
Jeromy committed
370
		newoffset = dm.curWrOff + uint64(offset)
371
	case os.SEEK_SET:
Jeromy's avatar
Jeromy committed
372
		newoffset = uint64(offset)
373
	case os.SEEK_END:
374
		newoffset = uint64(fisize) - uint64(offset)
375
	default:
Jeromy's avatar
Jeromy committed
376
		return 0, ErrUnrecognizedWhence
377 378
	}

379 380
	if int64(newoffset) > fisize {
		if err := dm.expandSparse(int64(newoffset) - fisize); err != nil {
Jeromy's avatar
Jeromy committed
381 382 383 384 385 386
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

387 388 389 390 391 392 393 394 395 396 397
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
398
	err := dm.Sync()
399 400 401 402 403 404 405 406 407
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
408
	// Truncate can also be used to expand the file
409
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
410
		return dm.expandSparse(int64(size) - realSize)
411 412
	}

413
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
414 415 416 417 418 419 420 421 422 423 424 425 426
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
427
// dagTruncate truncates the given node to 'size' and returns the modified Node
428
func dagTruncate(ctx context.Context, nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, error) {
429 430
	if len(nd.Links) == 0 {
		// TODO: this can likely be done without marshaling and remarshaling
431
		pbn, err := ft.FromBytes(nd.Data())
432 433 434 435
		if err != nil {
			return nil, err
		}

436
		nd.SetData(ft.WrapData(pbn.Data[:size]))
437 438 439 440 441 442 443 444
		return nd, nil
	}

	var cur uint64
	end := 0
	var modified *mdag.Node
	ndata := new(ft.FSNode)
	for i, lnk := range nd.Links {
Jeromy's avatar
Jeromy committed
445
		child, err := lnk.GetNode(ctx, ds)
446 447 448 449
		if err != nil {
			return nil, err
		}

450
		childsize, err := ft.DataSize(child.Data())
451 452 453 454
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
455
		// found the child we want to cut
456
		if size < cur+childsize {
457
			nchild, err := dagTruncate(ctx, child, size-cur, ds)
458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

	nd.Links = nd.Links[:end]
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

488
	nd.SetData(d)
489

490
	// invalidate cache and recompute serialized data
491
	_, err = nd.EncodeProtobuf(true)
492 493 494 495
	if err != nil {
		return nil, err
	}

496 497
	return nd, nil
}