splitting.go 2.46 KB
Newer Older
1 2 3 4
// Package chunk implements streaming block splitters.
// Splitters read data from a reader and provide byte slices (chunks).
// The size and contents of these slices depend on the splitting method
// used.
package chunk

import (
	"io"

10 11
	logging "github.com/ipfs/go-log"
	mpool "github.com/libp2p/go-msgio/mpool"
12 13
)

Jeromy's avatar
Jeromy committed
14
// log is the package-level logger, created under the name "chunk".
var log = logging.Logger("chunk")
15

16
// DefaultBlockSize is the chunk size, in bytes, that splitters produce
// (or aim to). The default is 256 KiB.
var DefaultBlockSize int64 = 1024 * 256
18

19 20
// A Splitter reads bytes from a Reader and creates "chunks" (byte slices)
// that can be used to build DAG nodes.
type Splitter interface {
	// Reader returns the io.Reader the splitter takes its input from.
	Reader() io.Reader
	// NextBytes returns the next chunk, or a non-nil error when no chunk
	// can be produced.
	NextBytes() ([]byte, error)
}

26
// SplitterGen is a splitter generator, given a reader.
27 28
type SplitterGen func(r io.Reader) Splitter

29
// DefaultSplitter returns a SizeSplitter with the DefaultBlockSize.
30 31 32 33
func DefaultSplitter(r io.Reader) Splitter {
	return NewSizeSplitter(r, DefaultBlockSize)
}

34 35
// SizeSplitterGen returns a SplitterGen function which will create
// a splitter with the given size when called.
36 37 38 39
func SizeSplitterGen(size int64) SplitterGen {
	return func(r io.Reader) Splitter {
		return NewSizeSplitter(r, size)
	}
40 41
}

42 43
// Chan returns a channel that receives each of the chunks produced
// by a splitter, along with another one for errors.
44
func Chan(s Splitter) (<-chan []byte, <-chan error) {
45
	out := make(chan []byte)
46
	errs := make(chan error, 1)
47 48
	go func() {
		defer close(out)
49
		defer close(errs)
50 51

		// all-chunks loop (keep creating chunks)
52
		for {
53
			b, err := s.NextBytes()
54
			if err != nil {
55
				errs <- err
56
				return
57
			}
58 59

			out <- b
60 61
		}
	}()
62 63 64 65 66
	return out, errs
}

// sizeSplitterv2 is a Splitter that cuts its input into chunks of a fixed
// size; only the last chunk may be shorter.
type sizeSplitterv2 struct {
	r    io.Reader // source the chunks are read from
	size uint32    // length of every full chunk, in bytes
	err  error     // sticky error; once set, NextBytes returns it without reading
}

71
// NewSizeSplitter returns a new size-based Splitter with the given block size.
72 73 74
func NewSizeSplitter(r io.Reader, size int64) Splitter {
	return &sizeSplitterv2{
		r:    r,
75
		size: uint32(size),
76 77 78
	}
}

79
// NextBytes produces a new chunk.
80 81 82 83
func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
	if ss.err != nil {
		return nil, ss.err
	}
84 85 86 87 88

	full := mpool.ByteSlicePool.Get(ss.size).([]byte)[:ss.size]
	n, err := io.ReadFull(ss.r, full)
	switch err {
	case io.ErrUnexpectedEOF:
89
		ss.err = io.EOF
90 91 92 93 94 95 96 97
		small := make([]byte, n)
		copy(small, full)
		mpool.ByteSlicePool.Put(ss.size, full)
		return small, nil
	case nil:
		return full, nil
	default:
		mpool.ByteSlicePool.Put(ss.size, full)
98 99
		return nil, err
	}
100
}
101

102
// Reader returns the io.Reader associated to this Splitter.
103 104 105
func (ss *sizeSplitterv2) Reader() io.Reader {
	return ss.r
}