splitting.go 1.68 KB
Newer Older
1
// package chunk implements streaming block splitters
2 3 4 5 6
package chunk

import (
	"io"

Jeromy's avatar
Jeromy committed
7
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
8
	mpool "gx/ipfs/QmWBug6eBS7AxRdCDVuSY5CnSit7cS2XnPFYJWqWDumhCG/go-msgio/mpool"
9 10
)

Jeromy's avatar
Jeromy committed
11
var log = logging.Logger("chunk")
12

13
var DefaultBlockSize int64 = 1024 * 256
14

15
type Splitter interface {
16
	Reader() io.Reader
17
	NextBytes() ([]byte, error)
18 19
}

20 21 22 23 24 25 26 27 28 29
type SplitterGen func(r io.Reader) Splitter

func DefaultSplitter(r io.Reader) Splitter {
	return NewSizeSplitter(r, DefaultBlockSize)
}

func SizeSplitterGen(size int64) SplitterGen {
	return func(r io.Reader) Splitter {
		return NewSizeSplitter(r, size)
	}
30 31
}

32
func Chan(s Splitter) (<-chan []byte, <-chan error) {
33
	out := make(chan []byte)
34
	errs := make(chan error, 1)
35 36
	go func() {
		defer close(out)
37
		defer close(errs)
38 39

		// all-chunks loop (keep creating chunks)
40
		for {
41
			b, err := s.NextBytes()
42
			if err != nil {
43
				errs <- err
44
				return
45
			}
46 47

			out <- b
48 49
		}
	}()
50 51 52 53 54
	return out, errs
}

type sizeSplitterv2 struct {
	r    io.Reader
55
	size uint32
56 57 58 59 60 61
	err  error
}

func NewSizeSplitter(r io.Reader, size int64) Splitter {
	return &sizeSplitterv2{
		r:    r,
62
		size: uint32(size),
63 64 65 66 67 68 69
	}
}

func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
	if ss.err != nil {
		return nil, ss.err
	}
70 71 72 73 74

	full := mpool.ByteSlicePool.Get(ss.size).([]byte)[:ss.size]
	n, err := io.ReadFull(ss.r, full)
	switch err {
	case io.ErrUnexpectedEOF:
75
		ss.err = io.EOF
76 77 78 79 80 81 82 83
		small := make([]byte, n)
		copy(small, full)
		mpool.ByteSlicePool.Put(ss.size, full)
		return small, nil
	case nil:
		return full, nil
	default:
		mpool.ByteSlicePool.Put(ss.size, full)
84 85
		return nil, err
	}
86
}
87 88 89 90

func (ss *sizeSplitterv2) Reader() io.Reader {
	return ss.r
}