datastore.go 4.14 KB
Newer Older
1 2 3
package leveldb

import (
Jeromy's avatar
Jeromy committed
4
	ds "github.com/ipfs/go-datastore"
Jakub Sztandera's avatar
Jakub Sztandera committed
5
	dsq "github.com/ipfs/go-datastore/query"
Jakub Sztandera's avatar
Jakub Sztandera committed
6 7 8 9
	"github.com/jbenet/goprocess"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
10 11
)

12
type datastore struct {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
13
	DB *leveldb.DB
14 15 16 17
}

// Options is a named type whose underlying type is goleveldb's opt.Options.
// NewDatastore converts it back to opt.Options before opening the database.
type Options opt.Options

Jeromy's avatar
Jeromy committed
18
func NewDatastore(path string, opts *Options) (*datastore, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20 21 22
	var nopts opt.Options
	if opts != nil {
		nopts = opt.Options(*opts)
	}
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
23 24 25 26
	db, err := leveldb.OpenFile(path, &nopts)
	if err != nil {
		return nil, err
	}
27

28
	return &datastore{
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
29 30
		DB: db,
	}, nil
31 32 33 34 35 36
}

// Returns ErrInvalidType if value is not of type []byte.
//
// Note: using sync = false.
// see http://godoc.org/github.com/syndtr/goleveldb/leveldb/opt#WriteOptions
37
func (d *datastore) Put(key ds.Key, value interface{}) (err error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38 39
	val, ok := value.([]byte)
	if !ok {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
40 41 42
		return ds.ErrInvalidType
	}
	return d.DB.Put(key.Bytes(), val, nil)
43 44
}

45
func (d *datastore) Get(key ds.Key) (value interface{}, err error) {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
46 47 48 49 50 51 52 53
	val, err := d.DB.Get(key.Bytes(), nil)
	if err != nil {
		if err == leveldb.ErrNotFound {
			return nil, ds.ErrNotFound
		}
		return nil, err
	}
	return val, nil
54 55
}

56
func (d *datastore) Has(key ds.Key) (exists bool, err error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57
	return d.DB.Has(key.Bytes(), nil)
58 59
}

60
func (d *datastore) Delete(key ds.Key) (err error) {
61 62 63 64 65 66
	// leveldb Delete will not return an error if the key doesn't
	// exist (see https://github.com/syndtr/goleveldb/issues/109),
	// so check that the key exists first and if not return an
	// error
	exists, err := d.DB.Has(key.Bytes(), nil)
	if !exists {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
67
		return ds.ErrNotFound
68 69
	} else if err != nil {
		return err
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
70
	}
71
	return d.DB.Delete(key.Bytes(), nil)
72 73
}

74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {

	// we can use multiple iterators concurrently. see:
	// https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator
	// advance the iterator only if the reader reads
	//
	// run query in own sub-process tied to Results.Process(), so that
	// it waits for us to finish AND so that clients can signal to us
	// that resources should be reclaimed.
	qrb := dsq.NewResultBuilder(q)
	qrb.Process.Go(func(worker goprocess.Process) {
		d.runQuery(worker, qrb)
	})

	// go wait on the worker (without signaling close)
	go qrb.Process.CloseAfterChildren()

	// Now, apply remaining things (filters, order)
	qr := qrb.Results()
	for _, f := range q.Filters {
		qr = dsq.NaiveFilter(qr, f)
95
	}
96 97 98 99 100
	for _, o := range q.Orders {
		qr = dsq.NaiveOrder(qr, o)
	}
	return qr, nil
}
101

102
func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
103

104 105 106 107 108 109
	var rnge *util.Range
	if qrb.Query.Prefix != "" {
		rnge = util.BytesPrefix([]byte(qrb.Query.Prefix))
	}
	i := d.DB.NewIterator(rnge, nil)
	defer i.Release()
110

111 112 113 114
	// advance iterator for offset
	if qrb.Query.Offset > 0 {
		for j := 0; j < qrb.Query.Offset; j++ {
			i.Next()
115
		}
116
	}
117

118 119 120 121 122 123
	// iterate, and handle limit, too
	for sent := 0; i.Next(); sent++ {
		// end early if we hit the limit
		if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit {
			break
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
124

125 126
		k := ds.NewKey(string(i.Key())).String()
		e := dsq.Entry{Key: k}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127

128 129 130 131
		if !qrb.Query.KeysOnly {
			buf := make([]byte, len(i.Value()))
			copy(buf, i.Value())
			e.Value = buf
132
		}
133

134 135 136 137 138 139
		select {
		case qrb.Output <- dsq.Result{Entry: e}: // we sent it out
		case <-worker.Closing(): // client told us to end early.
			break
		}
	}
140

141 142 143 144 145 146 147
	if err := i.Error(); err != nil {
		select {
		case qrb.Output <- dsq.Result{Error: err}: // client read our error
		case <-worker.Closing(): // client told us to end.
			return
		}
	}
Jeromy's avatar
Jeromy committed
148 149
}

150
// LevelDB needs to be closed.
151
func (d *datastore) Close() (err error) {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
152
	return d.DB.Close()
153
}
154

155
// IsThreadSafe is an empty marker method.
// NOTE(review): presumably satisfies a thread-safety marker interface in
// go-datastore — confirm against that package.
func (d *datastore) IsThreadSafe() {}
Jeromy's avatar
Jeromy committed
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186

// leveldbBatch accumulates Put and Delete operations in a leveldb.Batch and
// writes them to db when Commit is called.
type leveldbBatch struct {
	b  *leveldb.Batch // pending operations
	db *leveldb.DB    // database the batch is written to on Commit
}

// Batch returns a new, empty batch bound to this datastore's database.
// The returned error is always nil.
func (d *datastore) Batch() (ds.Batch, error) {
	batch := &leveldbBatch{db: d.DB}
	batch.b = new(leveldb.Batch)
	return batch, nil
}

// Put records a write of value under key in the batch; nothing reaches the
// database until Commit. It returns ds.ErrInvalidType if value is not of
// type []byte.
func (b *leveldbBatch) Put(key ds.Key, value interface{}) error {
	data, isBytes := value.([]byte)
	if isBytes {
		b.b.Put(key.Bytes(), data)
		return nil
	}
	return ds.ErrInvalidType
}

// Commit writes the accumulated batch to the database with nil write
// options and returns the error from the write, if any.
func (b *leveldbBatch) Commit() error {
	err := b.db.Write(b.b, nil)
	return err
}

// Delete records a deletion of key in the batch; nothing reaches the
// database until Commit. It always returns nil and, unlike
// datastore.Delete, does not check that the key exists.
func (b *leveldbBatch) Delete(key ds.Key) error {
	rawKey := key.Bytes()
	b.b.Delete(rawKey)
	return nil
}