Unverified Commit 21b0c066 authored by Jakub Sztandera's avatar Jakub Sztandera Committed by GitHub

Merge pull request #16 from ipfs/feat/buzhash

Implement buzhash
parents 9a794d09 57fa6591
package chunk
import (
"bytes"
"io"
"math/rand"
"testing"
)
type newSplitter func(io.Reader) Splitter
type bencSpec struct {
size int
name string
}
var bSizes = []bencSpec{
{1 << 10, "1K"},
{1 << 20, "1M"},
{16 << 20, "16M"},
{100 << 20, "100M"},
}
func benchmarkChunker(b *testing.B, ns newSplitter) {
for _, s := range bSizes {
s := s
b.Run(s.name, func(b *testing.B) {
benchmarkChunkerSize(b, ns, s.size)
})
}
}
func benchmarkChunkerSize(b *testing.B, ns newSplitter, size int) {
rng := rand.New(rand.NewSource(1))
data := make([]byte, size)
rng.Read(data)
b.SetBytes(int64(size))
b.ReportAllocs()
b.ResetTimer()
var res uint64
for i := 0; i < b.N; i++ {
r := ns(bytes.NewReader(data))
for {
chunk, err := r.NextBytes()
if err != nil {
if err == io.EOF {
break
}
b.Fatal(err)
}
res = res + uint64(len(chunk))
}
}
Res = Res + res
}
package chunk
import (
"io"
"math/bits"
pool "github.com/libp2p/go-buffer-pool"
)
const (
buzMin = 128 << 10
buzMax = 512 << 10
buzMask = 1<<17 - 1
)
type Buzhash struct {
r io.Reader
buf []byte
n int
err error
}
func NewBuzhash(r io.Reader) *Buzhash {
return &Buzhash{
r: r,
buf: pool.Get(buzMax),
}
}
func (b *Buzhash) Reader() io.Reader {
return b.r
}
func (b *Buzhash) NextBytes() ([]byte, error) {
if b.err != nil {
return nil, b.err
}
n, err := io.ReadFull(b.r, b.buf[b.n:])
if err != nil {
if err == io.ErrUnexpectedEOF || err == io.EOF {
if b.n+n < buzMin {
b.err = io.EOF
res := make([]byte, b.n+n)
copy(res, b.buf)
pool.Put(b.buf)
b.buf = nil
return res, nil
}
} else {
b.err = err
pool.Put(b.buf)
b.buf = nil
return nil, err
}
}
i := buzMin - 32
var state uint32 = 0
for ; i < buzMin; i++ {
state = bits.RotateLeft32(state, 1)
state = state ^ bytehash[b.buf[i]]
}
if b.n+n > len(b.buf) {
panic("this is impossible, but gives +9 to performance")
}
for ; state&buzMask != 0 && i < b.n+n; i++ {
state = bits.RotateLeft32(state, 1) ^ bytehash[b.buf[i-32]] ^ bytehash[b.buf[i]]
}
res := make([]byte, i)
copy(res, b.buf)
b.n = copy(b.buf, b.buf[i:b.n+n])
return res, nil
}
var bytehash = [256]uint32{
0x6236e7d5, 0x10279b0b, 0x72818182, 0xdc526514, 0x2fd41e3d, 0x777ef8c8,
0x83ee5285, 0x2c8f3637, 0x2f049c1a, 0x57df9791, 0x9207151f, 0x9b544818,
0x74eef658, 0x2028ca60, 0x0271d91a, 0x27ae587e, 0xecf9fa5f, 0x236e71cd,
0xf43a8a2e, 0xbb13380, 0x9e57912c, 0x89a26cdb, 0x9fcf3d71, 0xa86da6f1,
0x9c49f376, 0x346aecc7, 0xf094a9ee, 0xea99e9cb, 0xb01713c6, 0x88acffb,
0x2960a0fb, 0x344a626c, 0x7ff22a46, 0x6d7a1aa5, 0x6a714916, 0x41d454ca,
0x8325b830, 0xb65f563, 0x447fecca, 0xf9d0ea5e, 0xc1d9d3d4, 0xcb5ec574,
0x55aae902, 0x86edc0e7, 0xd3a9e33, 0xe70dc1e1, 0xe3c5f639, 0x9b43140a,
0xc6490ac5, 0x5e4030fb, 0x8e976dd5, 0xa87468ea, 0xf830ef6f, 0xcc1ed5a5,
0x611f4e78, 0xddd11905, 0xf2613904, 0x566c67b9, 0x905a5ccc, 0x7b37b3a4,
0x4b53898a, 0x6b8fd29d, 0xaad81575, 0x511be414, 0x3cfac1e7, 0x8029a179,
0xd40efeda, 0x7380e02, 0xdc9beffd, 0x2d049082, 0x99bc7831, 0xff5002a8,
0x21ce7646, 0x1cd049b, 0xf43994f, 0xc3c6c5a5, 0xbbda5f50, 0xec15ec7,
0x9adb19b6, 0xc1e80b9, 0xb9b52968, 0xae162419, 0x2542b405, 0x91a42e9d,
0x6be0f668, 0x6ed7a6b9, 0xbc2777b4, 0xe162ce56, 0x4266aad5, 0x60fdb704,
0x66f832a5, 0x9595f6ca, 0xfee83ced, 0x55228d99, 0x12bf0e28, 0x66896459,
0x789afda, 0x282baa8, 0x2367a343, 0x591491b0, 0x2ff1a4b1, 0x410739b6,
0x9b7055a0, 0x2e0eb229, 0x24fc8252, 0x3327d3df, 0xb0782669, 0x1c62e069,
0x7f503101, 0xf50593ae, 0xd9eb275d, 0xe00eb678, 0x5917ccde, 0x97b9660a,
0xdd06202d, 0xed229e22, 0xa9c735bf, 0xd6316fe6, 0x6fc72e4c, 0x206dfa2,
0xd6b15c5a, 0x69d87b49, 0x9c97745, 0x13445d61, 0x35a975aa, 0x859aa9b9,
0x65380013, 0xd1fb6391, 0xc29255fd, 0x784a3b91, 0xb9e74c26, 0x63ce4d40,
0xc07cbe9e, 0xe6e4529e, 0xfb3632f, 0x9438d9c9, 0x682f94a8, 0xf8fd4611,
0x257ec1ed, 0x475ce3d6, 0x60ee2db1, 0x2afab002, 0x2b9e4878, 0x86b340de,
0x1482fdca, 0xfe41b3bf, 0xd4a412b0, 0xe09db98c, 0xc1af5d53, 0x7e55e25f,
0xd3346b38, 0xb7a12cbd, 0x9c6827ba, 0x71f78bee, 0x8c3a0f52, 0x150491b0,
0xf26de912, 0x233e3a4e, 0xd309ebba, 0xa0a9e0ff, 0xca2b5921, 0xeeb9893c,
0x33829e88, 0x9870cc2a, 0x23c4b9d0, 0xeba32ea3, 0xbdac4d22, 0x3bc8c44c,
0x1e8d0397, 0xf9327735, 0x783b009f, 0xeb83742, 0x2621dc71, 0xed017d03,
0x5c760aa1, 0x5a69814b, 0x96e3047f, 0xa93c9cde, 0x615c86f5, 0xb4322aa5,
0x4225534d, 0xd2e2de3, 0xccfccc4b, 0xbac2a57, 0xf0a06d04, 0xbc78d737,
0xf2d1f766, 0xf5a7953c, 0xbcdfda85, 0x5213b7d5, 0xbce8a328, 0xd38f5f18,
0xdb094244, 0xfe571253, 0x317fa7ee, 0x4a324f43, 0x3ffc39d9, 0x51b3fa8e,
0x7a4bee9f, 0x78bbc682, 0x9f5c0350, 0x2fe286c, 0x245ab686, 0xed6bf7d7,
0xac4988a, 0x3fe010fa, 0xc65fe369, 0xa45749cb, 0x2b84e537, 0xde9ff363,
0x20540f9a, 0xaa8c9b34, 0x5bc476b3, 0x1d574bd7, 0x929100ad, 0x4721de4d,
0x27df1b05, 0x58b18546, 0xb7e76764, 0xdf904e58, 0x97af57a1, 0xbd4dc433,
0xa6256dfd, 0xf63998f3, 0xf1e05833, 0xe20acf26, 0xf57fd9d6, 0x90300b4d,
0x89df4290, 0x68d01cbc, 0xcf893ee3, 0xcc42a046, 0x778e181b, 0x67265c76,
0xe981a4c4, 0x82991da1, 0x708f7294, 0xe6e2ae62, 0xfc441870, 0x95e1b0b6,
0x445f825, 0x5a93b47f, 0x5e9cf4be, 0x84da71e7, 0x9d9582b0, 0x9bf835ef,
0x591f61e2, 0x43325985, 0x5d2de32e, 0x8d8fbf0f, 0x95b30f38, 0x7ad5b6e,
0x4e934edf, 0x3cd4990e, 0x9053e259, 0x5c41857d}
package chunk
import (
"bytes"
"fmt"
"io"
"testing"
util "github.com/ipfs/go-ipfs-util"
)
func TestBuzhashChunking(t *testing.T) {
data := make([]byte, 1024*1024*16)
util.NewTimeSeededRand().Read(data)
r := NewBuzhash(bytes.NewReader(data))
var chunks [][]byte
for {
chunk, err := r.NextBytes()
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
chunks = append(chunks, chunk)
}
t.Logf("average block size: %d\n", len(data)/len(chunks))
unchunked := bytes.Join(chunks, nil)
if !bytes.Equal(unchunked, data) {
fmt.Printf("%d %d\n", len(unchunked), len(data))
//ioutil.WriteFile("./incorrect", unchunked, 0777)
//ioutil.WriteFile("./correct", data, 0777)
t.Fatal("data was chunked incorrectly")
}
}
func TestBuzhashChunkReuse(t *testing.T) {
newBuzhash := func(r io.Reader) Splitter {
return NewBuzhash(r)
}
testReuse(t, newBuzhash)
}
func BenchmarkBuzhash2(b *testing.B) {
benchmarkChunker(b, func(r io.Reader) Splitter {
return NewBuzhash(r)
})
}
func TestBuzhashBitsHashBias(t *testing.T) {
counts := make([]byte, 32)
for _, h := range bytehash {
for i := 0; i < 32; i++ {
if h&1 == 1 {
counts[i]++
}
h = h >> 1
}
}
for i, c := range counts {
if c != 128 {
t.Errorf("Bit balance in position %d broken, %d ones", i, c)
}
}
}
// This file generates bytehash LUT
package main
import (
"fmt"
"math/rand"
)
const nRounds = 200
func main() {
rnd := rand.New(rand.NewSource(0))
lut := make([]uint32, 256)
for i := 0; i < 256/2; i++ {
lut[i] = 1<<32 - 1
}
for r := 0; r < nRounds; r++ {
for b := uint32(0); b < 32; b++ {
mask := uint32(1) << b
nmask := ^mask
for i, j := range rnd.Perm(256) {
li := lut[i]
lj := lut[j]
lut[i] = li&nmask | (lj & mask)
lut[j] = lj&nmask | (li & mask)
}
}
}
fmt.Printf("%#v", lut)
}
......@@ -14,8 +14,8 @@ var (
)
// FromString returns a Splitter depending on the given string:
// it supports "default" (""), "size-{size}", "rabin", "rabin-{blocksize}" and
// "rabin-{min}-{avg}-{max}".
// it supports "default" (""), "size-{size}", "rabin", "rabin-{blocksize}",
// "rabin-{min}-{avg}-{max}" and "buzhash".
func FromString(r io.Reader, chunker string) (Splitter, error) {
switch {
case chunker == "" || chunker == "default":
......@@ -34,6 +34,9 @@ func FromString(r io.Reader, chunker string) (Splitter, error) {
case strings.HasPrefix(chunker, "rabin"):
return parseRabinString(r, chunker)
case chunker == "buzhash":
return NewBuzhash(r), nil
default:
return nil, fmt.Errorf("unrecognized chunker option: %s", chunker)
}
......
......@@ -39,8 +39,8 @@ func TestRabinChunking(t *testing.T) {
}
}
func chunkData(t *testing.T, data []byte) map[string]blocks.Block {
r := NewRabin(bytes.NewReader(data), 1024*256)
func chunkData(t *testing.T, newC newSplitter, data []byte) map[string]blocks.Block {
r := newC(bytes.NewReader(data))
blkmap := make(map[string]blocks.Block)
......@@ -60,12 +60,12 @@ func chunkData(t *testing.T, data []byte) map[string]blocks.Block {
return blkmap
}
func TestRabinChunkReuse(t *testing.T) {
func testReuse(t *testing.T, cr newSplitter) {
data := make([]byte, 1024*1024*16)
util.NewTimeSeededRand().Read(data)
ch1 := chunkData(t, data[1000:])
ch2 := chunkData(t, data)
ch1 := chunkData(t, cr, data[1000:])
ch2 := chunkData(t, cr, data)
var extra int
for k := range ch2 {
......@@ -76,35 +76,21 @@ func TestRabinChunkReuse(t *testing.T) {
}
if extra > 2 {
t.Log("too many spare chunks made")
t.Logf("too many spare chunks made: %d", extra)
}
}
func TestRabinChunkReuse(t *testing.T) {
newRabin := func(r io.Reader) Splitter {
return NewRabin(r, 256*1024)
}
testReuse(t, newRabin)
}
var Res uint64
func BenchmarkRabin(b *testing.B) {
data := make([]byte, 16<<20)
util.NewTimeSeededRand().Read(data)
b.SetBytes(16 << 20)
b.ReportAllocs()
b.ResetTimer()
var res uint64
for i := 0; i < b.N; i++ {
r := NewRabin(bytes.NewReader(data), 1024*256)
for {
chunk, err := r.NextBytes()
if err != nil {
if err == io.EOF {
break
}
b.Fatal(err)
}
res = res + uint64(len(chunk))
}
}
Res = Res + res
benchmarkChunker(b, func(r io.Reader) Splitter {
return NewRabin(r, 256<<10)
})
}
......@@ -6,8 +6,6 @@ import (
"testing"
u "github.com/ipfs/go-ipfs-util"
util "github.com/ipfs/go-ipfs-util"
pool "github.com/libp2p/go-buffer-pool"
)
func randBuf(t *testing.T, size int) []byte {
......@@ -122,29 +120,7 @@ func (s *clipReader) Read(buf []byte) (int, error) {
}
func BenchmarkDefault(b *testing.B) {
data := make([]byte, 16<<20)
util.NewTimeSeededRand().Read(data)
b.SetBytes(16 << 20)
b.ReportAllocs()
b.ResetTimer()
var res uint64
for i := 0; i < b.N; i++ {
r := DefaultSplitter(bytes.NewReader(data))
for {
chunk, err := r.NextBytes()
if err != nil {
if err == io.EOF {
break
}
b.Fatal(err)
}
res = res + uint64(len(chunk))
pool.Put(chunk)
}
}
Res = Res + res
benchmarkChunker(b, func(r io.Reader) Splitter {
return DefaultSplitter(r)
})
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment