bloom_cache.go 5.07 KB
Newer Older
1 2 3
package blockstore

import (
4
	"context"
Steven Allen's avatar
Steven Allen committed
5
	"fmt"
6
	"sync/atomic"
7
	"time"
8

9 10 11 12
	bloom "gitlab.dms3.io/dms3/bbloom"
	blocks "gitlab.dms3.io/dms3/go-block-format"
	cid "gitlab.dms3.io/dms3/go-cid"
	metrics "gitlab.dms3.io/dms3/go-metrics-interface"
13 14
)

15 16 17 18
// bloomCached returns a Blockstore that caches Has requests using a Bloom
// filter. bloomSize is size of bloom filter in bytes. hashCount specifies the
// number of hashing functions in the bloom filter (usually known as k).
func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) (*bloomcache, error) {
19
	bl, err := bloom.New(float64(bloomSize), float64(hashCount))
20 21 22
	if err != nil {
		return nil, err
	}
Steven Allen's avatar
Steven Allen committed
23 24 25 26 27 28 29 30 31
	bc := &bloomcache{
		blockstore: bs,
		bloom:      bl,
		hits: metrics.NewCtx(ctx, "bloom.hits_total",
			"Number of cache hits in bloom cache").Counter(),
		total: metrics.NewCtx(ctx, "bloom_total",
			"Total number of requests to bloom cache").Counter(),
		buildChan: make(chan struct{}),
	}
32 33 34
	if v, ok := bs.(Viewer); ok {
		bc.viewer = v
	}
Steven Allen's avatar
Steven Allen committed
35 36 37 38 39 40 41 42 43 44 45 46
	go func() {
		err := bc.build(ctx)
		if err != nil {
			select {
			case <-ctx.Done():
				log.Warning("Cache rebuild closed by context finishing: ", err)
			default:
				log.Error(err)
			}
			return
		}
		if metrics.Active() {
47 48 49 50
			fill := metrics.NewCtx(ctx, "bloom_fill_ratio",
				"Ratio of bloom filter fullnes, (updated once a minute)").Gauge()

			t := time.NewTicker(1 * time.Minute)
Steven Allen's avatar
Steven Allen committed
51
			defer t.Stop()
52 53 54 55 56
			for {
				select {
				case <-ctx.Done():
					return
				case <-t.C:
Steven Allen's avatar
Steven Allen committed
57
					fill.Set(bc.bloom.FillRatioTS())
58
				}
59
			}
Steven Allen's avatar
Steven Allen committed
60 61
		}
	}()
62 63 64 65
	return bc, nil
}

type bloomcache struct {
66
	active int32
67

Steven Allen's avatar
Steven Allen committed
68 69 70 71 72
	bloom    *bloom.Bloom
	buildErr error

	buildChan  chan struct{}
	blockstore Blockstore
73
	viewer     Viewer
74 75

	// Statistics
76 77
	hits  metrics.Counter
	total metrics.Counter
78 79
}

80 81 82
// Compile-time checks that *bloomcache satisfies the interfaces it wraps.
var (
	_ Blockstore = (*bloomcache)(nil)
	_ Viewer     = (*bloomcache)(nil)
)

83
func (b *bloomcache) BloomActive() bool {
84
	return atomic.LoadInt32(&b.active) != 0
85 86
}

Steven Allen's avatar
Steven Allen committed
87 88 89 90 91 92 93 94 95 96
// Wait blocks until the background build of the bloom filter has finished or
// ctx is done, whichever happens first. It returns ctx.Err() in the latter
// case and otherwise the error the build ended with (nil on success).
func (b *bloomcache) Wait(ctx context.Context) error {
	select {
	case <-b.buildChan:
		// buildChan is closed only after buildErr has been recorded.
		return b.buildErr
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (b *bloomcache) build(ctx context.Context) error {
tavit ohanian's avatar
tavit ohanian committed
97 98
	// evt := log.EventBegin(ctx, "bloomcache.build")
	// defer evt.Done()
Steven Allen's avatar
Steven Allen committed
99
	defer close(b.buildChan)
100 101 102

	ch, err := b.blockstore.AllKeysChan(ctx)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
103 104
		b.buildErr = fmt.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err)
		return b.buildErr
105
	}
Steven Allen's avatar
Steven Allen committed
106
	for {
107 108
		select {
		case key, ok := <-ch:
Steven Allen's avatar
Steven Allen committed
109 110 111
			if !ok {
				atomic.StoreInt32(&b.active, 1)
				return nil
112
			}
113
			b.bloom.AddTS(key.Hash()) // Use binary key, the more compact the better
114
		case <-ctx.Done():
Steven Allen's avatar
Steven Allen committed
115 116
			b.buildErr = ctx.Err()
			return b.buildErr
117
		}
118 119 120
	}
}

121
func (b *bloomcache) DeleteBlock(k cid.Cid) error {
122
	if has, ok := b.hasCached(k); ok && !has {
Steven Allen's avatar
Steven Allen committed
123
		return nil
124 125
	}

126
	return b.blockstore.DeleteBlock(k)
127 128 129 130
}

// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
131
func (b *bloomcache) hasCached(k cid.Cid) (has bool, ok bool) {
132
	b.total.Inc()
133 134
	if !k.Defined() {
		log.Error("undefined in bloom cache")
135 136 137
		// Return cache invalid so call to blockstore
		// in case of invalid key is forwarded deeper
		return false, false
138
	}
139
	if b.BloomActive() {
140
		blr := b.bloom.HasTS(k.Hash())
141
		if !blr { // not contained in bloom is only conclusive answer bloom gives
142
			b.hits.Inc()
143
			return false, true
144 145
		}
	}
146
	return false, false
147 148
}

149
func (b *bloomcache) Has(k cid.Cid) (bool, error) {
150 151 152 153
	if has, ok := b.hasCached(k); ok {
		return has, nil
	}

154
	return b.blockstore.Has(k)
155 156
}

157
func (b *bloomcache) GetSize(k cid.Cid) (int, error) {
Steven Allen's avatar
Steven Allen committed
158
	return b.blockstore.GetSize(k)
159 160
}

161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
// View passes the raw data of block k to callback and returns callback's
// error. When the wrapped blockstore implements Viewer, the bloom filter is
// checked first (returning ErrNotFound on a conclusive miss) and the call is
// delegated to it; otherwise the block is fetched through Get.
func (b *bloomcache) View(k cid.Cid, callback func([]byte) error) error {
	if b.viewer != nil {
		if present, conclusive := b.hasCached(k); conclusive && !present {
			return ErrNotFound
		}
		return b.viewer.View(k, callback)
	}

	// No native viewer: Get performs its own bloom filter check.
	blk, err := b.Get(k)
	if err != nil {
		return err
	}
	return callback(blk.RawData())
}

176
func (b *bloomcache) Get(k cid.Cid) (blocks.Block, error) {
177 178 179 180
	if has, ok := b.hasCached(k); ok && !has {
		return nil, ErrNotFound
	}

181
	return b.blockstore.Get(k)
182 183 184
}

func (b *bloomcache) Put(bl blocks.Block) error {
185
	// See comment in PutMany
186 187
	err := b.blockstore.Put(bl)
	if err == nil {
188
		b.bloom.AddTS(bl.Cid().Hash())
189 190 191 192 193
	}
	return err
}

func (b *bloomcache) PutMany(bs []blocks.Block) error {
194
	// bloom cache gives only conclusive resulty if key is not contained
195
	// to reduce number of puts we need conclusive information if block is contained
196 197
	// this means that PutMany can't be improved with bloom cache so we just
	// just do a passthrough.
198
	err := b.blockstore.PutMany(bs)
199 200
	if err != nil {
		return err
201
	}
202
	for _, bl := range bs {
203
		b.bloom.AddTS(bl.Cid().Hash())
204 205
	}
	return nil
206 207
}

208 209 210 211
// HashOnRead forwards the enabled setting to the wrapped blockstore's
// HashOnRead; the bloom cache itself keeps no state for it.
func (b *bloomcache) HashOnRead(enabled bool) {
	b.blockstore.HashOnRead(enabled)
}

212
func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
213 214 215 216 217 218 219 220 221 222 223 224 225 226
	return b.blockstore.AllKeysChan(ctx)
}

// GCLock forwards to the wrapped blockstore's GCLock. It panics if the
// wrapped blockstore does not implement GCBlockstore, because the type
// assertion is unchecked.
func (b *bloomcache) GCLock() Unlocker {
	gcbs := b.blockstore.(GCBlockstore)
	return gcbs.GCLock()
}

// PinLock forwards to the wrapped blockstore's PinLock. It panics if the
// wrapped blockstore does not implement GCBlockstore, because the type
// assertion is unchecked.
func (b *bloomcache) PinLock() Unlocker {
	gcbs := b.blockstore.(GCBlockstore)
	return gcbs.PinLock()
}

// GCRequested forwards to the wrapped blockstore's GCRequested. It panics if
// the wrapped blockstore does not implement GCBlockstore, because the type
// assertion is unchecked.
func (b *bloomcache) GCRequested() bool {
	gcbs := b.blockstore.(GCBlockstore)
	return gcbs.GCRequested()
}