datastore.go 3.51 KB
Newer Older
1 2 3
package leveldb

import (
Jeromy's avatar
Jeromy committed
4
	ds "github.com/ipfs/go-datastore"
Jakub Sztandera's avatar
Jakub Sztandera committed
5
	dsq "github.com/ipfs/go-datastore/query"
Jakub Sztandera's avatar
Jakub Sztandera committed
6 7 8 9
	"github.com/jbenet/goprocess"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
10 11
)

12
type datastore struct {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
13
	DB *leveldb.DB
14 15 16 17
}

type Options opt.Options

Jeromy's avatar
Jeromy committed
18
func NewDatastore(path string, opts *Options) (*datastore, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20 21 22
	var nopts opt.Options
	if opts != nil {
		nopts = opt.Options(*opts)
	}
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
23 24 25 26
	db, err := leveldb.OpenFile(path, &nopts)
	if err != nil {
		return nil, err
	}
27

28
	return &datastore{
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
29 30
		DB: db,
	}, nil
31 32 33 34 35 36
}

// Returns ErrInvalidType if value is not of type []byte.
//
// Note: using sync = false.
// see http://godoc.org/github.com/syndtr/goleveldb/leveldb/opt#WriteOptions
37
func (d *datastore) Put(key ds.Key, value interface{}) (err error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38 39
	val, ok := value.([]byte)
	if !ok {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
40 41 42
		return ds.ErrInvalidType
	}
	return d.DB.Put(key.Bytes(), val, nil)
43 44
}

45
func (d *datastore) Get(key ds.Key) (value interface{}, err error) {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
46 47 48 49 50 51 52 53
	val, err := d.DB.Get(key.Bytes(), nil)
	if err != nil {
		if err == leveldb.ErrNotFound {
			return nil, ds.ErrNotFound
		}
		return nil, err
	}
	return val, nil
54 55
}

56
func (d *datastore) Has(key ds.Key) (exists bool, err error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57
	return d.DB.Has(key.Bytes(), nil)
58 59
}

60
func (d *datastore) Delete(key ds.Key) (err error) {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
61 62 63 64 65
	err = d.DB.Delete(key.Bytes(), nil)
	if err == leveldb.ErrNotFound {
		return ds.ErrNotFound
	}
	return err
66 67
}

68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {

	// we can use multiple iterators concurrently. see:
	// https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator
	// advance the iterator only if the reader reads
	//
	// run query in own sub-process tied to Results.Process(), so that
	// it waits for us to finish AND so that clients can signal to us
	// that resources should be reclaimed.
	qrb := dsq.NewResultBuilder(q)
	qrb.Process.Go(func(worker goprocess.Process) {
		d.runQuery(worker, qrb)
	})

	// go wait on the worker (without signaling close)
	go qrb.Process.CloseAfterChildren()

	// Now, apply remaining things (filters, order)
	qr := qrb.Results()
	for _, f := range q.Filters {
		qr = dsq.NaiveFilter(qr, f)
89
	}
90 91 92 93 94
	for _, o := range q.Orders {
		qr = dsq.NaiveOrder(qr, o)
	}
	return qr, nil
}
95

96
func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
97

98 99 100 101 102 103
	var rnge *util.Range
	if qrb.Query.Prefix != "" {
		rnge = util.BytesPrefix([]byte(qrb.Query.Prefix))
	}
	i := d.DB.NewIterator(rnge, nil)
	defer i.Release()
104

105 106 107 108
	// advance iterator for offset
	if qrb.Query.Offset > 0 {
		for j := 0; j < qrb.Query.Offset; j++ {
			i.Next()
109
		}
110
	}
111

112 113 114 115 116 117
	// iterate, and handle limit, too
	for sent := 0; i.Next(); sent++ {
		// end early if we hit the limit
		if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit {
			break
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118

119 120
		k := ds.NewKey(string(i.Key())).String()
		e := dsq.Entry{Key: k}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121

122 123 124 125
		if !qrb.Query.KeysOnly {
			buf := make([]byte, len(i.Value()))
			copy(buf, i.Value())
			e.Value = buf
126
		}
127

128 129 130 131 132 133
		select {
		case qrb.Output <- dsq.Result{Entry: e}: // we sent it out
		case <-worker.Closing(): // client told us to end early.
			break
		}
	}
134

135 136 137 138 139 140 141
	if err := i.Error(); err != nil {
		select {
		case qrb.Output <- dsq.Result{Error: err}: // client read our error
		case <-worker.Closing(): // client told us to end.
			return
		}
	}
Jeromy's avatar
Jeromy committed
142 143
}

Jeromy's avatar
Jeromy committed
144 145 146 147 148
func (d *datastore) Batch() (ds.Batch, error) {
	// TODO: implement batch on leveldb
	return nil, ds.ErrBatchUnsupported
}

149
// LevelDB needs to be closed.
150
func (d *datastore) Close() (err error) {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
151
	return d.DB.Close()
152
}
153

154
func (d *datastore) IsThreadSafe() {}