blockstore.go 4.39 KB
Newer Older
1 2 3 4 5 6
// Package blockstore implements a thin wrapper over a datastore, giving a
// clean interface for getting and putting block objects.
package blockstore

import (
	"errors"
Jeromy's avatar
Jeromy committed
7
	"sync"
8

9 10 11 12 13 14
	ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	dsns "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace"
	dsq "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
	mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	blocks "github.com/ipfs/go-ipfs/blocks"
15
	key "github.com/ipfs/go-ipfs/blocks/key"
Jeromy's avatar
Jeromy committed
16
	logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
17 18
)

Jeromy's avatar
Jeromy committed
19
// log is the package-level logger, created with the name "blockstore".
var log = logging.Logger("blockstore")
20

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21
// BlockPrefix namespaces blockstore datastores
22
var BlockPrefix = ds.NewKey("blocks")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23

24 25
// ValueTypeMismatch is the error Get returns when the value read from the
// datastore is not a []byte.
var ValueTypeMismatch = errors.New("The retrieved value is not a Block")

26 27
// ErrNotFound is the error Get returns when the underlying datastore reports
// ds.ErrNotFound for the requested key.
var ErrNotFound = errors.New("blockstore: block not found")

Brian Tiger Chow's avatar
Brian Tiger Chow committed
28
// Blockstore wraps a ThreadSafeDatastore
29
type Blockstore interface {
30 31 32
	DeleteBlock(key.Key) error
	Has(key.Key) (bool, error)
	Get(key.Key) (*blocks.Block, error)
33
	Put(*blocks.Block) error
34
	PutMany([]*blocks.Block) error
35

36
	AllKeysChan(ctx context.Context) (<-chan key.Key, error)
37 38
}

Jeromy's avatar
Jeromy committed
39 40 41 42 43 44 45 46
// GCBlockstore is a Blockstore that additionally exposes a read/write lock.
// NOTE(review): the name suggests the lock is meant for coordinating with
// garbage collection — confirm against callers.
type GCBlockstore interface {
	Blockstore

	// Lock returns a func(); in the blockstore implementation in this file it
	// acquires the write lock and returns the function that releases it.
	Lock() func()
	// RLock returns a func(); in the blockstore implementation in this file it
	// acquires the read lock and returns the function that releases it.
	RLock() func()
}

func NewBlockstore(d ds.ThreadSafeDatastore) *blockstore {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47
	dd := dsns.Wrap(d, BlockPrefix)
48
	return &blockstore{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49
		datastore: dd,
50 51 52 53
	}
}

type blockstore struct {
Jeromy's avatar
Jeromy committed
54
	datastore ds.Batching
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55 56
	// cant be ThreadSafeDatastore cause namespace.Datastore doesnt support it.
	// we do check it on `NewBlockstore` though.
Jeromy's avatar
Jeromy committed
57 58

	lk sync.RWMutex
59 60
}

61
func (bs *blockstore) Get(k key.Key) (*blocks.Block, error) {
62
	maybeData, err := bs.datastore.Get(k.DsKey())
63 64 65
	if err == ds.ErrNotFound {
		return nil, ErrNotFound
	}
66 67 68 69 70 71 72 73 74 75 76 77
	if err != nil {
		return nil, err
	}
	bdata, ok := maybeData.([]byte)
	if !ok {
		return nil, ValueTypeMismatch
	}

	return blocks.NewBlockWithHash(bdata, mh.Multihash(k))
}

func (bs *blockstore) Put(block *blocks.Block) error {
78
	k := block.Key().DsKey()
79 80

	// Has is cheaper than Put, so see if we already have it
81
	exists, err := bs.datastore.Has(k)
82
	if err == nil && exists {
83 84 85
		return nil // already stored.
	}
	return bs.datastore.Put(k, block.Data)
86
}
87

88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
// PutMany writes the given blocks through a single datastore batch.
//
// Blocks the datastore already reports as present are skipped. An error from
// the Has check is not returned; such a block is simply added to the batch.
// The first error from creating the batch, adding to it, or committing it is
// returned.
//
// Note that the parameter name shadows the imported blocks package inside the
// function body.
func (bs *blockstore) PutMany(blocks []*blocks.Block) error {
	t, err := bs.datastore.Batch()
	if err != nil {
		return err
	}
	for _, b := range blocks {
		k := b.Key().DsKey()
		exists, err := bs.datastore.Has(k)
		if err == nil && exists {
			continue // already stored.
		}

		err = t.Put(k, b.Data)
		if err != nil {
			return err
		}
	}
	return t.Commit()
}

108
func (bs *blockstore) Has(k key.Key) (bool, error) {
109 110 111
	return bs.datastore.Has(k.DsKey())
}

112
func (s *blockstore) DeleteBlock(k key.Key) error {
113 114
	return s.datastore.Delete(k.DsKey())
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
115

116
// AllKeysChan runs a query for keys from the blockstore.
117 118
// this is very simplistic, in the future, take dsq.Query as a param?
//
119
// AllKeysChan respects context
120
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121 122

	// KeysOnly, because that would be _a lot_ of data.
123
	q := dsq.Query{KeysOnly: true}
124 125
	// datastore/namespace does *NOT* fix up Query.Prefix
	q.Prefix = BlockPrefix.String()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126 127 128 129 130
	res, err := bs.datastore.Query(q)
	if err != nil {
		return nil, err
	}

131
	// this function is here to compartmentalize
132
	get := func() (k key.Key, ok bool) {
133 134 135 136 137 138 139 140 141 142 143 144
		select {
		case <-ctx.Done():
			return k, false
		case e, more := <-res.Next():
			if !more {
				return k, false
			}
			if e.Error != nil {
				log.Debug("blockstore.AllKeysChan got err:", e.Error)
				return k, false
			}

145 146
			// need to convert to key.Key using key.KeyFromDsKey.
			k = key.KeyFromDsKey(ds.NewKey(e.Key))
147
			log.Debug("blockstore: query got key", k)
148 149 150 151 152 153 154

			// key must be a multihash. else ignore it.
			_, err := mh.Cast([]byte(k))
			if err != nil {
				return "", true
			}

155 156
			return k, true
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
	}
158

159
	output := make(chan key.Key)
160 161 162 163 164 165 166 167 168 169 170
	go func() {
		defer func() {
			res.Process().Close() // ensure exit (signals early exit, too)
			close(output)
		}()

		for {
			k, ok := get()
			if !ok {
				return
			}
171 172 173
			if k == "" {
				continue
			}
174 175 176 177 178 179 180 181 182 183

			select {
			case <-ctx.Done():
				return
			case output <- k:
			}
		}
	}()

	return output, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
184
}
Jeromy's avatar
Jeromy committed
185 186 187 188 189 190 191 192 193 194

// Lock acquires the blockstore's write lock and hands back the function that
// releases it.
func (bs *blockstore) Lock() func() {
	release := bs.lk.Unlock
	bs.lk.Lock()
	return release
}

// RLock acquires the blockstore's read lock and hands back the function that
// releases it.
func (bs *blockstore) RLock() func() {
	release := bs.lk.RUnlock
	bs.lk.RLock()
	return release
}