dagmodifier.go 10.2 KB
Newer Older
1 2 3 4 5 6 7 8
package mod

import (
	"bytes"
	"errors"
	"io"
	"os"

9
	mh "gx/ipfs/QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku/go-multihash"
Jeromy's avatar
Jeromy committed
10
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
11
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
12

13
	key "github.com/ipfs/go-ipfs/blocks/key"
14 15 16 17 18 19
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	help "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
20
	logging "gx/ipfs/QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR/go-log"
21 22
)

Jeromy's avatar
Jeromy committed
23 24 25 26
// Sentinel errors returned by DagModifier's positioning operations.
var (
	// ErrSeekFail is returned when a seek does not land on the requested offset.
	ErrSeekFail = errors.New("failed to seek properly")
	// ErrSeekEndNotImpl is kept for callers that compare against it; the code
	// in this file does not return it.
	ErrSeekEndNotImpl = errors.New("SEEK_END currently not implemented")
	// ErrUnrecognizedWhence is returned by Seek for an unknown whence value.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

// writebufferSize is the buffered-byte threshold (2 MiB); Write flushes the
// pending buffer into the DAG once the buffer grows beyond it.
var writebufferSize = 1 << 21
var log = logging.Logger("dagio")
31 32 33 34 35 36 37 38

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
	dagserv mdag.DAGService
	curNode *mdag.Node

39
	splitter   chunk.SplitterGen
40 41 42 43 44 45 46 47 48 49
	ctx        context.Context
	readCancel func()

	writeStart uint64
	curWrOff   uint64
	wrBuf      *bytes.Buffer

	read *uio.DagReader
}

50
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
51 52 53 54 55 56 57 58 59 60 61
	return &DagModifier{
		curNode:  from.Copy(),
		dagserv:  serv,
		splitter: spl,
		ctx:      ctx,
	}, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
62 63
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
64 65 66 67 68 69
	if offset == int64(dm.writeStart) && dm.wrBuf != nil {
		// If we would overwrite the previous write
		if len(b) >= dm.wrBuf.Len() {
			dm.wrBuf.Reset()
		}
	} else if uint64(offset) != dm.curWrOff {
70 71 72 73 74 75 76 77 78 79 80
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

81
		err = dm.Sync()
82 83 84 85 86 87 88 89 90 91 92 93 94
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader that yields an endless stream of zero bytes.
type zeroReader struct{}

// Read zeroes every byte of p and reports all of them as read; it never
// returns an error.
func (zeroReader) Read(p []byte) (int, error) {
	for i := 0; i < len(p); i++ {
		p[i] = 0
	}
	return len(p), nil
}
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
103 104
func (dm *DagModifier) expandSparse(size int64) error {
	r := io.LimitReader(zeroReader{}, size)
105
	spl := chunk.NewSizeSplitter(r, 4096)
rht's avatar
rht committed
106
	nnode, err := dm.appendData(dm.curNode, spl)
107 108 109 110 111 112 113 114 115 116 117
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
118
// Write continues writing to the dag at the current offset
119 120 121 122 123 124 125
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
126

127 128 129 130 131 132
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
133
		err := dm.Sync()
134 135 136 137 138 139 140 141
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

func (dm *DagModifier) Size() (int64, error) {
142
	pbn, err := ft.FromBytes(dm.curNode.Data())
143 144 145 146
	if err != nil {
		return 0, err
	}

147 148 149 150
	if dm.wrBuf != nil {
		if uint64(dm.wrBuf.Len())+dm.writeStart > pbn.GetFilesize() {
			return int64(dm.wrBuf.Len()) + int64(dm.writeStart), nil
		}
151 152 153 154 155
	}

	return int64(pbn.GetFilesize()), nil
}

156 157
// Sync writes changes to this dag to disk
func (dm *DagModifier) Sync() error {
Jeromy's avatar
Jeromy committed
158
	// No buffer? Nothing to do
159 160 161 162 163 164 165 166 167 168
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
169
	// Number of bytes we're going to write
170 171
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
172
	// overwrite existing dag nodes
173
	thisk, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
174 175 176 177
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
178
	nd, err := dm.dagserv.Get(dm.ctx, thisk)
179 180 181 182 183 184
	if err != nil {
		return err
	}

	dm.curNode = nd

Jeromy's avatar
Jeromy committed
185
	// need to write past end of current dag
186
	if !done {
rht's avatar
rht committed
187
		nd, err = dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
188 189 190 191
		if err != nil {
			return err
		}

192
		thisk, err = dm.dagserv.Add(nd)
193 194 195 196 197 198 199 200 201 202 203 204 205
		if err != nil {
			return err
		}

		dm.curNode = nd
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
206
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
207 208
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
209
func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) (key.Key, bool, error) {
210
	f, err := ft.FromBytes(node.Data())
211
	if err != nil {
Jeromy's avatar
Jeromy committed
212
		return "", false, err
213 214
	}

Jeromy's avatar
Jeromy committed
215 216
	// If we've reached a leaf node.
	if len(node.Links) == 0 {
217 218
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
219
			return "", false, err
220 221 222 223 224
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
225
			return "", false, err
226 227
		}

228 229
		nd := new(mdag.Node)
		nd.SetData(b)
230 231
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
232
			return "", false, err
233 234 235 236
		}

		// Hey look! we're done!
		var done bool
237
		if n < len(f.Data[offset:]) {
238 239 240
			done = true
		}

Jeromy's avatar
Jeromy committed
241
		return k, done, nil
242 243 244 245 246
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
247
		// We found the correct child to write into
248
		if cur+bs > offset {
Jeromy's avatar
Jeromy committed
249
			child, err := node.Links[i].GetNode(dm.ctx, dm.dagserv)
250
			if err != nil {
Jeromy's avatar
Jeromy committed
251
				return "", false, err
252
			}
Jeromy's avatar
Jeromy committed
253
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
254
			if err != nil {
Jeromy's avatar
Jeromy committed
255
				return "", false, err
256 257 258 259 260
			}

			offset += bs
			node.Links[i].Hash = mh.Multihash(k)

261
			// Recache serialized node
262
			_, err = node.EncodeProtobuf(true)
263 264 265 266
			if err != nil {
				return "", false, err
			}

267
			if sdone {
268
				// No more bytes to write!
269 270 271
				done = true
				break
			}
272
			offset = cur + bs
273 274 275 276 277
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
278
	return k, done, err
279 280
}

Jeromy's avatar
Jeromy committed
281
// appendData appends the blocks from the given chan to the end of this dag
rht's avatar
rht committed
282
func (dm *DagModifier) appendData(node *mdag.Node, spl chunk.Splitter) (*mdag.Node, error) {
283 284 285 286 287
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
	}

rht's avatar
rht committed
288
	return trickle.TrickleAppend(dm.ctx, node, dbp.New(spl))
289 290
}

Jeromy's avatar
Jeromy committed
291
// Read data from this dag starting at the current offset
292
func (dm *DagModifier) Read(b []byte) (int, error) {
293
	err := dm.readPrep()
294 295 296 297
	if err != nil {
		return 0, err
	}

298 299 300 301 302 303 304 305 306 307 308
	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

func (dm *DagModifier) readPrep() error {
	err := dm.Sync()
	if err != nil {
		return err
	}

309
	if dm.read == nil {
310 311
		ctx, cancel := context.WithCancel(dm.ctx)
		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
312
		if err != nil {
313
			return err
314 315 316 317
		}

		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
		if err != nil {
318
			return err
319 320 321
		}

		if i != int64(dm.curWrOff) {
322
			return ErrSeekFail
323 324
		}

325
		dm.readCancel = cancel
326 327 328
		dm.read = dr
	}

329 330 331 332 333 334 335 336 337 338 339
	return nil
}

// Read data from this dag starting at the current offset
func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	err := dm.readPrep()
	if err != nil {
		return 0, err
	}

	n, err := dm.read.CtxReadFull(ctx, b)
340 341 342 343 344 345
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
346
	err := dm.Sync()
347 348 349 350 351 352
	if err != nil {
		return nil, err
	}
	return dm.curNode.Copy(), nil
}

Jeromy's avatar
Jeromy committed
353
// HasChanges returned whether or not there are unflushed changes to this dag
354 355 356 357 358
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
359
	err := dm.Sync()
360 361 362 363
	if err != nil {
		return 0, err
	}

Jeromy's avatar
Jeromy committed
364 365 366 367 368 369
	fisize, err := dm.Size()
	if err != nil {
		return 0, err
	}

	var newoffset uint64
370 371
	switch whence {
	case os.SEEK_CUR:
Jeromy's avatar
Jeromy committed
372
		newoffset = dm.curWrOff + uint64(offset)
373
	case os.SEEK_SET:
Jeromy's avatar
Jeromy committed
374
		newoffset = uint64(offset)
375
	case os.SEEK_END:
376
		newoffset = uint64(fisize) - uint64(offset)
377
	default:
Jeromy's avatar
Jeromy committed
378
		return 0, ErrUnrecognizedWhence
379 380
	}

Jeromy's avatar
Jeromy committed
381 382 383 384 385 386 387 388
	if offset > fisize {
		if err := dm.expandSparse(offset - fisize); err != nil {
			return 0, err
		}
	}
	dm.curWrOff = newoffset
	dm.writeStart = newoffset

389 390 391 392 393 394 395 396 397 398 399
	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
400
	err := dm.Sync()
401 402 403 404 405 406 407 408 409
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
410
	// Truncate can also be used to expand the file
411
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
412
		return dm.expandSparse(int64(size) - realSize)
413 414
	}

415
	nnode, err := dagTruncate(dm.ctx, dm.curNode, uint64(size), dm.dagserv)
416 417 418 419 420 421 422 423 424 425 426 427 428
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
429
// dagTruncate truncates the given node to 'size' and returns the modified Node
430
func dagTruncate(ctx context.Context, nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, error) {
431 432
	if len(nd.Links) == 0 {
		// TODO: this can likely be done without marshaling and remarshaling
433
		pbn, err := ft.FromBytes(nd.Data())
434 435 436 437
		if err != nil {
			return nil, err
		}

438
		nd.SetData(ft.WrapData(pbn.Data[:size]))
439 440 441 442 443 444 445 446
		return nd, nil
	}

	var cur uint64
	end := 0
	var modified *mdag.Node
	ndata := new(ft.FSNode)
	for i, lnk := range nd.Links {
Jeromy's avatar
Jeromy committed
447
		child, err := lnk.GetNode(ctx, ds)
448 449 450 451
		if err != nil {
			return nil, err
		}

452
		childsize, err := ft.DataSize(child.Data())
453 454 455 456
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
457
		// found the child we want to cut
458
		if size < cur+childsize {
459
			nchild, err := dagTruncate(ctx, child, size-cur, ds)
460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

	nd.Links = nd.Links[:end]
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

490
	nd.SetData(d)
491

492
	// invalidate cache and recompute serialized data
493
	_, err = nd.EncodeProtobuf(true)
494 495 496 497
	if err != nil {
		return nil, err
	}

498 499
	return nd, nil
}