blockstore.go 4.62 KB
Newer Older
1 2 3 4 5 6 7
// Package blockstore implements a thin wrapper over a datastore, giving a
// clean interface for getting and putting block objects.
package blockstore

import (
	"errors"

8
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
9
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10 11
	dsns "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace"
	dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
12
	mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
13

14
	blocks "github.com/jbenet/go-ipfs/blocks"
15
	eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
16 17 18
	u "github.com/jbenet/go-ipfs/util"
)

19 20
// log is the package-level event logger, registered under the name "blockstore".
var log = eventlog.Logger("blockstore")

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21
// BlockPrefix namespaces blockstore datastores
22
var BlockPrefix = ds.NewKey("b")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23

24 25
// ValueTypeMismatch is returned by Get when the value read back from the
// datastore is not a []byte and therefore cannot be turned into a block.
var ValueTypeMismatch = errors.New("The retrieved value is not a Block")

26 27
// ErrNotFound is returned by Get when the underlying datastore reports
// ds.ErrNotFound for the requested key.
var ErrNotFound = errors.New("blockstore: block not found")

Brian Tiger Chow's avatar
Brian Tiger Chow committed
28
// Blockstore wraps a ThreadSafeDatastore
29
type Blockstore interface {
30 31
	DeleteBlock(u.Key) error
	Has(u.Key) (bool, error)
32 33
	Get(u.Key) (*blocks.Block, error)
	Put(*blocks.Block) error
34

35 36 37 38 39
	AllKeys(ctx context.Context) ([]u.Key, error)
	AllKeysChan(ctx context.Context) (<-chan u.Key, error)

	AllKeysRange(ctx context.Context, offset int, limit int) ([]u.Key, error)
	AllKeysRangeChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error)
40 41 42
}

func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43
	dd := dsns.Wrap(d, BlockPrefix)
44
	return &blockstore{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
		datastore: dd,
46 47 48 49
	}
}

type blockstore struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
50 51 52
	datastore ds.Datastore
	// cant be ThreadSafeDatastore cause namespace.Datastore doesnt support it.
	// we do check it on `NewBlockstore` though.
53 54 55 56
}

func (bs *blockstore) Get(k u.Key) (*blocks.Block, error) {
	maybeData, err := bs.datastore.Get(k.DsKey())
57 58 59
	if err == ds.ErrNotFound {
		return nil, ErrNotFound
	}
60 61 62 63 64 65 66 67 68 69 70 71
	if err != nil {
		return nil, err
	}
	bdata, ok := maybeData.([]byte)
	if !ok {
		return nil, ValueTypeMismatch
	}

	return blocks.NewBlockWithHash(bdata, mh.Multihash(k))
}

func (bs *blockstore) Put(block *blocks.Block) error {
72 73 74 75 76 77 78
	// Has is cheaper than
	k := block.Key().DsKey()
	exists, err := bs.datastore.Has(k)
	if err != nil && exists {
		return nil // already stored.
	}
	return bs.datastore.Put(k, block.Data)
79
}
80 81 82 83 84 85 86 87

// Has reports whether the datastore holds an entry for k. The answer and any
// error come straight from the datastore.
func (bs *blockstore) Has(k u.Key) (bool, error) {
	dsKey := k.DsKey()
	return bs.datastore.Has(dsKey)
}

// DeleteBlock removes the entry for k from the datastore and returns whatever
// error the datastore's Delete reports.
func (bs *blockstore) DeleteBlock(k u.Key) error {
	return bs.datastore.Delete(k.DsKey())
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88

89 90 91 92 93 94 95 96 97
// AllKeys returns every key in the blockstore as a slice. It is AllKeysRange
// with offset and limit both zero, which that method treats as "ignored".
func (bs *blockstore) AllKeys(ctx context.Context) ([]u.Key, error) {
	const unbounded = 0
	return bs.AllKeysRange(ctx, unbounded, unbounded)
}

// AllKeysChan streams every key in the blockstore over a channel. It is
// AllKeysRangeChan with offset and limit both zero, which that method treats
// as "ignored".
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan u.Key, error) {
	const unbounded = 0
	return bs.AllKeysRangeChan(ctx, unbounded, unbounded)
}

// AllKeysRange runs a query for keys from the blockstore.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98 99
// this is very simplistic, in the future, take dsq.Query as a param?
// if offset and limit are 0, they are ignored.
100
//
101 102
// AllKeysRange respects context
func (bs *blockstore) AllKeysRange(ctx context.Context, offset int, limit int) ([]u.Key, error) {
103

104
	ch, err := bs.AllKeysRangeChan(ctx, offset, limit)
105 106 107 108
	if err != nil {
		return nil, err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
109
	var keys []u.Key
110 111 112 113 114 115
	for k := range ch {
		keys = append(keys, k)
	}
	return keys, nil
}

116
// AllKeysRangeChan runs a query for keys from the blockstore.
117 118 119
// this is very simplistic, in the future, take dsq.Query as a param?
// if offset and limit are 0, they are ignored.
//
120 121
// AllKeysRangeChan respects context
func (bs *blockstore) AllKeysRangeChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
122 123 124 125 126 127 128 129

	// KeysOnly, because that would be _a lot_ of data.
	q := dsq.Query{KeysOnly: true, Offset: offset, Limit: limit}
	res, err := bs.datastore.Query(q)
	if err != nil {
		return nil, err
	}

130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
	// this function is here to compartmentalize
	get := func() (k u.Key, ok bool) {
		select {
		case <-ctx.Done():
			return k, false
		case e, more := <-res.Next():
			if !more {
				return k, false
			}
			if e.Error != nil {
				log.Debug("blockstore.AllKeysChan got err:", e.Error)
				return k, false
			}

			// need to convert to u.Key using u.KeyFromDsKey.
			k = u.KeyFromDsKey(ds.NewKey(e.Key))
146
			log.Debug("blockstore: query got key", k)
147 148 149 150 151 152 153

			// key must be a multihash. else ignore it.
			_, err := mh.Cast([]byte(k))
			if err != nil {
				return "", true
			}

154 155
			return k, true
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156
	}
157 158 159 160 161 162 163 164 165 166 167 168 169

	output := make(chan u.Key)
	go func() {
		defer func() {
			res.Process().Close() // ensure exit (signals early exit, too)
			close(output)
		}()

		for {
			k, ok := get()
			if !ok {
				return
			}
170 171 172
			if k == "" {
				continue
			}
173 174 175 176 177 178 179 180 181 182

			select {
			case <-ctx.Done():
				return
			case output <- k:
			}
		}
	}()

	return output, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
183
}