dagmodifier.go 9.04 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
package mod

import (
	"bytes"
	"errors"
	"io"
	"os"

	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
	mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

	chunk "github.com/jbenet/go-ipfs/importer/chunk"
	help "github.com/jbenet/go-ipfs/importer/helpers"
	trickle "github.com/jbenet/go-ipfs/importer/trickle"
	mdag "github.com/jbenet/go-ipfs/merkledag"
	pin "github.com/jbenet/go-ipfs/pin"
	ft "github.com/jbenet/go-ipfs/unixfs"
	uio "github.com/jbenet/go-ipfs/unixfs/io"
	u "github.com/jbenet/go-ipfs/util"
)

Jeromy's avatar
Jeromy committed
23 24 25 26
var (
	// ErrSeekFail is returned when a reader could not be positioned at the
	// requested offset.
	ErrSeekFail = errors.New("failed to seek properly")

	// ErrSeekEndNotImpl is returned by Seek when whence is os.SEEK_END.
	ErrSeekEndNotImpl = errors.New("SEEK_END currently not implemented")

	// ErrUnrecognizedWhence is returned by Seek for an unknown whence value.
	ErrUnrecognizedWhence = errors.New("unrecognized whence")
)

27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
// writebufferSize is the number of buffered bytes (2MB) beyond which Write
// flushes the write buffer into the dag.
var writebufferSize = 2 * 1024 * 1024

// log is this file's logger, obtained from util.Logger under the name "dagio".
// NOTE(review): it is not referenced anywhere in the code shown here.
var log = u.Logger("dagio")

// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
//
// Writes are collected in wrBuf and merged into the dag only by Flush;
// Read, Seek, Size, GetNode and Truncate all flush before doing their work.
//
// NOTE(review): readCancel is called by Flush when it discards an active
// reader, but nothing in the code shown here assigns it, so that call would
// panic on a nil func if it were ever reached - confirm.
type DagModifier struct {
	dagserv mdag.DAGService  // fetches (Get) and stores (Add) dag nodes
	curNode *mdag.Node       // current root; replaced by Flush, expandSparse, Truncate
	mp      pin.ManualPinner // Pinner handed to the DAG builder in appendData

	splitter   chunk.BlockSplitter // chunks the data Flush appends past the end
	ctx        context.Context     // passed to the DagReader created by Read
	readCancel func()              // called by Flush when dropping read; see NOTE above

	writeStart uint64        // file offset at which wrBuf will be written
	curWrOff   uint64        // current offset, advanced by Write and Read
	wrBuf      *bytes.Buffer // unflushed writes; nil when there are none

	read *uio.DagReader // reader used by Read; reset to nil by Write and Flush
}

// NewDagModifier returns a DagModifier that edits a private copy of from.
// Nodes are fetched from and stored into serv, mp is given to the DAG builder
// as its pinner when data is appended, and spl chunks appended data.
// The returned error is currently always nil.
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*DagModifier, error) {
	dm := new(DagModifier)
	dm.curNode = from.Copy()
	dm.dagserv = serv
	dm.splitter = spl
	dm.ctx = ctx
	dm.mp = mp
	return dm, nil
}

// WriteAt will modify a dag file in place
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
	// TODO: this is currently VERY inneficient
Jeromy's avatar
Jeromy committed
64 65
	// each write that happens at an offset other than the current one causes a
	// flush to disk, and dag rewrite
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
	if uint64(offset) != dm.curWrOff {
		size, err := dm.Size()
		if err != nil {
			return 0, err
		}
		if offset > size {
			err := dm.expandSparse(offset - size)
			if err != nil {
				return 0, err
			}
		}

		err = dm.Flush()
		if err != nil {
			return 0, err
		}
		dm.writeStart = uint64(offset)
	}

	return dm.Write(b)
}

// zeroReader is an io.Reader that yields an endless stream of zero bytes.
type zeroReader struct{}

// Read fills b with zeroes and always reports len(b) bytes read with a nil
// error, so it never reaches EOF on its own; wrap it (e.g. io.LimitReader)
// to bound it.
func (zr zeroReader) Read(b []byte) (int, error) {
	for i := range b {
		b[i] = 0
	}
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
98 99
// expandSparse grows the file with zero blocks of 4096
// A small blocksize is chosen to aid in deduplication
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
func (dm *DagModifier) expandSparse(size int64) error {
	spl := chunk.SizeSplitter{4096}
	r := io.LimitReader(zeroReader{}, size)
	blks := spl.Split(r)
	nnode, err := dm.appendData(dm.curNode, blks)
	if err != nil {
		return err
	}
	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}
	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
116
// Write continues writing to the dag at the current offset
117 118 119 120 121 122 123
func (dm *DagModifier) Write(b []byte) (int, error) {
	if dm.read != nil {
		dm.read = nil
	}
	if dm.wrBuf == nil {
		dm.wrBuf = new(bytes.Buffer)
	}
Jeromy's avatar
Jeromy committed
124

125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
	n, err := dm.wrBuf.Write(b)
	if err != nil {
		return n, err
	}
	dm.curWrOff += uint64(n)
	if dm.wrBuf.Len() > writebufferSize {
		err := dm.Flush()
		if err != nil {
			return n, err
		}
	}
	return n, nil
}

// Size returns the current length of the file in bytes, as recorded in the
// root node's unixfs data. Pending writes are flushed first so that the
// reported size includes them.
func (dm *DagModifier) Size() (int64, error) {
	// TODO: compute size without flushing, should be easy
	if err := dm.Flush(); err != nil {
		return 0, err
	}

	fsNode, err := ft.FromBytes(dm.curNode.Data)
	if err != nil {
		return 0, err
	}
	return int64(fsNode.GetFilesize()), nil
}

Jeromy's avatar
Jeromy committed
154
// Flush writes changes to this dag to disk
155
func (dm *DagModifier) Flush() error {
Jeromy's avatar
Jeromy committed
156
	// No buffer? Nothing to do
157 158 159 160 161 162 163 164 165 166
	if dm.wrBuf == nil {
		return nil
	}

	// If we have an active reader, kill it
	if dm.read != nil {
		dm.read = nil
		dm.readCancel()
	}

Jeromy's avatar
Jeromy committed
167
	// Number of bytes we're going to write
168 169
	buflen := dm.wrBuf.Len()

Jeromy's avatar
Jeromy committed
170 171
	// overwrite existing dag nodes
	k, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
172 173 174 175 176 177 178 179 180 181 182
	if err != nil {
		return err
	}

	nd, err := dm.dagserv.Get(k)
	if err != nil {
		return err
	}

	dm.curNode = nd

Jeromy's avatar
Jeromy committed
183
	// need to write past end of current dag
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
	if !done {
		blks := dm.splitter.Split(dm.wrBuf)
		nd, err = dm.appendData(dm.curNode, blks)
		if err != nil {
			return err
		}

		_, err := dm.dagserv.Add(nd)
		if err != nil {
			return err
		}

		dm.curNode = nd
	}

	dm.writeStart += uint64(buflen)

	dm.wrBuf = nil
	return nil
}

Jeromy's avatar
Jeromy committed
205
// modifyDag writes the data in 'data' over the data in 'node' starting at 'offset'
Jeromy's avatar
Jeromy committed
206 207
// returns the new key of the passed in node and whether or not all the data in the reader
// has been consumed.
Jeromy's avatar
Jeromy committed
208
func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) (u.Key, bool, error) {
209 210
	f, err := ft.FromBytes(node.Data)
	if err != nil {
Jeromy's avatar
Jeromy committed
211
		return "", false, err
212 213
	}

Jeromy's avatar
Jeromy committed
214 215
	// If we've reached a leaf node.
	if len(node.Links) == 0 {
216 217
		n, err := data.Read(f.Data[offset:])
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
218
			return "", false, err
219 220 221 222 223
		}

		// Update newly written node..
		b, err := proto.Marshal(f)
		if err != nil {
Jeromy's avatar
Jeromy committed
224
			return "", false, err
225 226 227 228 229
		}

		nd := &mdag.Node{Data: b}
		k, err := dm.dagserv.Add(nd)
		if err != nil {
Jeromy's avatar
Jeromy committed
230
			return "", false, err
231 232 233 234 235 236 237 238
		}

		// Hey look! we're done!
		var done bool
		if n < len(f.Data) {
			done = true
		}

Jeromy's avatar
Jeromy committed
239
		return k, done, nil
240 241 242 243 244 245 246 247
	}

	var cur uint64
	var done bool
	for i, bs := range f.GetBlocksizes() {
		if cur+bs > offset {
			child, err := node.Links[i].GetNode(dm.dagserv)
			if err != nil {
Jeromy's avatar
Jeromy committed
248
				return "", false, err
249
			}
Jeromy's avatar
Jeromy committed
250
			k, sdone, err := dm.modifyDag(child, offset-cur, data)
251
			if err != nil {
Jeromy's avatar
Jeromy committed
252
				return "", false, err
253 254 255 256 257 258 259 260 261 262 263 264 265 266
			}

			offset += bs
			node.Links[i].Hash = mh.Multihash(k)

			if sdone {
				done = true
				break
			}
		}
		cur += bs
	}

	k, err := dm.dagserv.Add(node)
Jeromy's avatar
Jeromy committed
267
	return k, done, err
268 269
}

Jeromy's avatar
Jeromy committed
270
// appendData appends the blocks from the given chan to the end of this dag
271 272 273 274 275 276 277 278 279 280
func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.Node, error) {
	dbp := &help.DagBuilderParams{
		Dagserv:  dm.dagserv,
		Maxlinks: help.DefaultLinksPerBlock,
		Pinner:   dm.mp,
	}

	return trickle.TrickleAppend(node, dbp.New(blks))
}

Jeromy's avatar
Jeromy committed
281
// Read data from this dag starting at the current offset
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
func (dm *DagModifier) Read(b []byte) (int, error) {
	err := dm.Flush()
	if err != nil {
		return 0, err
	}

	if dm.read == nil {
		dr, err := uio.NewDagReader(dm.ctx, dm.curNode, dm.dagserv)
		if err != nil {
			return 0, err
		}

		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
		if err != nil {
			return 0, err
		}

		if i != int64(dm.curWrOff) {
Jeromy's avatar
Jeromy committed
300
			return 0, ErrSeekFail
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
		}

		dm.read = dr
	}

	n, err := dm.read.Read(b)
	dm.curWrOff += uint64(n)
	return n, err
}

// GetNode flushes any pending writes and returns a copy of the resulting
// root node of the modified DAG.
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
	if err := dm.Flush(); err != nil {
		return nil, err
	}
	root := dm.curNode.Copy()
	return root, nil
}

Jeromy's avatar
Jeromy committed
320
// HasChanges returned whether or not there are unflushed changes to this dag
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
func (dm *DagModifier) HasChanges() bool {
	return dm.wrBuf != nil
}

func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
	err := dm.Flush()
	if err != nil {
		return 0, err
	}

	switch whence {
	case os.SEEK_CUR:
		dm.curWrOff += uint64(offset)
		dm.writeStart = dm.curWrOff
	case os.SEEK_SET:
		dm.curWrOff = uint64(offset)
		dm.writeStart = uint64(offset)
	case os.SEEK_END:
Jeromy's avatar
Jeromy committed
339
		return 0, ErrSeekEndNotImpl
340
	default:
Jeromy's avatar
Jeromy committed
341
		return 0, ErrUnrecognizedWhence
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
	}

	if dm.read != nil {
		_, err = dm.read.Seek(offset, whence)
		if err != nil {
			return 0, err
		}
	}

	return int64(dm.curWrOff), nil
}

func (dm *DagModifier) Truncate(size int64) error {
	err := dm.Flush()
	if err != nil {
		return err
	}

	realSize, err := dm.Size()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
365
	// Truncate can also be used to expand the file
366
	if size > int64(realSize) {
Jeromy's avatar
Jeromy committed
367
		return dm.expandSparse(int64(size) - realSize)
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
	}

	nnode, err := dagTruncate(dm.curNode, uint64(size), dm.dagserv)
	if err != nil {
		return err
	}

	_, err = dm.dagserv.Add(nnode)
	if err != nil {
		return err
	}

	dm.curNode = nnode
	return nil
}

Jeromy's avatar
Jeromy committed
384
// dagTruncate truncates the given node to 'size' and returns the modified Node
385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
func dagTruncate(nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, error) {
	if len(nd.Links) == 0 {
		// TODO: this can likely be done without marshaling and remarshaling
		pbn, err := ft.FromBytes(nd.Data)
		if err != nil {
			return nil, err
		}

		nd.Data = ft.WrapData(pbn.Data[:size])
		return nd, nil
	}

	var cur uint64
	end := 0
	var modified *mdag.Node
	ndata := new(ft.FSNode)
	for i, lnk := range nd.Links {
		child, err := lnk.GetNode(ds)
		if err != nil {
			return nil, err
		}

		childsize, err := ft.DataSize(child.Data)
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
412
		// found the child we want to cut
413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448
		if size < cur+childsize {
			nchild, err := dagTruncate(child, size-cur, ds)
			if err != nil {
				return nil, err
			}

			ndata.AddBlockSize(size - cur)

			modified = nchild
			end = i
			break
		}
		cur += childsize
		ndata.AddBlockSize(childsize)
	}

	_, err := ds.Add(modified)
	if err != nil {
		return nil, err
	}

	nd.Links = nd.Links[:end]
	err = nd.AddNodeLinkClean("", modified)
	if err != nil {
		return nil, err
	}

	d, err := ndata.GetBytes()
	if err != nil {
		return nil, err
	}

	nd.Data = d

	return nd, nil
}