dagmodifier.go 10.1 KB
Newer Older
1 2 3 4 5 6 7 8
package mod

import (
	"bytes"
	"errors"
	"io"
	"os"

9
	key "github.com/ipfs/go-ipfs/blocks/key"
10 11 12 13 14 15
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
16

17
	logging "gx/ipfs/QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR/go-log"
18 19 20
	mh "gx/ipfs/QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku/go-multihash"
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
21 22
)

Jeromy's avatar
Jeromy committed
23 24 25
// Sentinel errors returned by DagModifier.
var (
	// ErrSeekFail is returned when a freshly created reader could not be
	// positioned at the modifier's current offset.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrUnrecognizedWhence is returned by Seek when the whence argument is
	// not one of the os.SEEK_* constants.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

26 27 28
// writebufferSize is the number of buffered bytes (2 MiB) above which Write
// flushes the pending data into the DAG.
var writebufferSize = 2 << 20

Jeromy's avatar
Jeromy committed
29
// log is the package-level logger, created under the name "dagio".
var log = logging.Logger("dagio")
30 31 32 33 34 35 36 37

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
	curNode *mdag.Node

38
	splitter   chunk.SplitterGen
39 40 41 42 43 44 45 46 47 48
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

	read *uio.DagReader
}

49
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
50 51 52 53 54 55 56 57 58 59 60
	return &DagModifier{
		curNode:  from.Copy(),
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
61 62
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
63 64 65 66 67 68
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
69 70 71 72 73 74 75 76 77 78 79
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

80
		err = dm.Sync()
81 82 83 84 85 86 87 88 89 90 91 92 93
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an endless io.Reader whose every byte is zero.
type zeroReader struct{}

// Read overwrites all of b with zero bytes and reports len(b) with a nil
// error; it never signals EOF.
func (zeroReader) Read(b []byte) (int, error) {
	// Keep the plain range-and-assign form for the fill.
	for i := range b {
		b[i] = 0
	}
	filled := len(b)
	return filled, nil
}

Jeromy's avatar
Jeromy committed
100 101
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
102 103
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
104
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
105
	nnode, err := dm.appendData(dm.curNode, spl)
106 107 108 109 110 111 112 113 114 115 116
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
117
// Write continues writing to the dag at the current offset
118 119 120 121 122 123 124
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
125

126 127 128 129 130 131
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
132
		err := dm.Sync()
133 134 135 136 137 138 139 140
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

func (dm *DagModifier) Size() (int64, error) {
141
	pbn, err := ft.FromBytes(dm.curNode.Data())
142 143 144 145
	if err != nil {
		return 0, err
	}

146 147 148 149
	if dm.wrBuf != nil {
		if uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
150 151 152 153 154
	}

	return int64(pbn.GetFilesize()), nil
}

155 156
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
157
	// No buffer? Nothing to do
158 159 160 161 162 163 164 165 166 167
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
168
	// Number of bytes we're going to write
169 170
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
171
	// overwrite existing dag nodes
172
	thisk, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
173 174 175 176
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
177
	nd, err := dm.dagserv.Get(dm.ctx, thisk)
178 179 180 181 182 183
	if err != nil {
		return err
	}

	dm.curNode = nd

Jeromy's avatar
Jeromy committed
184
	// need to write past end of current dag
185
	if !done {
rht's avatar
rht committed
186
		nd, err = dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
187 188 189 190
		if err != nil {
			return err
		}

191
		thisk, err = dm.dagserv.Add(nd)
192 193 194 195 196 197 198 199 200 201 202 203 204
		if err != nil {
			return err
		}

		dm.curNode = nd
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
205
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
206 207
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
208
func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) (key.Key, bool, error) {
209
	f, err := ft.FromBytes(node.Data())
210
	if err != nil {
Jeromy's avatar
Jeromy committed
211
		return "", false, err
212 213
	}

Jeromy's avatar
Jeromy committed
214 215
	// If we've reached a leaf node.
	if len(node.Links) == 0 {
216 217
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
218
			return "", false, err
219 220 221 222 223
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
224
			return "", false, err
225 226
		}

227 228
		nd := new(mdag.Node)
		nd.SetData(b)
229 230
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
231
			return "", false, err
232 233 234 235
		}

		// Hey look! we're done!
		var done bool
236
		if n < len(f.Data[offset:]) {
237 238 239
			done = true
		}

Jeromy's avatar
Jeromy committed
240
		return k, done, nil
241 242 243 244 245
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
246
		// We found the correct child to write into
247
		if cur+bs > offset {
Jeromy's avatar
Jeromy committed
248
			child, err := node.Links[i].GetNode(dm.ctx, dm.dagserv)
249
			if err != nil {
Jeromy's avatar
Jeromy committed
250
				return "", false, err
251
			}
Jeromy's avatar
Jeromy committed
252
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
253
			if err != nil {
Jeromy's avatar
Jeromy committed
254
				return "", false, err
255 256 257 258 259
			}

			offset += bs
			node.Links[i].Hash = mh.Multihash(k)

260
			// Recache serialized node
261
			_, err = node.EncodeProtobuf(true)
262 263 264 265
			if err != nil {
				return "", false, err
			}

266
			if sdone {
267
				// No more bytes to write!
268 269 270
				done = true
				break
			}
271
			offset = cur + bs
272 273 274 275 276
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
277
	return k, done, err
278 279
}

Jeromy's avatar
Jeromy committed
280
// appendData appends the blocks from the given chan to the end of this dag
rht's avatar
rht committed
281
func (dm *DagModifier) appendData(node *mdag.Node, spl chunk.Splitter) (*mdag.Node, error) {
282 283 284 285 286
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
	}

rht's avatar
rht committed
287
	return trickle.TrickleAppend(dm.ctx, node, dbp.New(spl))
288 289
}

Jeromy's avatar
Jeromy committed
290
// Read data from this dag starting at the current offset
291
func (dm *DagModifier) Read(b []byte) (int, error) {
292
	err := dm.readPrep()
293 294 295 296
	if err != nil {
		return 0, err
	}

297 298 299 300 301 302 303 304 305 306 307
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

308
	if dm.read == nil {
309 310
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
311
		if err != nil {
312
			return err
313 314 315 316
		}

		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
		if err != nil {
317
			return err
318 319 320
		}

		if i != int64(dm.curWrOff) {
321
			return ErrSeekFail
322 323
		}

324
		dm.readCancel = cancel
325 326 327
		dm.read = dr
	}

328 329 330 331 332 333 334 335 336 337 338
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
339 340 341 342 343 344
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
345
	err := dm.Sync()
346 347 348 349 350 351
	if err != nil {
		return nil, err
	}
	return dm.curNode.Copy(), nil
}

Jeromy's avatar
Jeromy committed
352
// HasChanges returned whether or not there are unflushed changes to this dag
353 354 355 356 357
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
358
	err := dm.Sync()
359 360 361 362
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
363 364 365 366 367 368
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
369 370
	switch whence {
	case os.SEEK_CUR:
Jeromy's avatar
Jeromy committed
371
		newoffset = dm.curWrOff + uint64(offset)
372
	case os.SEEK_SET:
Jeromy's avatar
Jeromy committed
373
		newoffset = uint64(offset)
374
	case os.SEEK_END:
375
		newoffset = uint64(fisize) - uint64(offset)
376
	default:
Jeromy's avatar
Jeromy committed
377
		return 0, ErrUnrecognizedWhence
378 379
	}

Jeromy's avatar
Jeromy committed
380 381 382 383 384 385 386 387
	if offset > fisize {
		if err := dm.expandSparse(offset - fisize); err != nil {
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

388 389 390 391 392 393 394 395 396 397 398
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
399
	err := dm.Sync()
400 401 402 403 404 405 406 407 408
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
409
	// Truncate can also be used to expand the file
410
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
411
		return dm.expandSparse(int64(size) - realSize)
412 413
	}

414
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
415 416 417 418 419 420 421 422 423 424 425 426 427
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
428
// dagTruncate truncates the given node to 'size' and returns the modified Node
429
func dagTruncate(ctx context.Context, nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, error) {
430 431
	if len(nd.Links) == 0 {
		// TODO: this can likely be done without marshaling and remarshaling
432
		pbn, err := ft.FromBytes(nd.Data())
433 434 435 436
		if err != nil {
			return nil, err
		}

437
		nd.SetData(ft.WrapData(pbn.Data[:size]))
438 439 440 441 442 443 444 445
		return nd, nil
	}

	var cur uint64
	end := 0
	var modified *mdag.Node
	ndata := new(ft.FSNode)
	for i, lnk := range nd.Links {
Jeromy's avatar
Jeromy committed
446
		child, err := lnk.GetNode(ctx, ds)
447 448 449 450
		if err != nil {
			return nil, err
		}

451
		childsize, err := ft.DataSize(child.Data())
452 453 454 455
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
456
		// found the child we want to cut
457
		if size < cur+childsize {
458
			nchild, err := dagTruncate(ctx, child, size-cur, ds)
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

	nd.Links = nd.Links[:end]
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

489
	nd.SetData(d)
490

491
	// invalidate cache and recompute serialized data
492
	_, err = nd.EncodeProtobuf(true)
493 494 495 496
	if err != nil {
		return nil, err
	}

497 498
	return nd, nil
}