dagmodifier.go 10.7 KB
Newer Older
1 2 3 4
package mod

import (
	"bytes"
5
	"context"
6 7 8 9
	"errors"
	"io"
	"os"

10 11 12 13 14 15
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
16

Jeromy's avatar
Jeromy committed
17
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
18
	node "gx/ipfs/QmUsVJ7AEnGyjX8YWnrwq9vmECVGwBQNAKPpgz5KSg8dcq/go-ipld-node"
19
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
20
	cid "gx/ipfs/QmcEcrBAMrwMyhSjXt4yfyPpzgSuV8HLHavnfmiKCSRqZU/go-cid"
21 22
)

Jeromy's avatar
Jeromy committed
23 24 25
// Sentinel errors returned by DagModifier.
var (
	// ErrSeekFail is returned when a freshly created reader ends up at a
	// position other than the requested one.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrUnrecognizedWhence is returned by Seek when the whence argument is
	// not one of the supported seek modes.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

26 27 28
// writebufferSize is the number of buffered bytes (2 MiB) above which Write
// flushes the pending data into the DAG.
var writebufferSize = 2 << 20

Jeromy's avatar
Jeromy committed
29
// log is the package-level logger, registered under the name "dagio".
var log = logging.Logger("dagio")
30 31 32 33 34 35

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
36
	curNode *mdag.ProtoNode
37

38
	splitter   chunk.SplitterGen
39 40 41 42 43 44 45 46 47 48
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

	read *uio.DagReader
}

49 50 51 52 53 54
func NewDagModifier(ctx context.Context, from node.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
	pbn, ok := from.(*mdag.ProtoNode)
	if !ok {
		return nil, mdag.ErrNotProtobuf
	}

55
	return &DagModifier{
56
		curNode:  pbn.Copy().(*mdag.ProtoNode),
57 58 59 60 61 62 63 64 65
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
66 67
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
68 69 70 71 72 73
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
74 75 76 77 78 79 80 81 82 83 84
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

85
		err = dm.Sync()
86 87 88 89 90 91 92 93 94 95 96 97 98
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader that yields an endless stream of zero bytes.
type zeroReader struct{}

// Read fills p entirely with zeros and reports len(p) bytes read; it never
// returns an error.
func (zeroReader) Read(p []byte) (int, error) {
	for i := range p {
		p[i] = 0
	}
	return len(p), nil
}

Jeromy's avatar
Jeromy committed
105 106
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
107 108
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
109
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
110
	nnode, err := dm.appendData(dm.curNode, spl)
111 112 113 114 115 116 117
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
118 119 120 121 122 123 124

	pbnnode, ok := nnode.(*mdag.ProtoNode)
	if !ok {
		return mdag.ErrNotProtobuf
	}

	dm.curNode = pbnnode
125 126 127
	return nil
}

Jeromy's avatar
Jeromy committed
128
// Write continues writing to the dag at the current offset
129 130 131 132 133 134 135
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
136

137 138 139 140 141 142
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
143
		err := dm.Sync()
144 145 146 147 148 149 150 151
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

func (dm *DagModifier) Size() (int64, error) {
152
	pbn, err := ft.FromBytes(dm.curNode.Data())
153 154 155 156
	if err != nil {
		return 0, err
	}

157 158 159 160
	if dm.wrBuf != nil {
		if uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
161 162 163 164 165
	}

	return int64(pbn.GetFilesize()), nil
}

166 167
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
168
	// No buffer? Nothing to do
169 170 171 172 173 174 175 176 177 178
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
179
	// Number of bytes we're going to write
180 181
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
182
	// overwrite existing dag nodes
Jeromy's avatar
Jeromy committed
183
	thisc, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
184 185 186 187
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
188
	nd, err := dm.dagserv.Get(dm.ctx, thisc)
189 190 191 192
	if err != nil {
		return err
	}

193 194 195 196 197 198
	pbnd, ok := nd.(*mdag.ProtoNode)
	if !ok {
		return mdag.ErrNotProtobuf
	}

	dm.curNode = pbnd
199

Jeromy's avatar
Jeromy committed
200
	// need to write past end of current dag
201
	if !done {
202
		nd, err := dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
203 204 205 206
		if err != nil {
			return err
		}

Jeromy's avatar
Jeromy committed
207
		_, err = dm.dagserv.Add(nd)
208 209 210 211
		if err != nil {
			return err
		}

212 213 214 215 216 217
		pbnode, ok := nd.(*mdag.ProtoNode)
		if !ok {
			return mdag.ErrNotProtobuf
		}

		dm.curNode = pbnode
218 219 220 221 222 223 224 225
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
226
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
227 228
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
229
func (dm *DagModifier) modifyDag(node *mdag.ProtoNode, offset uint64, data io.Reader) (*cid.Cid, bool, error) {
230
	f, err := ft.FromBytes(node.Data())
231
	if err != nil {
Jeromy's avatar
Jeromy committed
232
		return nil, false, err
233 234
	}

Jeromy's avatar
Jeromy committed
235
	// If we've reached a leaf node.
236
	if len(node.Links()) == 0 {
237 238
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
239
			return nil, false, err
240 241 242 243 244
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
245
			return nil, false, err
246 247
		}

248
		nd := new(mdag.ProtoNode)
249
		nd.SetData(b)
250 251
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
252
			return nil, false, err
253 254 255 256
		}

		// Hey look! we're done!
		var done bool
257
		if n < len(f.Data[offset:]) {
258 259 260
			done = true
		}

Jeromy's avatar
Jeromy committed
261
		return k, done, nil
262 263 264 265 266
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
267
		// We found the correct child to write into
268
		if cur+bs > offset {
269
			child, err := node.Links()[i].GetNode(dm.ctx, dm.dagserv)
270
			if err != nil {
Jeromy's avatar
Jeromy committed
271
				return nil, false, err
272
			}
273 274 275 276 277 278 279

			childpb, ok := child.(*mdag.ProtoNode)
			if !ok {
				return nil, false, mdag.ErrNotProtobuf
			}

			k, sdone, err := dm.modifyDag(childpb, offset-cur, data)
280
			if err != nil {
Jeromy's avatar
Jeromy committed
281
				return nil, false, err
282 283 284
			}

			offset += bs
285
			node.Links()[i].Cid = k
286

287
			// Recache serialized node
288
			_, err = node.EncodeProtobuf(true)
289
			if err != nil {
Jeromy's avatar
Jeromy committed
290
				return nil, false, err
291 292
			}

293
			if sdone {
294
				// No more bytes to write!
295 296 297
				done = true
				break
			}
298
			offset = cur + bs
299 300 301 302 303
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
304
	return k, done, err
305 306
}

Jeromy's avatar
Jeromy committed
307
// appendData appends the blocks from the given chan to the end of this dag
308
func (dm *DagModifier) appendData(node *mdag.ProtoNode, spl chunk.Splitter) (node.Node, error) {
309 310 311 312 313
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
	}

rht's avatar
rht committed
314
	return trickle.TrickleAppend(dm.ctx, node, dbp.New(spl))
315 316
}

Jeromy's avatar
Jeromy committed
317
// Read data from this dag starting at the current offset
318
func (dm *DagModifier) Read(b []byte) (int, error) {
319
	err := dm.readPrep()
320 321 322 323
	if err != nil {
		return 0, err
	}

324 325 326 327 328 329 330 331 332 333 334
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

335
	if dm.read == nil {
336 337
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
338
		if err != nil {
339
			return err
340 341 342 343
		}

		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
		if err != nil {
344
			return err
345 346 347
		}

		if i != int64(dm.curWrOff) {
348
			return ErrSeekFail
349 350
		}

351
		dm.readCancel = cancel
352 353 354
		dm.read = dr
	}

355 356 357 358 359 360 361 362 363 364 365
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
366 367 368 369 370
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
371
func (dm *DagModifier) GetNode() (*mdag.ProtoNode, error) {
372
	err := dm.Sync()
373 374 375
	if err != nil {
		return nil, err
	}
376
	return dm.curNode.Copy().(*mdag.ProtoNode), nil
377 378
}

Jeromy's avatar
Jeromy committed
379
// HasChanges returned whether or not there are unflushed changes to this dag
380 381 382 383 384
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
385
	err := dm.Sync()
386 387 388 389
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
390 391 392 393 394 395
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
396 397
	switch whence {
	case os.SEEK_CUR:
Jeromy's avatar
Jeromy committed
398
		newoffset = dm.curWrOff + uint64(offset)
399
	case os.SEEK_SET:
Jeromy's avatar
Jeromy committed
400
		newoffset = uint64(offset)
401
	case os.SEEK_END:
402
		newoffset = uint64(fisize) - uint64(offset)
403
	default:
Jeromy's avatar
Jeromy committed
404
		return 0, ErrUnrecognizedWhence
405 406
	}

407 408
	if int64(newoffset) > fisize {
		if err := dm.expandSparse(int64(newoffset) - fisize); err != nil {
Jeromy's avatar
Jeromy committed
409 410 411 412 413 414
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

415 416 417 418 419 420 421 422 423 424 425
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
426
	err := dm.Sync()
427 428 429 430 431 432 433 434 435
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
436
	// Truncate can also be used to expand the file
437
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
438
		return dm.expandSparse(int64(size) - realSize)
439 440
	}

441
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
442 443 444 445 446 447 448 449 450 451 452 453 454
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
455
// dagTruncate truncates the given node to 'size' and returns the modified Node
456 457
func dagTruncate(ctx context.Context, nd *mdag.ProtoNode, size uint64, ds mdag.DAGService) (*mdag.ProtoNode, error) {
	if len(nd.Links()) == 0 {
458
		// TODO: this can likely be done without marshaling and remarshaling
459
		pbn, err := ft.FromBytes(nd.Data())
460 461 462 463
		if err != nil {
			return nil, err
		}

464
		nd.SetData(ft.WrapData(pbn.Data[:size]))
465 466 467 468 469
		return nd, nil
	}

	var cur uint64
	end := 0
470
	var modified *mdag.ProtoNode
471
	ndata := new(ft.FSNode)
472
	for i, lnk := range nd.Links() {
Jeromy's avatar
Jeromy committed
473
		child, err := lnk.GetNode(ctx, ds)
474 475 476 477
		if err != nil {
			return nil, err
		}

478 479 480 481 482 483
		childpb, ok := child.(*mdag.ProtoNode)
		if !ok {
			return nil, err
		}

		childsize, err := ft.DataSize(childpb.Data())
484 485 486 487
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
488
		// found the child we want to cut
489
		if size < cur+childsize {
490
			nchild, err := dagTruncate(ctx, childpb, size-cur, ds)
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

510
	nd.SetLinks(nd.Links()[:end])
511 512 513 514 515 516 517 518 519 520
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

521
	nd.SetData(d)
522

523
	// invalidate cache and recompute serialized data
524
	_, err = nd.EncodeProtobuf(true)
525 526 527 528
	if err != nil {
		return nil, err
	}

529 530
	return nd, nil
}