datastore.go 2.8 KB
Newer Older
1 2 3
package leveldb

import (
4 5
	"io"

6 7
	"github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb"
	"github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
8 9 10 11
	"github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util"

	ds "github.com/jbenet/go-datastore"
	dsq "github.com/jbenet/go-datastore/query"
12 13
)

14 15 16 17 18 19
type Datastore interface {
	ds.ThreadSafeDatastore
	io.Closer
}

type datastore struct {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
20
	DB *leveldb.DB
21 22 23 24
}

type Options opt.Options

25
func NewDatastore(path string, opts *Options) (Datastore, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26 27 28 29
	var nopts opt.Options
	if opts != nil {
		nopts = opt.Options(*opts)
	}
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
30 31 32 33
	db, err := leveldb.OpenFile(path, &nopts)
	if err != nil {
		return nil, err
	}
34

35
	return &datastore{
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
36 37
		DB: db,
	}, nil
38 39 40 41 42 43
}

// Returns ErrInvalidType if value is not of type []byte.
//
// Note: using sync = false.
// see http://godoc.org/github.com/syndtr/goleveldb/leveldb/opt#WriteOptions
44
func (d *datastore) Put(key ds.Key, value interface{}) (err error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45 46
	val, ok := value.([]byte)
	if !ok {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
47 48 49
		return ds.ErrInvalidType
	}
	return d.DB.Put(key.Bytes(), val, nil)
50 51
}

52
func (d *datastore) Get(key ds.Key) (value interface{}, err error) {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
53 54 55 56 57 58 59 60
	val, err := d.DB.Get(key.Bytes(), nil)
	if err != nil {
		if err == leveldb.ErrNotFound {
			return nil, ds.ErrNotFound
		}
		return nil, err
	}
	return val, nil
61 62
}

63
func (d *datastore) Has(key ds.Key) (exists bool, err error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64
	return d.DB.Has(key.Bytes(), nil)
65 66
}

67
func (d *datastore) Delete(key ds.Key) (err error) {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
68 69 70 71 72
	err = d.DB.Delete(key.Bytes(), nil)
	if err == leveldb.ErrNotFound {
		return ds.ErrNotFound
	}
	return err
73 74
}

75 76 77 78 79 80 81
func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) {
	var rnge *util.Range
	if q.Prefix != "" {
		rnge = util.BytesPrefix([]byte(q.Prefix))
	}
	i := d.DB.NewIterator(rnge, nil)

82 83 84 85 86 87 88 89 90 91 92 93 94 95
	// buffer this channel so that we dont totally block leveldb if client
	// is not reading from chan.
	ch := make(chan dsq.Entry, 1000)
	qr := dsq.ResultsWithEntriesChan(q, ch)
	// qr := dsq.ResultsWithEntries(q, es)

	go func() {
		defer close(ch)

		// offset
		if q.Offset > 0 {
			for j := 0; j < q.Offset; j++ {
				i.Next()
			}
96 97
		}

98 99
		sent := 0
		for i.Next() {
100

101 102 103 104
			// limit
			if q.Limit > 0 && sent >= q.Limit {
				break
			}
105

106 107
			k := ds.NewKey(string(i.Key())).String()
			e := dsq.Entry{Key: k}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
108

109 110 111 112 113
			if !q.KeysOnly {
				buf := make([]byte, len(i.Value()))
				copy(buf, i.Value())
				e.Value = buf
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
114

115 116 117 118 119 120 121 122
			ch <- e
			sent++
		}
		i.Release()
		if err := i.Error(); err != nil {
			qr.Err() <- err
		}
	}()
123

124 125 126 127
	// Now, apply remaining pieces.
	q2 := q
	q2.Offset = 0 // already applied
	q2.Limit = 0  // already applied
128

129 130
	qr = q2.ApplyTo(qr)
	qr.Query = q // set it back
131
	return qr, nil
Jeromy's avatar
Jeromy committed
132 133
}

134
// LevelDB needs to be closed.
135
func (d *datastore) Close() (err error) {
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
136
	return d.DB.Close()
137
}
138

139
func (d *datastore) IsThreadSafe() {}