Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-ds-flatfs
Commits
4418505d
Unverified
Commit
4418505d
authored
Mar 01, 2019
by
Steven Allen
Committed by
GitHub
Mar 01, 2019
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #52 from ipfs/fix/write-after-close
fix panic on write after close
parents
da94b4c7
6fb155e5
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
91 additions
and
41 deletions
+91
-41
convert.go
convert.go
+2
-2
flatfs.go
flatfs.go
+65
-39
flatfs_test.go
flatfs_test.go
+24
-0
No files found.
convert.go
View file @
4418505d
...
...
@@ -170,8 +170,8 @@ func Move(oldPath string, newPath string, out io.Writer) error {
func
moveKey
(
oldDS
*
Datastore
,
newDS
*
Datastore
,
key
datastore
.
Key
)
error
{
_
,
oldPath
:=
oldDS
.
encode
(
key
)
dir
,
newPath
:=
newDS
.
encode
(
key
)
err
:=
newDS
.
makeDirNoSync
(
dir
)
if
err
!=
nil
{
err
:=
os
.
Mkdir
(
dir
,
0755
)
if
err
!=
nil
&&
!
os
.
IsExist
(
err
)
{
return
err
}
err
=
os
.
Rename
(
oldPath
,
newPath
)
...
...
flatfs.go
View file @
4418505d
...
...
@@ -94,6 +94,7 @@ var (
ErrDatastoreExists
=
errors
.
New
(
"datastore already exists"
)
ErrDatastoreDoesNotExist
=
errors
.
New
(
"datastore directory does not exist"
)
ErrShardingFileMissing
=
fmt
.
Errorf
(
"%s file not found in datastore"
,
SHARDING_FN
)
ErrClosed
=
errors
.
New
(
"datastore closed"
)
)
func
init
()
{
...
...
@@ -123,9 +124,13 @@ type Datastore struct {
dirty
bool
storedValue
diskUsageValue
// Used to trigger a checkpoint.
checkpointCh
chan
struct
{}
done
chan
struct
{}
shutdownLock
sync
.
RWMutex
shutdown
bool
// opMap handles concurrent write operations (put/delete)
// to the same key
opMap
*
opMap
...
...
@@ -238,12 +243,14 @@ func Open(path string, syncFiles bool) (*Datastore, error) {
}
fs
:=
&
Datastore
{
path
:
path
,
shardStr
:
shardId
.
String
(),
getDir
:
shardId
.
Func
(),
sync
:
syncFiles
,
diskUsage
:
0
,
opMap
:
new
(
opMap
),
path
:
path
,
shardStr
:
shardId
.
String
(),
getDir
:
shardId
.
Func
(),
sync
:
syncFiles
,
checkpointCh
:
make
(
chan
struct
{},
1
),
done
:
make
(
chan
struct
{}),
diskUsage
:
0
,
opMap
:
new
(
opMap
),
}
// This sets diskUsage to the correct value
...
...
@@ -257,8 +264,6 @@ func Open(path string, syncFiles bool) (*Datastore, error) {
return
nil
,
err
}
fs
.
checkpointCh
=
make
(
chan
struct
{},
1
)
fs
.
done
=
make
(
chan
struct
{})
go
fs
.
checkpointLoop
()
return
fs
,
nil
}
...
...
@@ -356,6 +361,12 @@ var putMaxRetries = 6
// concurrent Put and a Delete operation, we cannot guarantee which one
// will win.
func
(
fs
*
Datastore
)
Put
(
key
datastore
.
Key
,
value
[]
byte
)
error
{
fs
.
shutdownLock
.
RLock
()
defer
fs
.
shutdownLock
.
RUnlock
()
if
fs
.
shutdown
{
return
ErrClosed
}
var
err
error
for
i
:=
1
;
i
<=
putMaxRetries
;
i
++
{
err
=
fs
.
doWriteOp
(
&
op
{
...
...
@@ -465,15 +476,32 @@ func (fs *Datastore) doPut(key datastore.Key, val []byte) error {
return
nil
}
func
(
fs
*
Datastore
)
putMany
(
data
map
[
datastore
.
Key
]
interface
{})
error
{
func
(
fs
*
Datastore
)
putMany
(
data
map
[
datastore
.
Key
][]
byte
)
error
{
fs
.
shutdownLock
.
RLock
()
defer
fs
.
shutdownLock
.
RUnlock
()
if
fs
.
shutdown
{
return
ErrClosed
}
var
dirsToSync
[]
string
files
:=
make
(
map
[
*
os
.
File
]
*
op
)
for
key
,
value
:=
range
data
{
val
,
ok
:=
value
.
([]
byte
)
if
!
ok
{
return
datastore
.
ErrInvalidType
files
:=
make
(
map
[
*
os
.
File
]
*
op
,
len
(
data
))
ops
:=
make
(
map
[
*
os
.
File
]
int
,
len
(
data
))
defer
func
()
{
for
fi
:=
range
files
{
val
,
_
:=
ops
[
fi
]
switch
val
{
case
0
:
_
=
fi
.
Close
()
fallthrough
case
1
:
_
=
os
.
Remove
(
fi
.
Name
())
}
}
}()
for
key
,
value
:=
range
data
{
dir
,
path
:=
fs
.
encode
(
key
)
if
err
:=
fs
.
makeDirNoSync
(
dir
);
err
!=
nil
{
return
err
...
...
@@ -485,7 +513,7 @@ func (fs *Datastore) putMany(data map[datastore.Key]interface{}) error {
return
err
}
if
_
,
err
:=
tmp
.
Write
(
val
);
err
!=
nil
{
if
_
,
err
:=
tmp
.
Write
(
val
ue
);
err
!=
nil
{
return
err
}
...
...
@@ -497,24 +525,9 @@ func (fs *Datastore) putMany(data map[datastore.Key]interface{}) error {
}
}
ops
:=
make
(
map
[
*
os
.
File
]
int
)
defer
func
()
{
for
fi
,
_
:=
range
files
{
val
,
_
:=
ops
[
fi
]
switch
val
{
case
0
:
_
=
fi
.
Close
()
fallthrough
case
1
:
_
=
os
.
Remove
(
fi
.
Name
())
}
}
}()
// Now we sync everything
// sync and close files
for
fi
,
_
:=
range
files
{
for
fi
:=
range
files
{
if
fs
.
sync
{
if
err
:=
syncFile
(
fi
);
err
!=
nil
{
return
err
...
...
@@ -531,7 +544,10 @@ func (fs *Datastore) putMany(data map[datastore.Key]interface{}) error {
// move files to their proper places
for
fi
,
op
:=
range
files
{
fs
.
doWriteOp
(
op
)
err
:=
fs
.
doWriteOp
(
op
)
if
err
!=
nil
{
return
err
}
// signify removed
ops
[
fi
]
=
2
}
...
...
@@ -594,6 +610,12 @@ func (fs *Datastore) GetSize(key datastore.Key) (size int, err error) {
// the Put() explanation about the handling of concurrent write
// operations to the same key.
func
(
fs
*
Datastore
)
Delete
(
key
datastore
.
Key
)
error
{
fs
.
shutdownLock
.
RLock
()
defer
fs
.
shutdownLock
.
RUnlock
()
if
fs
.
shutdown
{
return
ErrClosed
}
return
fs
.
doWriteOp
(
&
op
{
typ
:
opDelete
,
key
:
key
,
...
...
@@ -845,6 +867,8 @@ func (fs *Datastore) checkpointDiskUsage() {
}
func
(
fs
*
Datastore
)
checkpointLoop
()
{
defer
close
(
fs
.
done
)
timerActive
:=
true
timer
:=
time
.
NewTimer
(
0
)
defer
timer
.
Stop
()
...
...
@@ -858,7 +882,6 @@ func (fs *Datastore) checkpointLoop() {
if
fs
.
dirty
{
log
.
Errorf
(
"could not store final value of disk usage to file, future estimates may be inaccurate"
)
}
fs
.
done
<-
struct
{}{}
return
}
// If the difference between the checkpointed disk usage and
...
...
@@ -1023,11 +1046,14 @@ func (fs *Datastore) walk(path string, result *query.ResultBuilder) error {
// operations will fail but readonly operations will continue to
// function
func
(
fs
*
Datastore
)
deactivate
()
error
{
if
fs
.
checkpointCh
!=
nil
{
close
(
fs
.
checkpointCh
)
<-
fs
.
done
fs
.
checkpointCh
=
nil
fs
.
shutdownLock
.
Lock
()
defer
fs
.
shutdownLock
.
Unlock
(
)
if
fs
.
shutdown
{
return
nil
}
fs
.
shutdown
=
true
close
(
fs
.
checkpointCh
)
<-
fs
.
done
return
nil
}
...
...
@@ -1036,7 +1062,7 @@ func (fs *Datastore) Close() error {
}
type
flatfsBatch
struct
{
puts
map
[
datastore
.
Key
]
interface
{}
puts
map
[
datastore
.
Key
]
[]
byte
deletes
map
[
datastore
.
Key
]
struct
{}
ds
*
Datastore
...
...
@@ -1044,7 +1070,7 @@ type flatfsBatch struct {
func
(
fs
*
Datastore
)
Batch
()
(
datastore
.
Batch
,
error
)
{
return
&
flatfsBatch
{
puts
:
make
(
map
[
datastore
.
Key
]
interface
{}
),
puts
:
make
(
map
[
datastore
.
Key
]
[]
byte
),
deletes
:
make
(
map
[
datastore
.
Key
]
struct
{}),
ds
:
fs
,
},
nil
...
...
flatfs_test.go
View file @
4418505d
...
...
@@ -810,6 +810,30 @@ func testBatchDelete(dirFunc mkShardFunc, t *testing.T) {
func
TestBatchDelete
(
t
*
testing
.
T
)
{
tryAllShardFuncs
(
t
,
testBatchDelete
)
}
func
testClose
(
dirFunc
mkShardFunc
,
t
*
testing
.
T
)
{
temp
,
cleanup
:=
tempdir
(
t
)
defer
cleanup
()
fs
,
err
:=
flatfs
.
CreateOrOpen
(
temp
,
dirFunc
(
2
),
false
)
if
err
!=
nil
{
t
.
Fatalf
(
"New fail: %v
\n
"
,
err
)
}
err
=
fs
.
Put
(
datastore
.
NewKey
(
"quux"
),
[]
byte
(
"foobar"
))
if
err
!=
nil
{
t
.
Fatalf
(
"Put fail: %v
\n
"
,
err
)
}
fs
.
Close
()
err
=
fs
.
Put
(
datastore
.
NewKey
(
"qaax"
),
[]
byte
(
"foobar"
))
if
err
==
nil
{
t
.
Fatal
(
"expected put on closed datastore to fail"
)
}
}
func
TestClose
(
t
*
testing
.
T
)
{
tryAllShardFuncs
(
t
,
testClose
)
}
func
TestSHARDINGFile
(
t
*
testing
.
T
)
{
tempdir
,
cleanup
:=
tempdir
(
t
)
defer
cleanup
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment