arc_cache.go 5.43 KB
Newer Older
1 2 3
package blockstore

import (
4
	"context"
5

6 7 8 9
	lru "github.com/hashicorp/golang-lru"
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	metrics "github.com/ipfs/go-metrics-interface"
10 11
)

12 13 14
// Value types stored in the metadata cache. The dynamic type of an entry
// tells queryCache how much is known about the block.
type (
	// cacheHave records only whether a block exists; its size is unknown.
	cacheHave bool

	// cacheSize records the size of a block that is known to exist.
	cacheSize int
)

15 16 17 18
// arccache wraps a BlockStore with an Adaptive Replacement Cache (ARC) that
// does not store the actual blocks, just metadata about them: existence and
// size. This improves block access times by allowing many lookups to be
// short-circuited without querying the underlying datastore.
19
type arccache struct {
20
	cache      *lru.TwoQueueCache
21
	blockstore Blockstore
22
	viewer     Viewer
23 24 25

	hits  metrics.Counter
	total metrics.Counter
26 27
}

28 29 30
// Compile-time checks that *arccache implements the interfaces it is used as.
var (
	_ Blockstore = (*arccache)(nil)
	_ Viewer     = (*arccache)(nil)
)

31
func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, error) {
32
	cache, err := lru.New2Q(lruSize)
33 34 35
	if err != nil {
		return nil, err
	}
36
	c := &arccache{cache: cache, blockstore: bs}
37 38
	c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter()
	c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter()
39 40 41
	if v, ok := bs.(Viewer); ok {
		c.viewer = v
	}
42
	return c, nil
43 44
}

45
func (b *arccache) DeleteBlock(k cid.Cid) error {
46
	if has, _, ok := b.queryCache(k); ok && !has {
Steven Allen's avatar
Steven Allen committed
47
		return nil
48 49
	}

50
	b.cache.Remove(k) // Invalidate cache before deleting.
51
	err := b.blockstore.DeleteBlock(k)
Steven Allen's avatar
Steven Allen committed
52
	if err == nil {
53
		b.cacheHave(k, false)
54
	}
Steven Allen's avatar
Steven Allen committed
55
	return err
56 57
}

58
func (b *arccache) Has(k cid.Cid) (bool, error) {
59
	if has, _, ok := b.queryCache(k); ok {
60 61 62 63 64
		return has, nil
	}
	has, err := b.blockstore.Has(k)
	if err != nil {
		return false, err
65
	}
66 67
	b.cacheHave(k, has)
	return has, nil
68
}
69

70
func (b *arccache) GetSize(k cid.Cid) (int, error) {
71
	if has, blockSize, ok := b.queryCache(k); ok {
72 73 74 75 76 77
		if !has {
			// don't have it, return
			return -1, ErrNotFound
		}
		if blockSize >= 0 {
			// have it and we know the size
78 79
			return blockSize, nil
		}
80
		// we have it but don't know the size, ask the datastore.
81 82
	}
	blockSize, err := b.blockstore.GetSize(k)
83
	if err == ErrNotFound {
84
		b.cacheHave(k, false)
85
	} else if err == nil {
86
		b.cacheSize(k, blockSize)
87
	}
88
	return blockSize, err
89 90
}

91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
// View passes the raw data of the block identified by k to callback and
// returns callback's error.
//
// When the wrapped store does not implement Viewer, the block is fetched via
// Get instead. Otherwise an undefined CID, or a CID the cache knows to be
// absent, yields ErrNotFound without consulting the wrapped store.
func (b *arccache) View(k cid.Cid, callback func([]byte) error) error {
	if b.viewer == nil {
		// No Viewer support underneath: fall back to Get.
		block, err := b.Get(k)
		if err == nil {
			err = callback(block.RawData())
		}
		return err
	}

	if !k.Defined() {
		log.Error("undefined cid in arc cache")
		return ErrNotFound
	}

	exists, _, known := b.queryCache(k)
	if known && !exists {
		// The cache is conclusive that the block does not exist.
		return ErrNotFound
	}

	return b.viewer.View(k, callback)
}

116 117 118
func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
	if !k.Defined() {
		log.Error("undefined cid in arc cache")
119 120 121
		return nil, ErrNotFound
	}

122
	if has, _, ok := b.queryCache(k); ok && !has {
123 124 125 126 127
		return nil, ErrNotFound
	}

	bl, err := b.blockstore.Get(k)
	if bl == nil && err == ErrNotFound {
128
		b.cacheHave(k, false)
129
	} else if bl != nil {
130
		b.cacheSize(k, len(bl.RawData()))
131 132 133 134 135
	}
	return bl, err
}

func (b *arccache) Put(bl blocks.Block) error {
136
	if has, _, ok := b.queryCache(bl.Cid()); ok && has {
137 138 139 140 141
		return nil
	}

	err := b.blockstore.Put(bl)
	if err == nil {
142
		b.cacheSize(bl.Cid(), len(bl.RawData()))
143 144 145 146 147 148 149
	}
	return err
}

func (b *arccache) PutMany(bs []blocks.Block) error {
	var good []blocks.Block
	for _, block := range bs {
150 151
		// call put on block if result is inconclusive or we are sure that
		// the block isn't in storage
152
		if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) {
153 154 155
			good = append(good, block)
		}
	}
156
	err := b.blockstore.PutMany(good)
157 158
	if err != nil {
		return err
159
	}
160
	for _, block := range good {
161
		b.cacheSize(block.Cid(), len(block.RawData()))
162 163
	}
	return nil
164 165
}

166 167 168 169
// HashOnRead forwards the setting to the wrapped blockstore; the cache
// itself keeps no state for it.
func (b *arccache) HashOnRead(enabled bool) {
	inner := b.blockstore
	inner.HashOnRead(enabled)
}

170
func (b *arccache) cacheHave(c cid.Cid, have bool) {
171
	b.cache.Add(string(c.Hash()), cacheHave(have))
172 173
}

174
func (b *arccache) cacheSize(c cid.Cid, blockSize int) {
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
	b.cache.Add(string(c.Hash()), cacheSize(blockSize))
}

// queryCache checks if the CID is in the cache. If so, it returns:
//
//  * exists (bool): whether the CID is known to exist or not.
//  * size (int): the size if cached, or -1 if not cached.
//  * ok (bool): whether present in the cache.
//
// When ok is false, the answer in inconclusive and the caller must ignore the
// other two return values. Querying the underying store is necessary.
//
// When ok is true, exists carries the correct answer, and size carries the
// size, if known, or -1 if not.
func (b *arccache) queryCache(k cid.Cid) (exists bool, size int, ok bool) {
	b.total.Inc()
	if !k.Defined() {
		log.Error("undefined cid in arccache")
		// Return cache invalid so the call to blockstore happens
		// in case of invalid key and correct error is created.
		return false, -1, false
	}

	h, ok := b.cache.Get(string(k.Hash()))
	if ok {
		b.hits.Inc()
		switch h := h.(type) {
		case cacheHave:
			return bool(h), -1, true
		case cacheSize:
			return true, int(h), true
		}
	}
	return false, -1, false
209 210
}

211
func (b *arccache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
212 213 214 215 216 217 218 219 220 221 222 223 224 225
	return b.blockstore.AllKeysChan(ctx)
}

// GCLock forwards to the wrapped blockstore's GCLock. It panics if the
// wrapped blockstore does not implement GCBlockstore.
func (b *arccache) GCLock() Unlocker {
	gcbs := b.blockstore.(GCBlockstore)
	return gcbs.GCLock()
}

// PinLock forwards to the wrapped blockstore's PinLock. It panics if the
// wrapped blockstore does not implement GCBlockstore.
func (b *arccache) PinLock() Unlocker {
	gcbs := b.blockstore.(GCBlockstore)
	return gcbs.PinLock()
}

// GCRequested forwards to the wrapped blockstore's GCRequested. It panics if
// the wrapped blockstore does not implement GCBlockstore.
func (b *arccache) GCRequested() bool {
	gcbs := b.blockstore.(GCBlockstore)
	return gcbs.GCRequested()
}