Commit 1ff7a984 authored by Jeromy's avatar Jeromy

use rabin fingerprinting for a chunker

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>

implement rabin fingerprinting as a chunker for ipfs

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>

vendor correctly

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>

refactor chunking interface a little

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>

work chunking interface changes up into importer

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>

move chunker type parsing into its own file in chunk

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent 620aaba5
package chunk
import (
"errors"
"fmt"
"io"
"strconv"
"strings"
)
// FromString parses a chunker specification string and returns a
// Splitter reading from r.
//
// Recognized formats:
//
//	"" or "default"  -> fixed-size splitter using DefaultBlockSize
//	"size-<n>"       -> fixed-size splitter with block size n
//	"rabin..."       -> rabin fingerprint splitter (see parseRabinString)
//
// An unrecognized specification yields an error.
func FromString(r io.Reader, chunker string) (Splitter, error) {
	switch {
	case chunker == "" || chunker == "default":
		return NewSizeSplitter(r, DefaultBlockSize), nil
	case strings.HasPrefix(chunker, "size-"):
		// TrimPrefix (rather than Split) so that trailing garbage such
		// as "size-123-foo" is rejected by Atoi instead of silently
		// being parsed as 123.
		sizeStr := strings.TrimPrefix(chunker, "size-")
		size, err := strconv.Atoi(sizeStr)
		if err != nil {
			return nil, err
		}
		if size <= 0 {
			return nil, errors.New("chunker size must be positive")
		}
		return NewSizeSplitter(r, int64(size)), nil
	case strings.HasPrefix(chunker, "rabin"):
		return parseRabinString(r, chunker)
	default:
		return nil, fmt.Errorf("unrecognized chunker option: %s", chunker)
	}
}
// parseRabinString parses the rabin chunker option formats:
//
//	"rabin"                                      -> default average block size
//	"rabin-<avg>"                                -> explicit average block size
//	"rabin-[min:]<min>-[avg:]<avg>-[max:]<max>"  -> explicit bounds
//
// The "min:"/"avg:"/"max:" labels are optional, but when present they
// must appear in that order.
func parseRabinString(r io.Reader, chunker string) (Splitter, error) {
	parts := strings.Split(chunker, "-")
	switch len(parts) {
	case 1:
		return NewRabin(r, uint64(DefaultBlockSize)), nil
	case 2:
		size, err := strconv.Atoi(parts[1])
		if err != nil {
			return nil, err
		}
		return NewRabin(r, uint64(size)), nil
	case 4:
		sub := strings.Split(parts[1], ":")
		if len(sub) > 1 && sub[0] != "min" {
			return nil, errors.New("first label must be min")
		}
		min, err := strconv.Atoi(sub[len(sub)-1])
		if err != nil {
			return nil, err
		}
		sub = strings.Split(parts[2], ":")
		if len(sub) > 1 && sub[0] != "avg" {
			// NOTE: a stray debug log.Error call was removed here.
			return nil, errors.New("second label must be avg")
		}
		avg, err := strconv.Atoi(sub[len(sub)-1])
		if err != nil {
			return nil, err
		}
		sub = strings.Split(parts[3], ":")
		if len(sub) > 1 && sub[0] != "max" {
			return nil, errors.New("final label must be max")
		}
		max, err := strconv.Atoi(sub[len(sub)-1])
		if err != nil {
			return nil, err
		}
		return NewRabinMinMax(r, uint64(min), uint64(avg), uint64(max)), nil
	default:
		// Fixed: the original message was missing its closing parenthesis.
		return nil, errors.New("incorrect format (expected 'rabin', 'rabin-[avg]' or 'rabin-[min]-[avg]-[max]')")
	}
}
package chunk package chunk
import ( import (
"bufio" "hash/fnv"
"bytes"
"fmt"
"io" "io"
"math"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/chunker"
) )
type MaybeRabin struct { var IpfsRabinPoly = chunker.Pol(17437180132763653)
mask int
windowSize int
MinBlockSize int
MaxBlockSize int
}
func NewMaybeRabin(avgBlkSize int) *MaybeRabin { type Rabin struct {
blkbits := uint(math.Log2(float64(avgBlkSize))) r *chunker.Chunker
rb := new(MaybeRabin)
rb.mask = (1 << blkbits) - 1
rb.windowSize = 16 // probably a good number...
rb.MinBlockSize = avgBlkSize / 2
rb.MaxBlockSize = (avgBlkSize / 2) * 3
return rb
} }
func (mr *MaybeRabin) Split(r io.Reader) chan []byte { func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin {
out := make(chan []byte, 16) min := avgBlkSize / 3
go func() { max := avgBlkSize + (avgBlkSize / 2)
inbuf := bufio.NewReader(r)
blkbuf := new(bytes.Buffer)
// some bullshit numbers i made up
a := 10 // honestly, no idea what this is
MOD := 33554383 // randomly chosen (seriously)
an := 1
rollingHash := 0
// Window is a circular buffer return NewRabinMinMax(r, avgBlkSize, min, max)
window := make([]byte, mr.windowSize) }
push := func(i int, val byte) (outval int) {
outval = int(window[i%len(window)])
window[i%len(window)] = val
return
}
// Duplicate byte slice func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin {
dup := func(b []byte) []byte { h := fnv.New32a()
d := make([]byte, len(b)) ch := chunker.New(r, IpfsRabinPoly, h, avg, min, max)
copy(d, b)
return d
}
// Fill up the window return &Rabin{
i := 0 r: ch,
for ; i < mr.windowSize; i++ { }
b, err := inbuf.ReadByte() }
if err != nil {
fmt.Println(err)
return
}
blkbuf.WriteByte(b)
push(i, b)
rollingHash = (rollingHash*a + int(b)) % MOD
an = (an * a) % MOD
}
for ; true; i++ { func (r *Rabin) NextBytes() ([]byte, error) {
b, err := inbuf.ReadByte() ch, err := r.r.Next()
if err != nil { if err != nil {
break return nil, err
} }
outval := push(i, b)
blkbuf.WriteByte(b)
rollingHash = (rollingHash*a + int(b) - an*outval) % MOD
if (rollingHash&mr.mask == mr.mask && blkbuf.Len() > mr.MinBlockSize) ||
blkbuf.Len() >= mr.MaxBlockSize {
out <- dup(blkbuf.Bytes())
blkbuf.Reset()
}
// Check if there are enough remaining return ch.Data, nil
peek, err := inbuf.Peek(mr.windowSize)
if err != nil || len(peek) != mr.windowSize {
break
}
}
io.Copy(blkbuf, inbuf)
out <- blkbuf.Bytes()
close(out)
}()
return out
} }
package chunk
import (
"bytes"
"fmt"
"github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/key"
"github.com/ipfs/go-ipfs/util"
"io"
"testing"
)
// TestRabinChunking splits 16MiB of random bytes with the rabin
// splitter and checks that joining the chunks reproduces the input.
func TestRabinChunking(t *testing.T) {
	data := make([]byte, 1024*1024*16)
	util.NewTimeSeededRand().Read(data)

	spl := NewRabin(bytes.NewReader(data), 1024*256)

	var chunks [][]byte
	for {
		piece, err := spl.NextBytes()
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
		chunks = append(chunks, piece)
	}

	fmt.Printf("average block size: %d\n", len(data)/len(chunks))

	rejoined := bytes.Join(chunks, nil)
	if !bytes.Equal(rejoined, data) {
		fmt.Printf("%d %d\n", len(rejoined), len(data))
		t.Fatal("data was chunked incorrectly")
	}
}
// chunkData splits data with the rabin chunker and returns the
// resulting blocks indexed by their key.
func chunkData(t *testing.T, data []byte) map[key.Key]*blocks.Block {
	spl := NewRabin(bytes.NewReader(data), 1024*256)

	out := make(map[key.Key]*blocks.Block)
	for {
		chunk, err := spl.NextBytes()
		if err == io.EOF {
			return out
		}
		if err != nil {
			t.Fatal(err)
		}

		blk := blocks.NewBlock(chunk)
		out[blk.Key()] = blk
	}
}
// TestRabinChunkReuse verifies that content-defined chunking reuses
// most blocks when chunking the same data at a shifted offset:
// data[1000:] and data should produce nearly identical block sets,
// differing only near the changed start of the stream.
func TestRabinChunkReuse(t *testing.T) {
	data := make([]byte, 1024*1024*16)
	util.NewTimeSeededRand().Read(data)

	ch1 := chunkData(t, data[1000:])
	ch2 := chunkData(t, data)

	// Count blocks of ch2 that do not also appear in ch1.
	var extra int
	for k := range ch2 { // idiomatic: 'for k, _ := range' is redundant
		if _, ok := ch1[k]; !ok {
			extra++
		}
	}

	if extra > 2 {
		t.Fatal("too many spare chunks made")
	}

	if extra == 2 {
		t.Log("why did we get two extra blocks?")
	}
}
...@@ -9,39 +9,71 @@ import ( ...@@ -9,39 +9,71 @@ import (
var log = util.Logger("chunk") var log = util.Logger("chunk")
var DefaultBlockSize = 1024 * 256 var DefaultBlockSize int64 = 1024 * 256
var DefaultSplitter = &SizeSplitter{Size: DefaultBlockSize}
type BlockSplitter interface { type Splitter interface {
Split(r io.Reader) chan []byte NextBytes() ([]byte, error)
} }
type SizeSplitter struct { type SplitterGen func(r io.Reader) Splitter
Size int
func DefaultSplitter(r io.Reader) Splitter {
return NewSizeSplitter(r, DefaultBlockSize)
}
func SizeSplitterGen(size int64) SplitterGen {
return func(r io.Reader) Splitter {
return NewSizeSplitter(r, size)
}
} }
func (ss *SizeSplitter) Split(r io.Reader) chan []byte { func Chan(s Splitter) (<-chan []byte, <-chan error) {
out := make(chan []byte) out := make(chan []byte)
errs := make(chan error, 1)
go func() { go func() {
defer close(out) defer close(out)
defer close(errs)
// all-chunks loop (keep creating chunks) // all-chunks loop (keep creating chunks)
for { for {
// log.Infof("making chunk with size: %d", ss.Size) b, err := s.NextBytes()
chunk := make([]byte, ss.Size)
nread, err := io.ReadFull(r, chunk)
if nread > 0 {
// log.Infof("sending out chunk with size: %d", sofar)
out <- chunk[:nread]
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
return
}
if err != nil { if err != nil {
log.Debugf("Block split error: %s", err) errs <- err
return return
} }
out <- b
} }
}() }()
return out return out, errs
}
type sizeSplitterv2 struct {
r io.Reader
size int64
err error
}
func NewSizeSplitter(r io.Reader, size int64) Splitter {
return &sizeSplitterv2{
r: r,
size: size,
}
}
func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
if ss.err != nil {
return nil, ss.err
}
buf := make([]byte, ss.size)
n, err := io.ReadFull(ss.r, buf)
if err == io.ErrUnexpectedEOF {
ss.err = io.EOF
err = nil
}
if err != nil {
return nil, err
}
return buf[:n], nil
} }
...@@ -32,8 +32,8 @@ func TestSizeSplitterIsDeterministic(t *testing.T) { ...@@ -32,8 +32,8 @@ func TestSizeSplitterIsDeterministic(t *testing.T) {
bufA := copyBuf(bufR) bufA := copyBuf(bufR)
bufB := copyBuf(bufR) bufB := copyBuf(bufR)
chunksA := DefaultSplitter.Split(bytes.NewReader(bufA)) chunksA, _ := Chan(DefaultSplitter(bytes.NewReader(bufA)))
chunksB := DefaultSplitter.Split(bytes.NewReader(bufB)) chunksB, _ := Chan(DefaultSplitter(bytes.NewReader(bufB)))
for n := 0; ; n++ { for n := 0; ; n++ {
a, moreA := <-chunksA a, moreA := <-chunksA
...@@ -65,8 +65,8 @@ func TestSizeSplitterFillsChunks(t *testing.T) { ...@@ -65,8 +65,8 @@ func TestSizeSplitterFillsChunks(t *testing.T) {
max := 10000000 max := 10000000
b := randBuf(t, max) b := randBuf(t, max)
r := &clipReader{r: bytes.NewReader(b), size: 4000} r := &clipReader{r: bytes.NewReader(b), size: 4000}
s := SizeSplitter{Size: 1024 * 256} chunksize := int64(1024 * 256)
c := s.Split(r) c, _ := Chan(NewSizeSplitter(r, chunksize))
sofar := 0 sofar := 0
whole := make([]byte, max) whole := make([]byte, max)
...@@ -80,7 +80,7 @@ func TestSizeSplitterFillsChunks(t *testing.T) { ...@@ -80,7 +80,7 @@ func TestSizeSplitterFillsChunks(t *testing.T) {
copy(whole[sofar:], chunk) copy(whole[sofar:], chunk)
sofar += len(chunk) sofar += len(chunk)
if sofar != max && len(chunk) < s.Size { if sofar != max && len(chunk) < int(chunksize) {
t.Fatal("sizesplitter split at a smaller size") t.Fatal("sizesplitter split at a smaller size")
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment