dagmodifier.go 10.4 KB
Newer Older
1 2 3 4 5 6 7 8
package mod

import (
	"bytes"
	"errors"
	"io"
	"os"

9 10 11 12 13 14
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
15

16
	context "context"
Jeromy's avatar
Jeromy committed
17
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
18
	cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
19
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
20 21
)

Jeromy's avatar
Jeromy committed
22 23 24
// Sentinel errors reported by DagModifier.
var (
	// ErrSeekFail is returned when a reader could not be positioned at the
	// requested offset.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrUnrecognizedWhence is returned by Seek for a whence value other than
	// os.SEEK_SET, os.SEEK_CUR or os.SEEK_END.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

// writebufferSize is the write-buffer threshold (2 MiB): once more than this
// many bytes are buffered, Write flushes them into the DAG.
var writebufferSize = 2 << 20

Jeromy's avatar
Jeromy committed
28
// log is this package's logger, created under the name "dagio".
var log = logging.Logger("dagio")
29 30 31 32 33 34

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
35
	curNode *mdag.ProtoNode
36

37
	splitter   chunk.SplitterGen
38 39 40 41 42 43 44 45 46 47
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

	read *uio.DagReader
}

48
func NewDagModifier(ctx context.Context, from *mdag.ProtoNode, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
49 50 51 52 53 54 55 56 57 58 59
	return &DagModifier{
		curNode:  from.Copy(),
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
60 61
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
62 63 64 65 66 67
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
68 69 70 71 72 73 74 75 76 77 78
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

79
		err = dm.Sync()
80 81 82 83 84 85 86 87 88 89 90 91 92
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader whose Read fills the whole destination slice
// with zero bytes; it never reports an error, not even io.EOF.
type zeroReader struct{}

// Read zeroes every byte of p and reports len(p) bytes read.
func (zeroReader) Read(p []byte) (int, error) {
	for i := 0; i < len(p); i++ {
		p[i] = 0
	}
	return len(p), nil
}

Jeromy's avatar
Jeromy committed
99 100
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
101 102
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
103
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
104
	nnode, err := dm.appendData(dm.curNode, spl)
105 106 107 108 109 110 111 112 113 114 115
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
116
// Write continues writing to the dag at the current offset
117 118 119 120 121 122 123
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
124

125 126 127 128 129 130
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
131
		err := dm.Sync()
132 133 134 135 136 137 138 139
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

func (dm *DagModifier) Size() (int64, error) {
140
	pbn, err := ft.FromBytes(dm.curNode.Data())
141 142 143 144
	if err != nil {
		return 0, err
	}

145 146 147 148
	if dm.wrBuf != nil {
		if uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
149 150 151 152 153
	}

	return int64(pbn.GetFilesize()), nil
}

154 155
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
156
	// No buffer? Nothing to do
157 158 159 160 161 162 163 164 165 166
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
167
	// Number of bytes we're going to write
168 169
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
170
	// overwrite existing dag nodes
Jeromy's avatar
Jeromy committed
171
	thisc, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
172 173 174 175
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
176
	nd, err := dm.dagserv.Get(dm.ctx, thisc)
177 178 179 180
	if err != nil {
		return err
	}

181 182 183 184 185 186
	pbnd, ok := nd.(*mdag.ProtoNode)
	if !ok {
		return mdag.ErrNotProtobuf
	}

	dm.curNode = pbnd
187

Jeromy's avatar
Jeromy committed
188
	// need to write past end of current dag
189
	if !done {
190
		nd, err := dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
191 192 193 194
		if err != nil {
			return err
		}

Jeromy's avatar
Jeromy committed
195
		_, err = dm.dagserv.Add(nd)
196 197 198 199 200 201 202 203 204 205 206 207 208
		if err != nil {
			return err
		}

		dm.curNode = nd
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
209
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
210 211
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
212
func (dm *DagModifier) modifyDag(node *mdag.ProtoNode, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
213
	f, err := ft.FromBytes(node.Data())
214
	if err != nil {
Jeromy's avatar
Jeromy committed
215
		return nil, false, err
216 217
	}

Jeromy's avatar
Jeromy committed
218
	// If we've reached a leaf node.
219
	if len(node.Links()) == 0 {
220 221
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
222
			return nil, false, err
223 224 225 226 227
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
228
			return nil, false, err
229 230
		}

231
		nd := new(mdag.ProtoNode)
232
		nd.SetData(b)
233 234
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
235
			return nil, false, err
236 237 238 239
		}

		// Hey look! we're done!
		var done bool
240
		if n < len(f.Data[offset:]) {
241 242 243
			done = true
		}

Jeromy's avatar
Jeromy committed
244
		return k, done, nil
245 246 247 248 249
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
250
		// We found the correct child to write into
251
		if cur+bs > offset {
252
			child, err := node.Links()[i].GetNode(dm.ctx, dm.dagserv)
253
			if err != nil {
Jeromy's avatar
Jeromy committed
254
				return nil, false, err
255
			}
256 257 258 259 260 261 262

			childpb, ok := child.(*mdag.ProtoNode)
			if !ok {
				return nil, false, mdag.ErrNotProtobuf
			}

			k, sdone, err := dm.modifyDag(childpb, offset-cur, data)
263
			if err != nil {
Jeromy's avatar
Jeromy committed
264
				return nil, false, err
265 266 267
			}

			offset += bs
268
			node.Links()[i].Cid = k
269

270
			// Recache serialized node
271
			_, err = node.EncodeProtobuf(true)
272
			if err != nil {
Jeromy's avatar
Jeromy committed
273
				return nil, false, err
274 275
			}

276
			if sdone {
277
				// No more bytes to write!
278 279 280
				done = true
				break
			}
281
			offset = cur + bs
282 283 284 285 286
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
287
	return k, done, err
288 289
}

Jeromy's avatar
Jeromy committed
290
// appendData appends the blocks from the given chan to the end of this dag
291
func (dm *DagModifier) appendData(node *mdag.ProtoNode, spl chunk.Splitter) (*mdag.ProtoNode, error) {
292 293 294 295 296
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
	}

rht's avatar
rht committed
297
	return trickle.TrickleAppend(dm.ctx, node, dbp.New(spl))
298 299
}

Jeromy's avatar
Jeromy committed
300
// Read data from this dag starting at the current offset
301
func (dm *DagModifier) Read(b []byte) (int, error) {
302
	err := dm.readPrep()
303 304 305 306
	if err != nil {
		return 0, err
	}

307 308 309 310 311 312 313 314 315 316 317
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

318
	if dm.read == nil {
319 320
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
321
		if err != nil {
322
			return err
323 324 325 326
		}

		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
		if err != nil {
327
			return err
328 329 330
		}

		if i != int64(dm.curWrOff) {
331
			return ErrSeekFail
332 333
		}

334
		dm.readCancel = cancel
335 336 337
		dm.read = dr
	}

338 339 340 341 342 343 344 345 346 347 348
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
349 350 351 352 353
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
354
func (dm *DagModifier) GetNode() (*mdag.ProtoNode, error) {
355
	err := dm.Sync()
356 357 358 359 360 361
	if err != nil {
		return nil, err
	}
	return dm.curNode.Copy(), nil
}

Jeromy's avatar
Jeromy committed
362
// HasChanges returned whether or not there are unflushed changes to this dag
363 364 365 366 367
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
368
	err := dm.Sync()
369 370 371 372
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
373 374 375 376 377 378
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
379 380
	switch whence {
	case os.SEEK_CUR:
Jeromy's avatar
Jeromy committed
381
		newoffset = dm.curWrOff + uint64(offset)
382
	case os.SEEK_SET:
Jeromy's avatar
Jeromy committed
383
		newoffset = uint64(offset)
384
	case os.SEEK_END:
385
		newoffset = uint64(fisize) - uint64(offset)
386
	default:
Jeromy's avatar
Jeromy committed
387
		return 0, ErrUnrecognizedWhence
388 389
	}

390 391
	if int64(newoffset) > fisize {
		if err := dm.expandSparse(int64(newoffset) - fisize); err != nil {
Jeromy's avatar
Jeromy committed
392 393 394 395 396 397
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

398 399 400 401 402 403 404 405 406 407 408
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
409
	err := dm.Sync()
410 411 412 413 414 415 416 417 418
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
419
	// Truncate can also be used to expand the file
420
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
421
		return dm.expandSparse(int64(size) - realSize)
422 423
	}

424
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
425 426 427 428 429 430 431 432 433 434 435 436 437
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
438
// dagTruncate truncates the given node to 'size' and returns the modified Node
439 440
func dagTruncate(ctx context.Context, nd *mdag.ProtoNode, size uint64, ds mdag.DAGService) (*mdag.ProtoNode, error) {
	if len(nd.Links()) == 0 {
441
		// TODO: this can likely be done without marshaling and remarshaling
442
		pbn, err := ft.FromBytes(nd.Data())
443 444 445 446
		if err != nil {
			return nil, err
		}

447
		nd.SetData(ft.WrapData(pbn.Data[:size]))
448 449 450 451 452
		return nd, nil
	}

	var cur uint64
	end := 0
453
	var modified *mdag.ProtoNode
454
	ndata := new(ft.FSNode)
455
	for i, lnk := range nd.Links() {
Jeromy's avatar
Jeromy committed
456
		child, err := lnk.GetNode(ctx, ds)
457 458 459 460
		if err != nil {
			return nil, err
		}

461 462 463 464 465 466
		childpb, ok := child.(*mdag.ProtoNode)
		if !ok {
			return nil, err
		}

		childsize, err := ft.DataSize(childpb.Data())
467 468 469 470
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
471
		// found the child we want to cut
472
		if size < cur+childsize {
473
			nchild, err := dagTruncate(ctx, childpb, size-cur, ds)
474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

493
	nd.SetLinks(nd.Links()[:end])
494 495 496 497 498 499 500 501 502 503
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

504
	nd.SetData(d)
505

506
	// invalidate cache and recompute serialized data
507
	_, err = nd.EncodeProtobuf(true)
508 509 510 511
	if err != nil {
		return nil, err
	}

512 513
	return nd, nil
}