Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-ds-leveldb
Commits
8e15c838
Unverified
Commit
8e15c838
authored
Feb 25, 2020
by
Will
Committed by
GitHub
Feb 25, 2020
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #42 from ipfs/bug/race-close
prevent closing concurrently with other operations.
parents
2e5c1979
5d92b4d2
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
75 additions
and
6 deletions
+75
-6
datastore.go
datastore.go
+37
-6
ds_test.go
ds_test.go
+38
-0
No files found.
datastore.go
View file @
8e15c838
...
...
@@ -3,6 +3,7 @@ package leveldb
import
(
"os"
"path/filepath"
"sync"
ds
"github.com/ipfs/go-datastore"
dsq
"github.com/ipfs/go-datastore/query"
...
...
@@ -52,11 +53,12 @@ func NewDatastore(path string, opts *Options) (*Datastore, error) {
return
nil
,
err
}
return
&
Datastore
{
accessor
:
&
accessor
{
ldb
:
db
,
syncWrites
:
true
},
ds
:=
Datastore
{
accessor
:
&
accessor
{
ldb
:
db
,
syncWrites
:
true
,
closeLk
:
new
(
sync
.
RWMutex
)
},
DB
:
db
,
path
:
path
,
},
nil
}
return
&
ds
,
nil
}
// An extraction of the common interface between LevelDB Transactions and the DB itself.
...
...
@@ -74,9 +76,12 @@ type levelDbOps interface {
type
accessor
struct
{
ldb
levelDbOps
syncWrites
bool
closeLk
*
sync
.
RWMutex
}
func
(
a
*
accessor
)
Put
(
key
ds
.
Key
,
value
[]
byte
)
(
err
error
)
{
a
.
closeLk
.
RLock
()
defer
a
.
closeLk
.
RUnlock
()
return
a
.
ldb
.
Put
(
key
.
Bytes
(),
value
,
&
opt
.
WriteOptions
{
Sync
:
a
.
syncWrites
})
}
...
...
@@ -85,6 +90,8 @@ func (a *accessor) Sync(prefix ds.Key) error {
}
func
(
a
*
accessor
)
Get
(
key
ds
.
Key
)
(
value
[]
byte
,
err
error
)
{
a
.
closeLk
.
RLock
()
defer
a
.
closeLk
.
RUnlock
()
val
,
err
:=
a
.
ldb
.
Get
(
key
.
Bytes
(),
nil
)
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
...
...
@@ -96,18 +103,24 @@ func (a *accessor) Get(key ds.Key) (value []byte, err error) {
}
func
(
a
*
accessor
)
Has
(
key
ds
.
Key
)
(
exists
bool
,
err
error
)
{
a
.
closeLk
.
RLock
()
defer
a
.
closeLk
.
RUnlock
()
return
a
.
ldb
.
Has
(
key
.
Bytes
(),
nil
)
}
func
(
d
*
accessor
)
GetSize
(
key
ds
.
Key
)
(
size
int
,
err
error
)
{
return
ds
.
GetBackedSize
(
d
,
key
)
func
(
a
*
accessor
)
GetSize
(
key
ds
.
Key
)
(
size
int
,
err
error
)
{
return
ds
.
GetBackedSize
(
a
,
key
)
}
func
(
a
*
accessor
)
Delete
(
key
ds
.
Key
)
(
err
error
)
{
a
.
closeLk
.
RLock
()
defer
a
.
closeLk
.
RUnlock
()
return
a
.
ldb
.
Delete
(
key
.
Bytes
(),
&
opt
.
WriteOptions
{
Sync
:
a
.
syncWrites
})
}
func
(
a
*
accessor
)
Query
(
q
dsq
.
Query
)
(
dsq
.
Results
,
error
)
{
a
.
closeLk
.
RLock
()
defer
a
.
closeLk
.
RUnlock
()
var
rnge
*
util
.
Range
// make a copy of the query for the fallback naive query implementation.
...
...
@@ -135,6 +148,8 @@ func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
}
r
:=
dsq
.
ResultsFromIterator
(
q
,
dsq
.
Iterator
{
Next
:
func
()
(
dsq
.
Result
,
bool
)
{
a
.
closeLk
.
RLock
()
defer
a
.
closeLk
.
RUnlock
()
if
!
next
()
{
return
dsq
.
Result
{},
false
}
...
...
@@ -149,6 +164,8 @@ func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
return
dsq
.
Result
{
Entry
:
e
},
true
},
Close
:
func
()
error
{
a
.
closeLk
.
RLock
()
defer
a
.
closeLk
.
RUnlock
()
i
.
Release
()
return
nil
},
...
...
@@ -159,6 +176,8 @@ func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
// DiskUsage returns the current disk size used by this levelDB.
// For in-mem datastores, it will return 0.
func
(
d
*
Datastore
)
DiskUsage
()
(
uint64
,
error
)
{
d
.
closeLk
.
RLock
()
defer
d
.
closeLk
.
RUnlock
()
if
d
.
path
==
""
{
// in-mem
return
0
,
nil
}
...
...
@@ -182,12 +201,15 @@ func (d *Datastore) DiskUsage() (uint64, error) {
// LevelDB needs to be closed.
func
(
d
*
Datastore
)
Close
()
(
err
error
)
{
d
.
closeLk
.
Lock
()
defer
d
.
closeLk
.
Unlock
()
return
d
.
DB
.
Close
()
}
type
leveldbBatch
struct
{
b
*
leveldb
.
Batch
db
*
leveldb
.
DB
closeLk
*
sync
.
RWMutex
syncWrites
bool
}
...
...
@@ -195,6 +217,7 @@ func (d *Datastore) Batch() (ds.Batch, error) {
return
&
leveldbBatch
{
b
:
new
(
leveldb
.
Batch
),
db
:
d
.
DB
,
closeLk
:
d
.
closeLk
,
syncWrites
:
d
.
syncWrites
,
},
nil
}
...
...
@@ -205,6 +228,8 @@ func (b *leveldbBatch) Put(key ds.Key, value []byte) error {
}
func
(
b
*
leveldbBatch
)
Commit
()
error
{
b
.
closeLk
.
RLock
()
defer
b
.
closeLk
.
RUnlock
()
return
b
.
db
.
Write
(
b
.
b
,
&
opt
.
WriteOptions
{
Sync
:
b
.
syncWrites
})
}
...
...
@@ -220,18 +245,24 @@ type transaction struct {
}
func
(
t
*
transaction
)
Commit
()
error
{
t
.
closeLk
.
RLock
()
defer
t
.
closeLk
.
RUnlock
()
return
t
.
tx
.
Commit
()
}
func
(
t
*
transaction
)
Discard
()
{
t
.
closeLk
.
RLock
()
defer
t
.
closeLk
.
RUnlock
()
t
.
tx
.
Discard
()
}
func
(
d
*
Datastore
)
NewTransaction
(
readOnly
bool
)
(
ds
.
Txn
,
error
)
{
d
.
closeLk
.
RLock
()
defer
d
.
closeLk
.
RUnlock
()
tx
,
err
:=
d
.
DB
.
OpenTransaction
()
if
err
!=
nil
{
return
nil
,
err
}
accessor
:=
&
accessor
{
ldb
:
tx
,
syncWrites
:
false
}
accessor
:=
&
accessor
{
ldb
:
tx
,
syncWrites
:
false
,
closeLk
:
d
.
closeLk
}
return
&
transaction
{
accessor
,
tx
},
nil
}
ds_test.go
View file @
8e15c838
...
...
@@ -147,6 +147,44 @@ func TestQueryRespectsProcess(t *testing.T) {
addTestCases
(
t
,
d
,
testcases
)
}
func
TestCloseRace
(
t
*
testing
.
T
)
{
d
,
close
:=
newDS
(
t
)
for
n
:=
0
;
n
<
100
;
n
++
{
d
.
Put
(
ds
.
NewKey
(
fmt
.
Sprintf
(
"%d"
,
n
)),
[]
byte
(
fmt
.
Sprintf
(
"test%d"
,
n
)))
}
tx
,
_
:=
d
.
NewTransaction
(
false
)
tx
.
Put
(
ds
.
NewKey
(
"txnversion"
),
[]
byte
(
"bump"
))
closeCh
:=
make
(
chan
interface
{})
go
func
()
{
close
()
closeCh
<-
nil
}()
for
k
:=
range
testcases
{
tx
.
Get
(
ds
.
NewKey
(
k
))
}
tx
.
Commit
()
<-
closeCh
}
func
TestCloseSafety
(
t
*
testing
.
T
)
{
d
,
close
:=
newDS
(
t
)
addTestCases
(
t
,
d
,
testcases
)
tx
,
_
:=
d
.
NewTransaction
(
false
)
err
:=
tx
.
Put
(
ds
.
NewKey
(
"test"
),
[]
byte
(
"test"
))
if
err
!=
nil
{
t
.
Error
(
"Failed to put in a txn."
)
}
close
()
err
=
tx
.
Commit
()
if
err
==
nil
{
t
.
Error
(
"committing after close should fail."
)
}
}
func
TestQueryRespectsProcessMem
(
t
*
testing
.
T
)
{
d
:=
newDSMem
(
t
)
addTestCases
(
t
,
d
,
testcases
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment