autobatch.go 2.75 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Package autobatch provides a go-datastore implementation that
// automatically batches together writes by holding puts in memory until
// a certain threshold is met.
package autobatch

import (
	ds "github.com/ipfs/go-datastore"
	dsq "github.com/ipfs/go-datastore/query"
)

// Datastore implements a go-datatsore.
type Datastore struct {
	child ds.Batching

	// TODO: discuss making ds.Batch implement the full ds.Datastore interface
Steven Allen's avatar
Steven Allen committed
16
	buffer           map[ds.Key]op
17 18 19
	maxBufferEntries int
}

Steven Allen's avatar
Steven Allen committed
20 21 22 23 24
// op is a single buffered operation on a key: a put of value, or a
// deletion when delete is set.
type op struct {
	// value is the data to store; it is left nil for deletions.
	value []byte
	// delete marks the key as removed.
	delete bool
}

25 26 27 28 29 30
// NewAutoBatching returns a new datastore that automatically
// batches writes using the given Batching datastore. The size
// of the memory pool is given by size.
func NewAutoBatching(d ds.Batching, size int) *Datastore {
	return &Datastore{
		child:            d,
Steven Allen's avatar
Steven Allen committed
31
		buffer:           make(map[ds.Key]op, size),
32 33 34 35 36 37
		maxBufferEntries: size,
	}
}

// Delete deletes a key/value
func (d *Datastore) Delete(k ds.Key) error {
Steven Allen's avatar
Steven Allen committed
38 39
	d.buffer[k] = op{delete: true}
	return nil
40 41 42
}

// Get retrieves a value given a key.
43
func (d *Datastore) Get(k ds.Key) ([]byte, error) {
Steven Allen's avatar
Steven Allen committed
44
	o, ok := d.buffer[k]
45
	if ok {
Steven Allen's avatar
Steven Allen committed
46 47 48 49
		if o.delete {
			return nil, ds.ErrNotFound
		}
		return o.value, nil
50 51 52 53 54 55
	}

	return d.child.Get(k)
}

// Put stores a key/value.
56
func (d *Datastore) Put(k ds.Key, val []byte) error {
Steven Allen's avatar
Steven Allen committed
57
	d.buffer[k] = op{value: val}
58 59 60 61 62 63 64 65 66 67 68 69 70
	if len(d.buffer) > d.maxBufferEntries {
		return d.Flush()
	}
	return nil
}

// Flush flushes the current batch to the underlying datastore.
func (d *Datastore) Flush() error {
	b, err := d.child.Batch()
	if err != nil {
		return err
	}

Steven Allen's avatar
Steven Allen committed
71 72 73 74 75 76 77 78 79 80 81
	for k, o := range d.buffer {
		var err error
		if o.delete {
			err = b.Delete(k)
			if err == ds.ErrNotFound {
				// Ignore these, let delete be idempotent.
				err = nil
			}
		} else {
			err = b.Put(k, o.value)
		}
82 83 84 85 86
		if err != nil {
			return err
		}
	}
	// clear out buffer
Steven Allen's avatar
Steven Allen committed
87
	d.buffer = make(map[ds.Key]op, d.maxBufferEntries)
88 89 90 91 92 93

	return b.Commit()
}

// Has checks if a key is stored.
func (d *Datastore) Has(k ds.Key) (bool, error) {
Steven Allen's avatar
Steven Allen committed
94
	o, ok := d.buffer[k]
95
	if ok {
Steven Allen's avatar
Steven Allen committed
96
		return !o.delete, nil
97 98 99 100 101
	}

	return d.child.Has(k)
}

Steven Allen's avatar
Steven Allen committed
102 103
// GetSize implements Datastore.GetSize
func (d *Datastore) GetSize(k ds.Key) (int, error) {
Steven Allen's avatar
Steven Allen committed
104
	o, ok := d.buffer[k]
Steven Allen's avatar
Steven Allen committed
105
	if ok {
Steven Allen's avatar
Steven Allen committed
106 107 108 109
		if o.delete {
			return -1, ds.ErrNotFound
		}
		return len(o.value), nil
Steven Allen's avatar
Steven Allen committed
110 111 112 113 114
	}

	return d.child.GetSize(k)
}

115 116 117 118 119 120 121 122 123
// Query flushes the buffer and then runs q against the child datastore,
// so the results reflect all writes buffered before the call. A flush
// failure is returned without querying the child.
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
	if err := d.Flush(); err != nil {
		return nil, err
	}
	return d.child.Query(q)
}
124 125 126 127 128

// DiskUsage implements the PersistentDatastore interface by returning
// whatever ds.DiskUsage reports for the child datastore. Buffered writes
// are not flushed first.
func (d *Datastore) DiskUsage() (uint64, error) {
	usage, err := ds.DiskUsage(d.child)
	return usage, err
}
129 130

func (d *Datastore) Close() error {
Steven Allen's avatar
Steven Allen committed
131 132 133 134 135 136 137 138 139
	err1 := d.Flush()
	err2 := d.child.Close()
	if err1 != nil {
		return err1
	}
	if err2 != nil {
		return err2
	}
	return nil
140
}