package blockstore

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	bloom "github.com/ipfs/bbloom"
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	metrics "github.com/ipfs/go-metrics-interface"
)

15 16 17 18
// bloomCached returns a Blockstore that caches Has requests using a Bloom
// filter. bloomSize is size of bloom filter in bytes. hashCount specifies the
// number of hashing functions in the bloom filter (usually known as k).
func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) (*bloomcache, error) {
19
	bl, err := bloom.New(float64(bloomSize), float64(hashCount))
20 21 22
	if err != nil {
		return nil, err
	}
Steven Allen's avatar
Steven Allen committed
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
	bc := &bloomcache{
		blockstore: bs,
		bloom:      bl,
		hits: metrics.NewCtx(ctx, "bloom.hits_total",
			"Number of cache hits in bloom cache").Counter(),
		total: metrics.NewCtx(ctx, "bloom_total",
			"Total number of requests to bloom cache").Counter(),
		buildChan: make(chan struct{}),
	}
	go func() {
		err := bc.build(ctx)
		if err != nil {
			select {
			case <-ctx.Done():
				log.Warning("Cache rebuild closed by context finishing: ", err)
			default:
				log.Error(err)
			}
			return
		}
		if metrics.Active() {
44 45 46 47
			fill := metrics.NewCtx(ctx, "bloom_fill_ratio",
				"Ratio of bloom filter fullnes, (updated once a minute)").Gauge()

			t := time.NewTicker(1 * time.Minute)
Steven Allen's avatar
Steven Allen committed
48
			defer t.Stop()
49 50 51 52 53 54 55
			for {
				select {
				case <-ctx.Done():
					return
				case <-t.C:
					fill.Set(bc.bloom.FillRatio())
				}
56
			}
Steven Allen's avatar
Steven Allen committed
57 58
		}
	}()
59 60 61 62
	return bc, nil
}

type bloomcache struct {
63
	active int32
64

Steven Allen's avatar
Steven Allen committed
65 66 67 68 69
	bloom    *bloom.Bloom
	buildErr error

	buildChan  chan struct{}
	blockstore Blockstore
70 71

	// Statistics
72 73
	hits  metrics.Counter
	total metrics.Counter
74 75 76
}

func (b *bloomcache) BloomActive() bool {
77
	return atomic.LoadInt32(&b.active) != 0
78 79
}

Steven Allen's avatar
Steven Allen committed
80 81 82 83 84 85 86 87 88 89 90
func (b *bloomcache) Wait(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-b.buildChan:
		return b.buildErr
	}
}

func (b *bloomcache) build(ctx context.Context) error {
	evt := log.EventBegin(ctx, "bloomcache.build")
91
	defer evt.Done()
Steven Allen's avatar
Steven Allen committed
92
	defer close(b.buildChan)
93 94 95

	ch, err := b.blockstore.AllKeysChan(ctx)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
96 97
		b.buildErr = fmt.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err)
		return b.buildErr
98
	}
Steven Allen's avatar
Steven Allen committed
99
	for {
100 101
		select {
		case key, ok := <-ch:
Steven Allen's avatar
Steven Allen committed
102 103 104
			if !ok {
				atomic.StoreInt32(&b.active, 1)
				return nil
105
			}
Steven Allen's avatar
Steven Allen committed
106
			b.bloom.AddTS(key.Bytes()) // Use binary key, the more compact the better
107
		case <-ctx.Done():
Steven Allen's avatar
Steven Allen committed
108 109
			b.buildErr = ctx.Err()
			return b.buildErr
110
		}
111 112 113
	}
}

114
func (b *bloomcache) DeleteBlock(k cid.Cid) error {
115 116 117 118
	if has, ok := b.hasCached(k); ok && !has {
		return ErrNotFound
	}

119
	return b.blockstore.DeleteBlock(k)
120 121 122 123
}

// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
124
func (b *bloomcache) hasCached(k cid.Cid) (has bool, ok bool) {
125
	b.total.Inc()
126 127
	if !k.Defined() {
		log.Error("undefined in bloom cache")
128 129 130
		// Return cache invalid so call to blockstore
		// in case of invalid key is forwarded deeper
		return false, false
131
	}
132
	if b.BloomActive() {
133
		blr := b.bloom.HasTS(k.Bytes())
134
		if !blr { // not contained in bloom is only conclusive answer bloom gives
135
			b.hits.Inc()
136
			return false, true
137 138
		}
	}
139
	return false, false
140 141
}

142
func (b *bloomcache) Has(k cid.Cid) (bool, error) {
143 144 145 146
	if has, ok := b.hasCached(k); ok {
		return has, nil
	}

147
	return b.blockstore.Has(k)
148 149
}

150
func (b *bloomcache) GetSize(k cid.Cid) (int, error) {
Steven Allen's avatar
Steven Allen committed
151
	return b.blockstore.GetSize(k)
152 153
}

154
func (b *bloomcache) Get(k cid.Cid) (blocks.Block, error) {
155 156 157 158
	if has, ok := b.hasCached(k); ok && !has {
		return nil, ErrNotFound
	}

159
	return b.blockstore.Get(k)
160 161 162
}

func (b *bloomcache) Put(bl blocks.Block) error {
163
	// See comment in PutMany
164 165
	err := b.blockstore.Put(bl)
	if err == nil {
166
		b.bloom.AddTS(bl.Cid().Bytes())
167 168 169 170 171
	}
	return err
}

func (b *bloomcache) PutMany(bs []blocks.Block) error {
172
	// bloom cache gives only conclusive resulty if key is not contained
173
	// to reduce number of puts we need conclusive information if block is contained
174 175
	// this means that PutMany can't be improved with bloom cache so we just
	// just do a passthrough.
176
	err := b.blockstore.PutMany(bs)
177 178
	if err != nil {
		return err
179
	}
180
	for _, bl := range bs {
181
		b.bloom.AddTS(bl.Cid().Bytes())
182 183
	}
	return nil
184 185
}

186 187 188 189
func (b *bloomcache) HashOnRead(enabled bool) {
	b.blockstore.HashOnRead(enabled)
}

190
func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
191 192 193 194 195 196 197 198 199 200 201 202 203 204
	return b.blockstore.AllKeysChan(ctx)
}

// GCLock delegates to the wrapped blockstore. It panics if that store does
// not implement GCBlockstore.
func (b *bloomcache) GCLock() Unlocker {
	gcbs := b.blockstore.(GCBlockstore)
	return gcbs.GCLock()
}

// PinLock delegates to the wrapped blockstore. It panics if that store does
// not implement GCBlockstore.
func (b *bloomcache) PinLock() Unlocker {
	gcbs := b.blockstore.(GCBlockstore)
	return gcbs.PinLock()
}

// GCRequested delegates to the wrapped blockstore. It panics if that store
// does not implement GCBlockstore.
func (b *bloomcache) GCRequested() bool {
	gcbs := b.blockstore.(GCBlockstore)
	return gcbs.GCRequested()
}