Commit e7e91867 authored by Steven Allen's avatar Steven Allen

providers: run datastore GC concurrently

Motivation: Walking the datastore can take time and currently blocks
adding/removing providers.

We need to do this in the same goroutine to avoid some logical races.
parent 5b6bc7e9
......@@ -182,8 +182,12 @@ func (pm *ProviderManager) addProv(k cid.Cid, p peer.ID) error {
return writeProviderEntry(pm.dstore, k, p, now)
}
func mkProvKeyFor(k cid.Cid, p peer.ID) string {
return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
}
func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time) error {
dsk := mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
dsk := mkProvKeyFor(k, p)
buf := make([]byte, 16)
n := binary.PutVarint(buf, t.UnixNano())
......@@ -191,49 +195,25 @@ func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time)
return dstore.Put(ds.NewKey(dsk), buf[:n])
}
func (pm *ProviderManager) gc() {
res, err := pm.dstore.Query(dsq.Query{
Prefix: providersKeyPrefix,
})
if err != nil {
log.Error("error garbage collecting provider records: ", err)
return
}
defer res.Close()
now := time.Now()
for {
e, ok := res.NextSync()
if !ok {
return
}
if e.Error != nil {
log.Error("got an error: ", e.Error)
continue
func (pm *ProviderManager) run(proc goprocess.Process) {
var (
gcQuery dsq.Results
gcQueryRes <-chan dsq.Result
gcSkip map[string]struct{}
gcTime time.Time
gcTimer = time.NewTimer(pm.cleanupInterval)
)
defer func() {
gcTimer.Stop()
if gcQuery != nil {
// don't really care if this fails.
_ = gcQuery.Close()
}
// check expiration time
t, err := readTimeValue(e.Value)
switch {
case err != nil:
// couldn't parse the time
log.Warning("parsing providers record from disk: ", err)
fallthrough
case now.Sub(t) > ProvideValidity:
// or just expired
err = pm.dstore.Delete(ds.RawKey(e.Key))
if err != nil {
log.Warning("failed to remove provider record from disk: ", err)
}
if err := pm.dstore.Flush(); err != nil {
log.Error("failed to flush datastore: ", err)
}
}
}
func (pm *ProviderManager) run(proc goprocess.Process) {
tick := time.NewTicker(pm.cleanupInterval)
defer tick.Stop()
defer pm.dstore.Flush()
}()
for {
select {
......@@ -241,6 +221,12 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
err := pm.addProv(np.k, np.val)
if err != nil {
log.Error("error adding new providers: ", err)
continue
}
if gcSkip != nil {
// we have an gc, tell it to skip this provider
// as we've updated it since the GC started.
gcSkip[mkProvKeyFor(np.k, np.val)] = struct{}{}
}
case gp := <-pm.getprovs:
provs, err := pm.providersForKey(gp.k)
......@@ -250,13 +236,62 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
// set the cap so the user can't append to this.
gp.resp <- provs[0:len(provs):len(provs)]
case <-tick.C:
case res, ok := <-gcQueryRes:
if !ok {
if err := gcQuery.Close(); err != nil {
log.Error("failed to close provider GC query: ", err)
}
gcTimer.Reset(pm.cleanupInterval)
// cleanup GC round
gcQueryRes = nil
gcSkip = nil
gcQuery = nil
continue
}
if res.Error != nil {
log.Error("got error from GC query: ", res.Error)
continue
}
if _, ok := gcSkip[res.Key]; ok {
// We've updated this record since starting the
// GC round, skip it.
continue
}
// check expiration time
t, err := readTimeValue(res.Value)
switch {
case err != nil:
// couldn't parse the time
log.Warning("parsing providers record from disk: ", err)
fallthrough
case gcTime.Sub(t) > ProvideValidity:
// or expired
err = pm.dstore.Delete(ds.RawKey(res.Key))
if err != nil && err != ds.ErrNotFound {
log.Warning("failed to remove provider record from disk: ", err)
}
}
case gcTime = <-gcTimer.C:
// You know the wonderful thing about caches? You can
// drop them.
//
// Much faster than GCing.
pm.providers.Purge()
pm.gc()
// Now, kick off a GC of the datastore.
q, err := pm.dstore.Query(dsq.Query{
Prefix: providersKeyPrefix,
})
if err != nil {
log.Error("provider record GC query failed: ", err)
continue
}
gcQuery = q
gcQueryRes = q.Next()
gcSkip = make(map[string]struct{})
case <-proc.Closing():
return
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment