// Package chunk implements streaming block splitters.
// Splitters read data from a reader and provide byte slices (chunks).
// The size and contents of these slices depend on the splitting method
// used.
package chunk

import (
	"io"

10
	logging "gitlab.dms3.io/dms3/go-log"
tavit ohanian's avatar
tavit ohanian committed
11
	pool "gitlab.dms3.io/p2p/go-buffer-pool"
12 13
)

Jeromy's avatar
Jeromy committed
14
var (
	// log is the package-level logger, obtained from go-log under the
	// name "chunk".
	log = logging.Logger("chunk")
)
15

16 17
// Splitter cuts the byte stream of an io.Reader into "chunks" (byte
// slices) that can be used to build DAG nodes.
type Splitter interface {
	// Reader returns the io.Reader this Splitter consumes.
	Reader() io.Reader

	// NextBytes returns the next chunk. A non-nil error ends the stream
	// for consumers such as Chan.
	NextBytes() ([]byte, error)
}

23
// SplitterGen is a splitter generator, given a reader.
24 25
type SplitterGen func(r io.Reader) Splitter

26
// DefaultSplitter returns a SizeSplitter with the DefaultBlockSize.
27 28 29 30
func DefaultSplitter(r io.Reader) Splitter {
	return NewSizeSplitter(r, DefaultBlockSize)
}

31 32
// SizeSplitterGen returns a SplitterGen function which will create
// a splitter with the given size when called.
33 34 35 36
func SizeSplitterGen(size int64) SplitterGen {
	return func(r io.Reader) Splitter {
		return NewSizeSplitter(r, size)
	}
37 38
}

39 40
// Chan returns a channel that receives each of the chunks produced
// by a splitter, along with another one for errors.
41
func Chan(s Splitter) (<-chan []byte, <-chan error) {
42
	out := make(chan []byte)
43
	errs := make(chan error, 1)
44 45
	go func() {
		defer close(out)
46
		defer close(errs)
47 48

		// all-chunks loop (keep creating chunks)
49
		for {
50
			b, err := s.NextBytes()
51
			if err != nil {
52
				errs <- err
53
				return
54
			}
55 56

			out <- b
57 58
		}
	}()
59 60 61 62 63
	return out, errs
}

// sizeSplitterv2 is the Splitter returned by NewSizeSplitter. It cuts its
// reader into chunks of a fixed byte length.
type sizeSplitterv2 struct {
	// r is the stream being chunked.
	r io.Reader
	// size is the chunk length in bytes.
	size uint32
	// err, once set, is returned by every subsequent NextBytes call.
	err error
}

68
// NewSizeSplitter returns a new size-based Splitter with the given block size.
69 70 71
func NewSizeSplitter(r io.Reader, size int64) Splitter {
	return &sizeSplitterv2{
		r:    r,
72
		size: uint32(size),
73 74 75
	}
}

76
// NextBytes produces a new chunk.
77 78 79 80
func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
	if ss.err != nil {
		return nil, ss.err
	}
81

Steven Allen's avatar
Steven Allen committed
82
	full := pool.Get(int(ss.size))
83 84 85
	n, err := io.ReadFull(ss.r, full)
	switch err {
	case io.ErrUnexpectedEOF:
86
		ss.err = io.EOF
87 88
		small := make([]byte, n)
		copy(small, full)
Steven Allen's avatar
Steven Allen committed
89
		pool.Put(full)
90 91 92 93
		return small, nil
	case nil:
		return full, nil
	default:
Steven Allen's avatar
Steven Allen committed
94
		pool.Put(full)
95 96
		return nil, err
	}
97
}
98

99
// Reader returns the io.Reader associated to this Splitter.
100 101 102
func (ss *sizeSplitterv2) Reader() io.Reader {
	return ss.r
}