Commit e4b84bef authored by Jeromy's avatar Jeromy

add in support for batched writes

parent 245a981a
......@@ -63,6 +63,10 @@ func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) {
return r, nil
}
func (d *MapDatastore) StartBatchOp() Transaction {
return newBasicTransaction(d)
}
// NullDatastore stores nothing, but conforms to the API.
// Useful to test with.
type NullDatastore struct {
......@@ -98,6 +102,10 @@ func (d *NullDatastore) Query(q dsq.Query) (dsq.Results, error) {
return dsq.ResultsWithEntries(q, nil), nil
}
func (d *NullDatastore) StartBatchOp() Transaction {
return newBasicTransaction(d)
}
// LogDatastore logs all accesses through the datastore.
type LogDatastore struct {
Name string
......@@ -154,3 +162,8 @@ func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) {
log.Printf("%s: Query\n", d.Name)
return d.child.Query(q)
}
func (d *LogDatastore) StartBatchOp() Transaction {
log.Printf("%s: StartBatchOp\n", d.Name)
return d.child.StartBatchOp()
}
......@@ -67,6 +67,9 @@ type Datastore interface {
// result.AllEntries()
//
Query(q query.Query) (query.Results, error)
// StartBatchOp begins a datastore transaction
StartBatchOp() Transaction
}
// ThreadSafeDatastore is an interface that all threadsafe datastore should
......@@ -104,3 +107,11 @@ func GetBackedHas(ds Datastore, key Key) (bool, error) {
return false, err
}
}
type Transaction interface {
Put(key Key, val interface{}) error
Delete(key Key) error
Commit() error
}
......@@ -68,12 +68,8 @@ func (fs *Datastore) decode(file string) (key datastore.Key, ok bool) {
}
func (fs *Datastore) makePrefixDir(dir string) error {
if err := os.Mkdir(dir, 0777); err != nil {
// EEXIST is safe to ignore here, that just means the prefix
// directory already existed.
if !os.IsExist(err) {
return err
}
if err := fs.makePrefixDirNoSync(dir); err != nil {
return err
}
// In theory, if we create a new prefix dir and add a file to
......@@ -86,6 +82,17 @@ func (fs *Datastore) makePrefixDir(dir string) error {
return nil
}
func (fs *Datastore) makePrefixDirNoSync(dir string) error {
if err := os.Mkdir(dir, 0777); err != nil {
// EEXIST is safe to ignore here, that just means the prefix
// directory already existed.
if !os.IsExist(err) {
return err
}
}
return nil
}
func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
val, ok := value.([]byte)
if !ok {
......@@ -137,6 +144,88 @@ func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
return nil
}
func (fs *Datastore) putMany(data map[datastore.Key]interface{}) error {
var dirsToSync []string
files := make(map[*os.File]string)
for key, value := range data {
val, ok := value.([]byte)
if !ok {
return datastore.ErrInvalidType
}
dir, path := fs.encode(key)
if err := fs.makePrefixDirNoSync(dir); err != nil {
return err
}
dirsToSync = append(dirsToSync, dir)
tmp, err := ioutil.TempFile(dir, "put-")
if err != nil {
return err
}
if _, err := tmp.Write(val); err != nil {
return err
}
files[tmp] = path
}
ops := make(map[*os.File]int)
defer func() {
for fi, _ := range files {
val, _ := ops[fi]
switch val {
case 0:
_ = fi.Close()
fallthrough
case 1:
_ = os.Remove(fi.Name())
}
}
}()
// Now we sync everything
// sync and close files
for fi, _ := range files {
if err := fi.Sync(); err != nil {
return err
}
if err := fi.Close(); err != nil {
return err
}
// signify closed
ops[fi] = 1
}
// move files to their proper places
for fi, path := range files {
if err := osrename.Rename(fi.Name(), path); err != nil {
return err
}
// signify removed
ops[fi] = 2
}
// now sync the dirs for those files
for _, dir := range dirsToSync {
if err := syncDir(dir); err != nil {
return err
}
}
// sync top flatfs dir
if err := syncDir(fs.path); err != nil {
return err
}
return nil
}
func (fs *Datastore) Get(key datastore.Key) (value interface{}, err error) {
_, path := fs.encode(key)
data, err := ioutil.ReadFile(path)
......@@ -234,6 +323,51 @@ func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.E
return res, nil
}
type flatfsTransaction struct {
puts map[datastore.Key]interface{}
gets map[datastore.Key]datastore.GetCallback
deletes map[datastore.Key]struct{}
ds *Datastore
}
func (fs *Datastore) StartBatchOp() datastore.Transaction {
return &flatfsTransaction{
puts: make(map[datastore.Key]interface{}),
gets: make(map[datastore.Key]datastore.GetCallback),
deletes: make(map[datastore.Key]struct{}),
ds: fs,
}
}
func (bt *flatfsTransaction) Put(key datastore.Key, val interface{}) error {
bt.puts[key] = val
return nil
}
func (bt *flatfsTransaction) Delete(key datastore.Key) error {
bt.deletes[key] = struct{}{}
return nil
}
func (bt *flatfsTransaction) Commit() error {
for k, cb := range bt.gets {
cb(bt.ds.Get(k))
}
if err := bt.ds.putMany(bt.puts); err != nil {
return err
}
for k, _ := range bt.deletes {
if err := bt.ds.Delete(k); err != nil {
return err
}
}
return nil
}
var _ datastore.ThreadSafeDatastore = (*Datastore)(nil)
func (*Datastore) IsThreadSafe() {}
......@@ -100,6 +100,40 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return qr, nil
}
type ldbBatch struct {
b *leveldb.Batch
d *datastore
}
func (d *datastore) StartBatchOp() ds.Transaction {
return &ldbBatch{
b: new(leveldb.Batch),
d: d,
}
}
func (b *ldbBatch) Put(key ds.Key, val interface{}) error {
v, ok := val.([]byte)
if !ok {
return ds.ErrInvalidType
}
b.b.Put(key.Bytes(), v) // #dealwithit
return nil
}
func (b *ldbBatch) Delete(key ds.Key) error {
b.b.Delete(key.Bytes())
return nil
}
func (b *ldbBatch) Commit() error {
opts := &opt.WriteOptions{Sync: true}
if err := b.d.DB.Write(b.b, opts); err != nil {
return err
}
return nil
}
func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
var rnge *util.Range
......
package datastore
type GetCallback func(interface{}, error)
// basicTransaction implements the transaction interface for datastores who do
// not have any sort of underlying transactional support
type basicTransaction struct {
puts map[Key]interface{}
gets map[Key]GetCallback
deletes map[Key]struct{}
target Datastore
}
func newBasicTransaction(ds Datastore) Transaction {
return &basicTransaction{
puts: make(map[Key]interface{}),
gets: make(map[Key]GetCallback),
deletes: make(map[Key]struct{}),
target: ds,
}
}
func (bt *basicTransaction) Put(key Key, val interface{}) error {
bt.puts[key] = val
return nil
}
func (bt *basicTransaction) Delete(key Key) error {
bt.deletes[key] = struct{}{}
return nil
}
func (bt *basicTransaction) Commit() error {
for k, cb := range bt.gets {
cb(bt.target.Get(k))
}
for k, val := range bt.puts {
if err := bt.target.Put(k, val); err != nil {
return err
}
}
for k, _ := range bt.deletes {
if err := bt.target.Delete(k); err != nil {
return err
}
}
return nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment