arc_cache.go 3.68 KB
Newer Older
1 2 3
package blockstore

import (
4
	"context"
5

6
	"gx/ipfs/QmVA4mafxbfH5aEvNz8fyoxC6J1xhAtw88B4GerPznSZBg/go-block-format"
7 8

	"gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
9
	cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
10
	ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
11 12 13
	lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
)

14 15 16
// arccache wraps a BlockStore with an Adaptive Replacement Cache (ARC) for
// block Cids. This provides block access-time improvements, allowing
// to short-cut many searches without query-ing the underlying datastore.
17 18 19
type arccache struct {
	arc        *lru.ARCCache
	blockstore Blockstore
20 21 22

	hits  metrics.Counter
	total metrics.Counter
23 24
}

25
func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, error) {
26 27 28 29
	arc, err := lru.NewARC(lruSize)
	if err != nil {
		return nil, err
	}
30 31 32
	c := &arccache{arc: arc, blockstore: bs}
	c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter()
	c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter()
33

34
	return c, nil
35 36
}

37
func (b *arccache) DeleteBlock(k *cid.Cid) error {
38 39 40 41 42 43 44
	if has, ok := b.hasCached(k); ok && !has {
		return ErrNotFound
	}

	b.arc.Remove(k) // Invalidate cache before deleting.
	err := b.blockstore.DeleteBlock(k)
	switch err {
45
	case nil, ds.ErrNotFound, ErrNotFound:
46
		b.addCache(k, false)
47
		return err
48 49 50 51 52 53 54
	default:
		return err
	}
}

// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
55
func (b *arccache) hasCached(k *cid.Cid) (has bool, ok bool) {
56
	b.total.Inc()
57 58
	if k == nil {
		log.Error("nil cid in arccache")
59 60
		// Return cache invalid so the call to blockstore happens
		// in case of invalid key and correct error is created.
61 62
		return false, false
	}
63

64
	h, ok := b.arc.Get(k.KeyString())
65
	if ok {
66
		b.hits.Inc()
67
		return h.(bool), true
68
	}
69
	return false, false
70 71
}

72
func (b *arccache) Has(k *cid.Cid) (bool, error) {
73 74 75 76 77 78
	if has, ok := b.hasCached(k); ok {
		return has, nil
	}

	res, err := b.blockstore.Has(k)
	if err == nil {
79
		b.addCache(k, res)
80 81 82 83
	}
	return res, err
}

84 85 86 87 88 89
func (b *arccache) Get(k *cid.Cid) (blocks.Block, error) {
	if k == nil {
		log.Error("nil cid in arc cache")
		return nil, ErrNotFound
	}

90 91 92 93 94 95
	if has, ok := b.hasCached(k); ok && !has {
		return nil, ErrNotFound
	}

	bl, err := b.blockstore.Get(k)
	if bl == nil && err == ErrNotFound {
96
		b.addCache(k, false)
97
	} else if bl != nil {
98
		b.addCache(k, true)
99 100 101 102 103
	}
	return bl, err
}

func (b *arccache) Put(bl blocks.Block) error {
104
	if has, ok := b.hasCached(bl.Cid()); ok && has {
105 106 107 108 109
		return nil
	}

	err := b.blockstore.Put(bl)
	if err == nil {
110
		b.addCache(bl.Cid(), true)
111 112 113 114 115 116 117
	}
	return err
}

func (b *arccache) PutMany(bs []blocks.Block) error {
	var good []blocks.Block
	for _, block := range bs {
118 119
		// call put on block if result is inconclusive or we are sure that
		// the block isn't in storage
120
		if has, ok := b.hasCached(block.Cid()); !ok || (ok && !has) {
121 122 123
			good = append(good, block)
		}
	}
124
	err := b.blockstore.PutMany(good)
125 126
	if err != nil {
		return err
127
	}
128
	for _, block := range good {
129
		b.addCache(block.Cid(), true)
130 131
	}
	return nil
132 133
}

134 135 136 137
func (b *arccache) HashOnRead(enabled bool) {
	b.blockstore.HashOnRead(enabled)
}

138 139 140 141 142
func (b *arccache) addCache(c *cid.Cid, has bool) {
	b.arc.Add(c.KeyString(), has)
}

func (b *arccache) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
143 144 145 146 147 148 149 150 151 152 153 154 155 156
	return b.blockstore.AllKeysChan(ctx)
}

func (b *arccache) GCLock() Unlocker {
	return b.blockstore.(GCBlockstore).GCLock()
}

func (b *arccache) PinLock() Unlocker {
	return b.blockstore.(GCBlockstore).PinLock()
}

func (b *arccache) GCRequested() bool {
	return b.blockstore.(GCBlockstore).GCRequested()
}