Commit db6f0589 authored by Steven Allen's avatar Steven Allen

remove some dead code

License: MIT
Signed-off-by: default avatarSteven Allen <steven@stebalien.com>
parent cbd13b61
// Package bloom implements a simple bloom filter.
package bloom
import (
"encoding/binary"
"errors"
// Non crypto hash, because speed
"gx/ipfs/QmeWQMDa5dSdP4n8WDeoY5z8L2EKVqF4ZvK4VEHsLqXsGu/hamming"
"hash"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/mtchavez/jenkins"
)
// A Filter represents a bloom filter.
type Filter interface {
Add([]byte)
Find([]byte) bool
Merge(Filter) (Filter, error)
HammingDistance(Filter) (int, error)
}
// NewFilter creates a new bloom Filter with the given
// size. k (the number of hash functions), is hardcoded to 3.
func NewFilter(size int) Filter {
return &filter{
hash: jenkins.New(),
filter: make([]byte, size),
k: 3,
}
}
type filter struct {
filter []byte
hash hash.Hash32
k int
}
// BasicFilter calls NewFilter with a bloom filter size of
// 2048 bytes.
func BasicFilter() Filter {
return NewFilter(2048)
}
func (f *filter) Add(bytes []byte) {
for _, bit := range f.getBitIndicies(bytes) {
f.setBit(bit)
}
}
func (f *filter) getBitIndicies(bytes []byte) []uint32 {
indicies := make([]uint32, f.k)
f.hash.Write(bytes)
b := make([]byte, 4)
for i := 0; i < f.k; i++ {
res := f.hash.Sum32()
indicies[i] = res % (uint32(len(f.filter)) * 8)
binary.LittleEndian.PutUint32(b, res)
f.hash.Write(b)
}
f.hash.Reset()
return indicies
}
func (f *filter) Find(bytes []byte) bool {
for _, bit := range f.getBitIndicies(bytes) {
if !f.getBit(bit) {
return false
}
}
return true
}
func (f *filter) setBit(i uint32) {
f.filter[i/8] |= (1 << byte(i%8))
}
func (f *filter) getBit(i uint32) bool {
return f.filter[i/8]&(1<<byte(i%8)) != 0
}
func (f *filter) Merge(o Filter) (Filter, error) {
casfil, ok := o.(*filter)
if !ok {
return nil, errors.New("Unsupported filter type")
}
if len(casfil.filter) != len(f.filter) {
return nil, errors.New("filter lengths must match")
}
if casfil.k != f.k {
return nil, errors.New("filter k-values must match")
}
nfilt := new(filter)
nfilt.hash = f.hash
nfilt.filter = make([]byte, len(f.filter))
nfilt.k = f.k
for i, v := range f.filter {
nfilt.filter[i] = v | casfil.filter[i]
}
return nfilt, nil
}
func (f *filter) HammingDistance(o Filter) (int, error) {
casfil, ok := o.(*filter)
if !ok {
return 0, errors.New("Unsupported filter type")
}
if len(f.filter) != len(casfil.filter) {
return 0, errors.New("filter lengths must match")
}
acc := 0
// xor together
for i := 0; i < len(f.filter); i++ {
acc += hamming.Byte(f.filter[i], casfil.filter[i])
}
return acc, nil
}
package bloom;
message PackedFilter {
enum HashType {
}
optional bool compressed;
optional bytes data;
repeated HashType hashes;
}
package bloom
import (
"encoding/binary"
"fmt"
"testing"
)
func TestBasicFilter(t *testing.T) {
f := BasicFilter().(*filter)
if len(f.filter) != 2048 {
t.Fatal("basic filter should have length 2048, has:", len(f.filter))
}
}
func TestFilter(t *testing.T) {
f := NewFilter(128)
keys := [][]byte{
[]byte("hello"),
[]byte("fish"),
[]byte("ipfsrocks"),
[]byte("i want ipfs socks"),
}
f.Add(keys[0])
if !f.Find(keys[0]) {
t.Fatal("Failed to find single inserted key!")
}
f.Add(keys[1])
if !f.Find(keys[1]) {
t.Fatal("Failed to find key!")
}
f.Add(keys[2])
f.Add(keys[3])
for _, k := range keys {
if !f.Find(k) {
t.Fatal("Couldnt find one of three keys")
}
}
if f.Find([]byte("beep boop")) {
t.Fatal("Got false positive! Super unlikely!")
}
fmt.Println(f)
}
func TestMerge(t *testing.T) {
f1 := NewFilter(128)
f2 := NewFilter(128)
fbork := NewFilter(32)
_, err := f1.Merge(fbork)
if err == nil {
t.Fatal("Merge should fail on filters with different lengths")
}
b := make([]byte, 4)
var i uint32
for i = 0; i < 10; i++ {
binary.LittleEndian.PutUint32(b, i)
f1.Add(b)
}
for i = 10; i < 20; i++ {
binary.LittleEndian.PutUint32(b, i)
f2.Add(b)
}
merged, _ := f1.Merge(f2)
for i = 0; i < 20; i++ {
binary.LittleEndian.PutUint32(b, i)
if !merged.Find(b) {
t.Fatal("Could not find all keys in merged filter")
}
}
}
func TestHamming(t *testing.T) {
f1 := NewFilter(128)
f2 := NewFilter(128)
f1.Add([]byte("no collision"))
f1.Add([]byte("collision? no!"))
dist, _ := f1.HammingDistance(f2)
if dist != 6 {
t.Fatal("Should have 6 bit difference")
}
}
// Package set defines the BlockSet interface which provides
// abstraction for sets of Cids.
// It provides a default implementation using cid.Set.
package set
import (
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
"github.com/ipfs/go-ipfs/blocks/bloom"
)
// BlockSet represents a mutable set of blocks CIDs.
type BlockSet interface {
AddBlock(*cid.Cid)
RemoveBlock(*cid.Cid)
HasKey(*cid.Cid) bool
// GetBloomFilter creates and returns a bloom filter to which
// all the CIDs in the set have been added.
GetBloomFilter() bloom.Filter
GetKeys() []*cid.Cid
}
// SimpleSetFromKeys returns a default implementation of BlockSet
// using cid.Set. The given keys are added to the set.
func SimpleSetFromKeys(keys []*cid.Cid) BlockSet {
sbs := &simpleBlockSet{blocks: cid.NewSet()}
for _, k := range keys {
sbs.AddBlock(k)
}
return sbs
}
// NewSimpleBlockSet returns a new empty default implementation
// of BlockSet using cid.Set.
func NewSimpleBlockSet() BlockSet {
return &simpleBlockSet{blocks: cid.NewSet()}
}
type simpleBlockSet struct {
blocks *cid.Set
}
func (b *simpleBlockSet) AddBlock(k *cid.Cid) {
b.blocks.Add(k)
}
func (b *simpleBlockSet) RemoveBlock(k *cid.Cid) {
b.blocks.Remove(k)
}
func (b *simpleBlockSet) HasKey(k *cid.Cid) bool {
return b.blocks.Has(k)
}
func (b *simpleBlockSet) GetBloomFilter() bloom.Filter {
f := bloom.BasicFilter()
for _, k := range b.blocks.Keys() {
f.Add(k.Bytes())
}
return f
}
func (b *simpleBlockSet) GetKeys() []*cid.Cid {
return b.blocks.Keys()
}
package set
import (
"testing"
bu "github.com/ipfs/go-ipfs/blocks/blocksutil"
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
)
const (
tAdd int = 1 << iota
tRemove
tReAdd
)
func exampleKeys() []*cid.Cid {
res := make([]*cid.Cid, 1<<8)
gen := bu.NewBlockGenerator()
for i := uint64(0); i < 1<<8; i++ {
res[i] = gen.Next().Cid()
}
return res
}
func checkSet(set BlockSet, keySlice []*cid.Cid, t *testing.T) {
for i, key := range keySlice {
if i&tReAdd == 0 {
if !set.HasKey(key) {
t.Error("key should be in the set")
}
} else if i&tRemove == 0 {
if set.HasKey(key) {
t.Error("key shouldn't be in the set")
}
} else if i&tAdd == 0 {
if !set.HasKey(key) {
t.Error("key should be in the set")
}
}
}
}
func TestSetWorks(t *testing.T) {
set := NewSimpleBlockSet()
keys := exampleKeys()
for i, key := range keys {
if i&tAdd == 0 {
set.AddBlock(key)
}
}
for i, key := range keys {
if i&tRemove == 0 {
set.RemoveBlock(key)
}
}
for i, key := range keys {
if i&tReAdd == 0 {
set.AddBlock(key)
}
}
checkSet(set, keys, t)
addedKeys := set.GetKeys()
newSet := SimpleSetFromKeys(addedKeys)
// same check works on a new set
checkSet(newSet, keys, t)
bloom := set.GetBloomFilter()
for _, key := range addedKeys {
if !bloom.Find(key.Bytes()) {
t.Error("bloom doesn't contain expected key")
}
}
}
package mockrouting
import (
context "context"
dht "gx/ipfs/QmNV315eTphFgCttWPrT5ARNsiPNRLGFWHRJZyXyqvmjD6/go-libp2p-kad-dht"
ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
sync "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/sync"
"gx/ipfs/QmWRCn8vruNAzHx8i6SAXinuheRitKEGu8c7m26stKvsYx/go-testutil"
mocknet "gx/ipfs/QmbRT4BwPQEx4CPCd8LKYL46tFWYneGswQnHFdsuiczJRL/go-libp2p/p2p/net/mock"
)
type mocknetserver struct {
mn mocknet.Mocknet
}
func NewDHTNetwork(mn mocknet.Mocknet) Server {
return &mocknetserver{
mn: mn,
}
}
func (rs *mocknetserver) Client(p testutil.Identity) Client {
return rs.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore())
}
func (rs *mocknetserver) ClientWithDatastore(ctx context.Context, p testutil.Identity, ds ds.Datastore) Client {
// FIXME AddPeer doesn't appear to be idempotent
host, err := rs.mn.AddPeer(p.PrivateKey(), p.Address())
if err != nil {
panic("FIXME")
}
return dht.NewDHT(ctx, host, sync.MutexWrap(ds))
}
var _ Server = &mocknetserver{}
package iter
func N(n int) []struct{} {
return make([]struct{}, n)
}
package multierr
import (
"fmt"
)
// Error contains a set of errors. Used to return multiple errors, as in listen.
type Error struct {
Errors []error
}
func (e *Error) Error() string {
if e == nil {
return "<nil error>"
}
var out string
for i, v := range e.Errors {
if v != nil {
out += fmt.Sprintf("%d: %s\n", i, v)
}
}
return out
}
func New(errs ...error) *Error {
return &Error{errs}
}
package todocounter
import (
"sync"
)
// Counter records things remaining to process. It is needed for complicated
// cases where multiple goroutines are spawned to process items, and they may
// generate more items to process. For example, say a query over a set of nodes
// may yield either a result value, or more nodes to query. Signaling is subtly
// complicated, because the queue may be empty while items are being processed,
// that will end up adding more items to the queue.
//
// Use Counter like this:
//
// todos := make(chan int, 10)
// ctr := todoctr.NewCounter()
//
// process := func(item int) {
// fmt.Println("processing %d\n...", item)
//
// // this task may randomly generate more tasks
// if rand.Intn(5) == 0 {
// todos<- item + 1
// ctr.Increment(1) // increment counter for new task.
// }
//
// ctr.Decrement(1) // decrement one to signal the task being done.
// }
//
// // add some tasks.
// todos<- 1
// todos<- 2
// todos<- 3
// todos<- 4
// ctr.Increment(4)
//
// for {
// select {
// case item := <- todos:
// go process(item)
// case <-ctr.Done():
// fmt.Println("done processing everything.")
// close(todos)
// }
// }
type Counter interface {
// Incrememnt adds a number of todos to track.
// If the counter is **below** zero, it panics.
Increment(i uint32)
// Decrement removes a number of todos to track.
// If the count drops to zero, signals done and destroys the counter.
// If the count drops **below** zero, panics. It means you have tried to remove
// more things than you added, i.e. sync issues.
Decrement(i uint32)
// Done returns a channel to wait upon. Use it in selects:
//
// select {
// case <-ctr.Done():
// // done processing all items
// }
//
Done() <-chan struct{}
}
type todoCounter struct {
count int32
done chan struct{}
sync.RWMutex
}
// NewSyncCounter constructs a new counter
func NewSyncCounter() Counter {
return &todoCounter{
done: make(chan struct{}),
}
}
func (c *todoCounter) Increment(i uint32) {
c.Lock()
defer c.Unlock()
if c.count < 0 {
panic("counter already signaled done. use a new counter.")
}
// increment count
c.count += int32(i)
}
// Decrement removes a number of todos to track.
// If the count drops to zero, signals done and destroys the counter.
// If the count drops **below** zero, panics. It means you have tried to remove
// more things than you added, i.e. sync issues.
func (c *todoCounter) Decrement(i uint32) {
c.Lock()
defer c.Unlock()
if c.count < 0 {
panic("counter already signaled done. probably have sync issues.")
}
if int32(i) > c.count {
panic("decrement amount creater than counter. sync issues.")
}
c.count -= int32(i)
if c.count == 0 { // done! signal it.
c.count-- // set it to -1 to prevent reuse
close(c.done) // a closed channel will always return nil
}
}
func (c *todoCounter) Done() <-chan struct{} {
return c.done
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment