Commit 9ae420ab authored by Juan Batiz-Benet

SizeSplitter fix: keep-reading until chunk full

If the underlying reader is buffered with a buffer smaller than the
chunk size, a single Read returns fewer bytes than requested, which
forced the chunk sizes to come out smaller than intended.

cc @whyrusleeping @mappum
parent f43954fc
......@@ -23,23 +23,34 @@ func (ss *SizeSplitter) Split(r io.Reader) chan []byte {
out := make(chan []byte)
go func() {
defer close(out)
// all-chunks loop (keep creating chunks)
for {
// log.Infof("making chunk with size: %d", ss.Size)
chunk := make([]byte, ss.Size)
nread, err := r.Read(chunk)
if err != nil {
sofar := 0
// this-chunk loop (keep reading until this chunk full)
for {
nread, err := r.Read(chunk[sofar:])
sofar += nread
if err == io.EOF {
if nread > 0 {
out <- chunk[:nread]
if sofar > 0 {
// log.Infof("sending out chunk with size: %d", sofar)
out <- chunk[:sofar]
}
return
}
log.Errorf("Block split error: %s", err)
return
}
if nread < ss.Size {
chunk = chunk[:nread]
if err != nil {
log.Errorf("Block split error: %s", err)
return
}
if sofar == ss.Size {
// log.Infof("sending out chunk with size: %d", sofar)
out <- chunk[:sofar]
break // break out of this-chunk loop
}
}
out <- chunk
}
}()
return out
......
......@@ -3,6 +3,7 @@ package chunk
import (
"bytes"
"crypto/rand"
"io"
"testing"
)
......@@ -54,3 +55,51 @@ func TestSizeSplitterIsDeterministic(t *testing.T) {
test()
}
}
// TestSizeSplitterFillsChunks checks that SizeSplitter keeps reading until a
// chunk is full even when the underlying reader returns fewer bytes per Read
// than requested (simulated here by clipReader with a 4000-byte cap).
//
// Every chunk except the final one must be exactly s.Size bytes long, and the
// concatenation of all chunks must reproduce the input. The test is skipped in
// -short mode because it pushes ~10 MB through the splitter.
func TestSizeSplitterFillsChunks(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	max := 10000000
	b := randBuf(t, max) // presumably max bytes of random data; see randBuf
	r := &clipReader{r: bytes.NewReader(b), size: 4000}
	s := SizeSplitter{Size: 1024 * 256}
	c := s.Split(r)

	sofar := 0
	whole := make([]byte, max)
	for chunk := range c {
		// Guard the slice expression below: a splitter that emits more data
		// than it was given should fail the test, not panic it.
		if sofar+len(chunk) > max {
			t.Fatalf("splitter produced too much data: (sofar: %d) chunk of %d bytes exceeds input of %d bytes", sofar, len(chunk), max)
		}

		bc := b[sofar : sofar+len(chunk)]
		if !bytes.Equal(bc, chunk) {
			// Print at most the first 100 bytes; a short chunk must not
			// trigger an out-of-range panic while reporting the failure.
			show := len(chunk)
			if show > 100 {
				show = 100
			}
			t.Fatalf("chunk not correct: (sofar: %d) %d != %d, %v != %v", sofar, len(bc), len(chunk), bc[:show], chunk[:show])
		}

		copy(whole[sofar:], chunk)

		sofar += len(chunk)
		// Only the very last chunk is allowed to be short.
		if sofar != max && len(chunk) < s.Size {
			t.Fatal("sizesplitter split at a smaller size")
		}
	}

	if sofar != max {
		t.Fatalf("splitter output has wrong total size: got %d bytes, want %d", sofar, max)
	}

	if !bytes.Equal(b, whole) {
		t.Fatal("splitter did not split right")
	}
}
// clipReader wraps an io.Reader and limits how many bytes a single Read call
// may ask the wrapped reader for.
type clipReader struct {
	size int       // upper bound on the buffer length passed through per Read
	r    io.Reader // wrapped reader that supplies the data
}

// Read forwards to the wrapped reader using at most the first size bytes of
// buf, so callers see short reads whenever they pass a larger buffer.
func (s *clipReader) Read(buf []byte) (int, error) {
	limit := len(buf)
	if limit > s.size {
		limit = s.size
	}
	return s.r.Read(buf[:limit])
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment