Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-datastore
Commits
60c74efc
Commit
60c74efc
authored
Jun 26, 2015
by
Juan Batiz-Benet
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Revert "add in support for batched writes"
parent
e9a2ec20
Changes
16
Hide whitespace changes
Inline
Side-by-side
Showing
16 changed files
with
6 additions
and
472 deletions
+6
-472
basic_ds.go
basic_ds.go
+0
-13
callback/callback.go
callback/callback.go
+0
-5
coalesce/coalesce.go
coalesce/coalesce.go
+0
-4
datastore.go
datastore.go
+0
-11
flatfs/flatfs.go
flatfs/flatfs.go
+6
-134
fs/fs.go
fs/fs.go
+0
-6
keytransform/keytransform.go
keytransform/keytransform.go
+0
-25
leveldb/datastore.go
leveldb/datastore.go
+0
-34
lru/datastore.go
lru/datastore.go
+0
-4
measure/measure.go
measure/measure.go
+0
-61
mount/mount.go
mount/mount.go
+0
-45
panic/panic.go
panic/panic.go
+0
-35
sync/sync.go
sync/sync.go
+0
-6
tiered/tiered.go
tiered/tiered.go
+0
-40
timecache/timecache.go
timecache/timecache.go
+0
-5
transaction.go
transaction.go
+0
-44
No files found.
basic_ds.go
View file @
60c74efc
...
...
@@ -63,10 +63,6 @@ func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) {
return
r
,
nil
}
func
(
d
*
MapDatastore
)
Batch
()
Batch
{
return
NewBasicBatch
(
d
)
}
// NullDatastore stores nothing, but conforms to the API.
// Useful to test with.
type
NullDatastore
struct
{
...
...
@@ -102,10 +98,6 @@ func (d *NullDatastore) Query(q dsq.Query) (dsq.Results, error) {
return
dsq
.
ResultsWithEntries
(
q
,
nil
),
nil
}
func
(
d
*
NullDatastore
)
Batch
()
Batch
{
return
NewBasicBatch
(
d
)
}
// LogDatastore logs all accesses through the datastore.
type
LogDatastore
struct
{
Name
string
...
...
@@ -162,8 +154,3 @@ func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) {
log
.
Printf
(
"%s: Query
\n
"
,
d
.
Name
)
return
d
.
child
.
Query
(
q
)
}
func
(
d
*
LogDatastore
)
Batch
()
Batch
{
log
.
Printf
(
"%s: Batch
\n
"
,
d
.
Name
)
return
d
.
child
.
Batch
()
}
callback/callback.go
View file @
60c74efc
...
...
@@ -40,8 +40,3 @@ func (c *Datastore) Query(q dsq.Query) (dsq.Results, error) {
c
.
F
()
return
c
.
D
.
Query
(
q
)
}
func
(
c
*
Datastore
)
Batch
()
ds
.
Batch
{
c
.
F
()
return
c
.
D
.
Batch
()
}
coalesce/coalesce.go
View file @
60c74efc
...
...
@@ -124,7 +124,3 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
// query not coalesced yet.
return
d
.
child
.
Query
(
q
)
}
func
(
d
*
datastore
)
Batch
()
ds
.
Batch
{
return
ds
.
NewBasicBatch
(
d
)
}
datastore.go
View file @
60c74efc
...
...
@@ -67,9 +67,6 @@ type Datastore interface {
// result.AllEntries()
//
Query
(
q
query
.
Query
)
(
query
.
Results
,
error
)
// Batch begins a datastore transaction
Batch
()
Batch
}
// ThreadSafeDatastore is an interface that all threadsafe datastore should
...
...
@@ -107,11 +104,3 @@ func GetBackedHas(ds Datastore, key Key) (bool, error) {
return
false
,
err
}
}
type
Batch
interface
{
Put
(
key
Key
,
val
interface
{})
error
Delete
(
key
Key
)
error
Commit
()
error
}
flatfs/flatfs.go
View file @
60c74efc
...
...
@@ -68,8 +68,12 @@ func (fs *Datastore) decode(file string) (key datastore.Key, ok bool) {
}
func
(
fs
*
Datastore
)
makePrefixDir
(
dir
string
)
error
{
if
err
:=
fs
.
makePrefixDirNoSync
(
dir
);
err
!=
nil
{
return
err
if
err
:=
os
.
Mkdir
(
dir
,
0777
);
err
!=
nil
{
// EEXIST is safe to ignore here, that just means the prefix
// directory already existed.
if
!
os
.
IsExist
(
err
)
{
return
err
}
}
// In theory, if we create a new prefix dir and add a file to
...
...
@@ -82,17 +86,6 @@ func (fs *Datastore) makePrefixDir(dir string) error {
return
nil
}
func
(
fs
*
Datastore
)
makePrefixDirNoSync
(
dir
string
)
error
{
if
err
:=
os
.
Mkdir
(
dir
,
0777
);
err
!=
nil
{
// EEXIST is safe to ignore here, that just means the prefix
// directory already existed.
if
!
os
.
IsExist
(
err
)
{
return
err
}
}
return
nil
}
func
(
fs
*
Datastore
)
Put
(
key
datastore
.
Key
,
value
interface
{})
error
{
val
,
ok
:=
value
.
([]
byte
)
if
!
ok
{
...
...
@@ -144,88 +137,6 @@ func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
return
nil
}
func
(
fs
*
Datastore
)
putMany
(
data
map
[
datastore
.
Key
]
interface
{})
error
{
var
dirsToSync
[]
string
files
:=
make
(
map
[
*
os
.
File
]
string
)
for
key
,
value
:=
range
data
{
val
,
ok
:=
value
.
([]
byte
)
if
!
ok
{
return
datastore
.
ErrInvalidType
}
dir
,
path
:=
fs
.
encode
(
key
)
if
err
:=
fs
.
makePrefixDirNoSync
(
dir
);
err
!=
nil
{
return
err
}
dirsToSync
=
append
(
dirsToSync
,
dir
)
tmp
,
err
:=
ioutil
.
TempFile
(
dir
,
"put-"
)
if
err
!=
nil
{
return
err
}
if
_
,
err
:=
tmp
.
Write
(
val
);
err
!=
nil
{
return
err
}
files
[
tmp
]
=
path
}
ops
:=
make
(
map
[
*
os
.
File
]
int
)
defer
func
()
{
for
fi
,
_
:=
range
files
{
val
,
_
:=
ops
[
fi
]
switch
val
{
case
0
:
_
=
fi
.
Close
()
fallthrough
case
1
:
_
=
os
.
Remove
(
fi
.
Name
())
}
}
}()
// Now we sync everything
// sync and close files
for
fi
,
_
:=
range
files
{
if
err
:=
fi
.
Sync
();
err
!=
nil
{
return
err
}
if
err
:=
fi
.
Close
();
err
!=
nil
{
return
err
}
// signify closed
ops
[
fi
]
=
1
}
// move files to their proper places
for
fi
,
path
:=
range
files
{
if
err
:=
osrename
.
Rename
(
fi
.
Name
(),
path
);
err
!=
nil
{
return
err
}
// signify removed
ops
[
fi
]
=
2
}
// now sync the dirs for those files
for
_
,
dir
:=
range
dirsToSync
{
if
err
:=
syncDir
(
dir
);
err
!=
nil
{
return
err
}
}
// sync top flatfs dir
if
err
:=
syncDir
(
fs
.
path
);
err
!=
nil
{
return
err
}
return
nil
}
func
(
fs
*
Datastore
)
Get
(
key
datastore
.
Key
)
(
value
interface
{},
err
error
)
{
_
,
path
:=
fs
.
encode
(
key
)
data
,
err
:=
ioutil
.
ReadFile
(
path
)
...
...
@@ -323,45 +234,6 @@ func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.E
return
res
,
nil
}
type
flatfsBatch
struct
{
puts
map
[
datastore
.
Key
]
interface
{}
deletes
map
[
datastore
.
Key
]
struct
{}
ds
*
Datastore
}
func
(
fs
*
Datastore
)
Batch
()
datastore
.
Batch
{
return
&
flatfsBatch
{
puts
:
make
(
map
[
datastore
.
Key
]
interface
{}),
deletes
:
make
(
map
[
datastore
.
Key
]
struct
{}),
ds
:
fs
,
}
}
func
(
bt
*
flatfsBatch
)
Put
(
key
datastore
.
Key
,
val
interface
{})
error
{
bt
.
puts
[
key
]
=
val
return
nil
}
func
(
bt
*
flatfsBatch
)
Delete
(
key
datastore
.
Key
)
error
{
bt
.
deletes
[
key
]
=
struct
{}{}
return
nil
}
func
(
bt
*
flatfsBatch
)
Commit
()
error
{
if
err
:=
bt
.
ds
.
putMany
(
bt
.
puts
);
err
!=
nil
{
return
err
}
for
k
,
_
:=
range
bt
.
deletes
{
if
err
:=
bt
.
ds
.
Delete
(
k
);
err
!=
nil
{
return
err
}
}
return
nil
}
var
_
datastore
.
ThreadSafeDatastore
=
(
*
Datastore
)(
nil
)
func
(
*
Datastore
)
IsThreadSafe
()
{}
fs/fs.go
View file @
60c74efc
...
...
@@ -130,12 +130,6 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
return
r
,
nil
}
func
(
d
*
Datastore
)
Batch
()
ds
.
Batch
{
// just use basic transaction for now, this datastore
// isnt really used in performant code yet
return
ds
.
NewBasicBatch
(
d
)
}
// isDir returns whether given path is a directory
func
isDir
(
path
string
)
bool
{
finfo
,
err
:=
os
.
Stat
(
path
)
...
...
keytransform/keytransform.go
View file @
60c74efc
...
...
@@ -73,28 +73,3 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
return
dsq
.
DerivedResults
(
qr
,
ch
),
nil
}
func
(
d
*
ktds
)
Batch
()
ds
.
Batch
{
return
&
transformBatch
{
dst
:
d
.
child
.
Batch
(),
f
:
d
.
ConvertKey
,
}
}
type
transformBatch
struct
{
dst
ds
.
Batch
f
KeyMapping
}
func
(
t
*
transformBatch
)
Put
(
key
ds
.
Key
,
val
interface
{})
error
{
return
t
.
dst
.
Put
(
t
.
f
(
key
),
val
)
}
func
(
t
*
transformBatch
)
Delete
(
key
ds
.
Key
)
error
{
return
t
.
dst
.
Delete
(
t
.
f
(
key
))
}
func
(
t
*
transformBatch
)
Commit
()
error
{
return
t
.
dst
.
Commit
()
}
leveldb/datastore.go
View file @
60c74efc
...
...
@@ -100,40 +100,6 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return
qr
,
nil
}
type
ldbBatch
struct
{
b
*
leveldb
.
Batch
d
*
datastore
}
func
(
d
*
datastore
)
Batch
()
ds
.
Batch
{
return
&
ldbBatch
{
b
:
new
(
leveldb
.
Batch
),
d
:
d
,
}
}
func
(
b
*
ldbBatch
)
Put
(
key
ds
.
Key
,
val
interface
{})
error
{
v
,
ok
:=
val
.
([]
byte
)
if
!
ok
{
return
ds
.
ErrInvalidType
}
b
.
b
.
Put
(
key
.
Bytes
(),
v
)
// #dealwithit
return
nil
}
func
(
b
*
ldbBatch
)
Delete
(
key
ds
.
Key
)
error
{
b
.
b
.
Delete
(
key
.
Bytes
())
return
nil
}
func
(
b
*
ldbBatch
)
Commit
()
error
{
opts
:=
&
opt
.
WriteOptions
{
Sync
:
true
}
if
err
:=
b
.
d
.
DB
.
Write
(
b
.
b
,
opts
);
err
!=
nil
{
return
err
}
return
nil
}
func
(
d
*
datastore
)
runQuery
(
worker
goprocess
.
Process
,
qrb
*
dsq
.
ResultBuilder
)
{
var
rnge
*
util
.
Range
...
...
lru/datastore.go
View file @
60c74efc
...
...
@@ -54,7 +54,3 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
func
(
d
*
Datastore
)
Query
(
q
dsq
.
Query
)
(
dsq
.
Results
,
error
)
{
return
nil
,
errors
.
New
(
"KeyList not implemented."
)
}
func
(
d
*
Datastore
)
Batch
()
ds
.
Batch
{
return
ds
.
NewBasicBatch
(
d
)
}
measure/measure.go
View file @
60c74efc
...
...
@@ -148,67 +148,6 @@ func (m *measure) Query(q query.Query) (query.Results, error) {
return
res
,
err
}
type
measuredBatch
struct
{
puts
int
deletes
int
putts
datastore
.
Batch
delts
datastore
.
Batch
m
*
measure
}
func
(
m
*
measure
)
Batch
()
datastore
.
Batch
{
return
&
measuredBatch
{
putts
:
m
.
backend
.
Batch
(),
delts
:
m
.
backend
.
Batch
(),
m
:
m
,
}
}
func
(
mt
*
measuredBatch
)
Put
(
key
datastore
.
Key
,
val
interface
{})
error
{
mt
.
puts
++
return
mt
.
putts
.
Put
(
key
,
val
)
}
func
(
mt
*
measuredBatch
)
Delete
(
key
datastore
.
Key
)
error
{
mt
.
deletes
++
return
mt
.
delts
.
Delete
(
key
)
}
func
(
mt
*
measuredBatch
)
Commit
()
error
{
if
mt
.
deletes
>
0
{
before
:=
time
.
Now
()
err
:=
mt
.
delts
.
Commit
()
took
:=
int
(
time
.
Now
()
.
Sub
(
before
)
/
time
.
Microsecond
)
/
mt
.
deletes
mt
.
m
.
deleteNum
.
AddN
(
uint64
(
mt
.
deletes
))
for
i
:=
0
;
i
<
mt
.
deletes
;
i
++
{
mt
.
m
.
deleteLatency
.
RecordValue
(
int64
(
took
))
}
if
err
!=
nil
{
mt
.
m
.
deleteErr
.
Add
()
return
err
}
}
if
mt
.
puts
>
0
{
before
:=
time
.
Now
()
err
:=
mt
.
putts
.
Commit
()
took
:=
int
(
time
.
Now
()
.
Sub
(
before
)
/
time
.
Microsecond
)
/
mt
.
puts
mt
.
m
.
putNum
.
AddN
(
uint64
(
mt
.
puts
))
for
i
:=
0
;
i
<
mt
.
puts
;
i
++
{
mt
.
m
.
putLatency
.
RecordValue
(
int64
(
took
))
}
if
err
!=
nil
{
mt
.
m
.
putErr
.
Add
()
return
err
}
}
return
nil
}
func
(
m
*
measure
)
Close
()
error
{
m
.
putNum
.
Remove
()
m
.
putErr
.
Remove
()
...
...
mount/mount.go
View file @
60c74efc
...
...
@@ -114,48 +114,3 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
r
=
query
.
ResultsReplaceQuery
(
r
,
q
)
return
r
,
nil
}
type
mountBatch
struct
{
mounts
map
[
string
]
datastore
.
Batch
d
*
Datastore
}
func
(
d
*
Datastore
)
Batch
()
datastore
.
Batch
{
return
&
mountBatch
{
mounts
:
make
(
map
[
string
]
datastore
.
Batch
),
d
:
d
,
}
}
func
(
mt
*
mountBatch
)
Put
(
key
datastore
.
Key
,
val
interface
{})
error
{
child
,
loc
,
rest
:=
mt
.
d
.
lookup
(
key
)
t
,
ok
:=
mt
.
mounts
[
loc
.
String
()]
if
!
ok
{
t
=
child
.
Batch
()
mt
.
mounts
[
loc
.
String
()]
=
t
}
return
t
.
Put
(
rest
,
val
)
}
func
(
mt
*
mountBatch
)
Delete
(
key
datastore
.
Key
)
error
{
child
,
loc
,
rest
:=
mt
.
d
.
lookup
(
key
)
t
,
ok
:=
mt
.
mounts
[
loc
.
String
()]
if
!
ok
{
t
=
child
.
Batch
()
mt
.
mounts
[
loc
.
String
()]
=
t
}
return
t
.
Delete
(
rest
)
}
func
(
mt
*
mountBatch
)
Commit
()
error
{
for
_
,
t
:=
range
mt
.
mounts
{
err
:=
t
.
Commit
()
if
err
!=
nil
{
return
err
}
}
return
nil
}
panic/panic.go
View file @
60c74efc
...
...
@@ -66,38 +66,3 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
}
return
r
,
nil
}
type
panicBatch
struct
{
t
ds
.
Batch
}
func
(
p
*
panicBatch
)
Put
(
key
ds
.
Key
,
val
interface
{})
error
{
err
:=
p
.
t
.
Put
(
key
,
val
)
if
err
!=
nil
{
fmt
.
Fprintf
(
os
.
Stdout
,
"panic datastore: %s"
,
err
)
panic
(
"panic datastore: transaction put failed"
)
}
return
nil
}
func
(
p
*
panicBatch
)
Delete
(
key
ds
.
Key
)
error
{
err
:=
p
.
t
.
Delete
(
key
)
if
err
!=
nil
{
fmt
.
Fprintf
(
os
.
Stdout
,
"panic datastore: %s"
,
err
)
panic
(
"panic datastore: transaction delete failed"
)
}
return
nil
}
func
(
p
*
panicBatch
)
Commit
()
error
{
err
:=
p
.
t
.
Commit
()
if
err
!=
nil
{
fmt
.
Fprintf
(
os
.
Stdout
,
"panic datastore: %s"
,
err
)
panic
(
"panic datastore: transaction commit failed"
)
}
return
nil
}
func
(
d
*
datastore
)
Batch
()
ds
.
Batch
{
return
&
panicBatch
{
d
.
child
.
Batch
()}
}
sync/sync.go
View file @
60c74efc
...
...
@@ -63,9 +63,3 @@ func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) {
defer
d
.
RUnlock
()
return
d
.
child
.
Query
(
q
)
}
func
(
d
*
MutexDatastore
)
Batch
()
ds
.
Batch
{
d
.
RLock
()
defer
d
.
RUnlock
()
return
d
.
child
.
Batch
()
}
tiered/tiered.go
View file @
60c74efc
...
...
@@ -92,43 +92,3 @@ func (d tiered) Query(q dsq.Query) (dsq.Results, error) {
// query always the last (most complete) one
return
d
[
len
(
d
)
-
1
]
.
Query
(
q
)
}
type
tieredBatch
[]
ds
.
Batch
func
(
d
tiered
)
Batch
()
ds
.
Batch
{
var
out
tieredBatch
for
_
,
ds
:=
range
d
{
out
=
append
(
out
,
ds
.
Batch
())
}
return
out
}
func
(
t
tieredBatch
)
Put
(
key
ds
.
Key
,
val
interface
{})
error
{
for
_
,
ts
:=
range
t
{
err
:=
ts
.
Put
(
key
,
val
)
if
err
!=
nil
{
return
err
}
}
return
nil
}
func
(
t
tieredBatch
)
Delete
(
key
ds
.
Key
)
error
{
for
_
,
ts
:=
range
t
{
err
:=
ts
.
Delete
(
key
)
if
err
!=
nil
{
return
err
}
}
return
nil
}
func
(
t
tieredBatch
)
Commit
()
error
{
for
_
,
ts
:=
range
t
{
err
:=
ts
.
Commit
()
if
err
!=
nil
{
return
err
}
}
return
nil
}
timecache/timecache.go
View file @
60c74efc
...
...
@@ -94,8 +94,3 @@ func (d *datastore) Delete(key ds.Key) (err error) {
func
(
d
*
datastore
)
Query
(
q
dsq
.
Query
)
(
dsq
.
Results
,
error
)
{
return
d
.
cache
.
Query
(
q
)
}
func
(
d
*
datastore
)
Batch
()
ds
.
Batch
{
// sorry, being lazy here
return
ds
.
NewBasicBatch
(
d
)
}
transaction.go
deleted
100644 → 0
View file @
e9a2ec20
package
datastore
// basicBatch implements the transaction interface for datastores who do
// not have any sort of underlying transactional support
type
basicBatch
struct
{
puts
map
[
Key
]
interface
{}
deletes
map
[
Key
]
struct
{}
target
Datastore
}
func
NewBasicBatch
(
ds
Datastore
)
Batch
{
return
&
basicBatch
{
puts
:
make
(
map
[
Key
]
interface
{}),
deletes
:
make
(
map
[
Key
]
struct
{}),
target
:
ds
,
}
}
func
(
bt
*
basicBatch
)
Put
(
key
Key
,
val
interface
{})
error
{
bt
.
puts
[
key
]
=
val
return
nil
}
func
(
bt
*
basicBatch
)
Delete
(
key
Key
)
error
{
bt
.
deletes
[
key
]
=
struct
{}{}
return
nil
}
func
(
bt
*
basicBatch
)
Commit
()
error
{
for
k
,
val
:=
range
bt
.
puts
{
if
err
:=
bt
.
target
.
Put
(
k
,
val
);
err
!=
nil
{
return
err
}
}
for
k
,
_
:=
range
bt
.
deletes
{
if
err
:=
bt
.
target
.
Delete
(
k
);
err
!=
nil
{
return
err
}
}
return
nil
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment