splitting.go 1.34 KB
Newer Older
1
// Package chunk implements streaming block splitters.
2 3 4 5 6
package chunk

import (
	"fmt"
	"io"

	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
)

Jeromy's avatar
Jeromy committed
10
// log is the package-level logger, created with the name "chunk".
var log = logging.Logger("chunk")
11

12
// DefaultBlockSize is the block size, in bytes, that DefaultSplitter passes
// to NewSizeSplitter: 1024 * 256 = 262144 (256 KiB).
var DefaultBlockSize int64 = 1024 * 256
13

14 15
// Splitter produces successive chunks of data, one per call to NextBytes.
type Splitter interface {
	// NextBytes returns the next chunk, or a non-nil error when no chunk
	// can be produced. The size-based implementation in this package
	// reports io.EOF once its reader is exhausted.
	NextBytes() ([]byte, error)
}

18 19 20 21 22 23 24 25 26 27
// SplitterGen is a constructor function that builds a Splitter reading from r.
type SplitterGen func(r io.Reader) Splitter

// DefaultSplitter returns a size-based Splitter over r whose block size is
// the current value of DefaultBlockSize.
func DefaultSplitter(r io.Reader) Splitter {
	blockSize := DefaultBlockSize
	return NewSizeSplitter(r, blockSize)
}

func SizeSplitterGen(size int64) SplitterGen {
	return func(r io.Reader) Splitter {
		return NewSizeSplitter(r, size)
	}
28 29
}

30
func Chan(s Splitter) (<-chan []byte, <-chan error) {
31
	out := make(chan []byte)
32
	errs := make(chan error, 1)
33 34
	go func() {
		defer close(out)
35
		defer close(errs)
36 37

		// all-chunks loop (keep creating chunks)
38
		for {
39
			b, err := s.NextBytes()
40
			if err != nil {
41
				errs <- err
42
				return
43
			}
44 45

			out <- b
46 47
		}
	}()
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
	return out, errs
}

// sizeSplitterv2 is the Splitter returned by NewSizeSplitter. It cuts the
// data read from r into blocks of at most size bytes.
type sizeSplitterv2 struct {
	// r is the source of the data being split.
	r    io.Reader
	// size is the length, in bytes, of the buffer allocated for each block.
	size int64
	// err is set to io.EOF after a short final block has been returned;
	// once set, NextBytes returns it without reading again.
	err  error
}

// NewSizeSplitter returns a Splitter that reads from r and hands out blocks
// of size bytes; the last block may be shorter when r runs out mid-block.
func NewSizeSplitter(r io.Reader, size int64) Splitter {
	ss := new(sizeSplitterv2)
	ss.r = r
	ss.size = size
	return ss
}

// NextBytes reads the next block from the underlying reader.
//
// While data lasts it returns full blocks of ss.size bytes. If the reader
// ends part-way through a block, the bytes read so far are returned with a
// nil error and io.EOF is remembered, so the following call reports io.EOF
// without touching the reader again. Any other read error (including io.EOF
// when no bytes at all could be read) is returned as-is with a nil slice.
//
// A non-positive block size yields an error: a zero size would otherwise
// return empty blocks forever, and a negative size would panic in make.
func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
	if ss.err != nil {
		return nil, ss.err
	}
	if ss.size <= 0 {
		return nil, fmt.Errorf("chunk: invalid block size %d", ss.size)
	}

	buf := make([]byte, ss.size)
	n, err := io.ReadFull(ss.r, buf)
	switch err {
	case nil:
		return buf, nil
	case io.ErrUnexpectedEOF:
		// Short final block. Copy it into a right-sized slice so the
		// caller does not keep the whole ss.size buffer alive for what
		// may be only a few bytes.
		ss.err = io.EOF
		tail := make([]byte, n)
		copy(tail, buf)
		return tail, nil
	default:
		return nil, err
	}
}