Commit c8bc7ce3 authored by Jeromy

Write providers to disk to avoid memory leaks

License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
parent 22af8d11
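
Before the diff: this change stops keeping every provider record in one in-memory map and instead writes each record to the node's datastore, with a small LRU cache in front of it. As a hedged, standalone sketch (the helper names provKey and provValue are illustrative, not part of the commit), this is the on-disk layout implied by the new mkProvKey and writeProviderEntry functions below: one entry per (key, provider) pair under /providers/<base32(key)>/<base32(peerID)>, whose value is the provider's last-seen time as a varint-encoded UnixNano timestamp.

    // Illustrative sketch of the record layout used by the new providers package.
    // provKey and provValue are hypothetical helpers mirroring mkProvKey and
    // writeProviderEntry from the diff below; they are not part of the commit.
    package main

    import (
        "encoding/base32"
        "encoding/binary"
        "fmt"
        "time"
    )

    // provKey mirrors mkProvKey plus ChildString: /providers/<base32(key)>/<base32(peer)>.
    func provKey(blockKey, peerID string) string {
        enc := base32.StdEncoding
        return "/providers/" + enc.EncodeToString([]byte(blockKey)) + "/" + enc.EncodeToString([]byte(peerID))
    }

    // provValue mirrors writeProviderEntry: the stored value is just the
    // timestamp, varint-encoded from UnixNano.
    func provValue(t time.Time) []byte {
        buf := make([]byte, binary.MaxVarintLen64)
        n := binary.PutVarint(buf, t.UnixNano())
        return buf[:n]
    }

    func main() {
        k := provKey("some-block-key", "some-peer-id")
        v := provValue(time.Now())
        fmt.Println("datastore key:", k)
        fmt.Println("value length: ", len(v), "bytes")
    }
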
@@ -12,6 +12,7 @@ import (
     key "github.com/ipfs/go-ipfs/blocks/key"
     routing "github.com/ipfs/go-ipfs/routing"
     pb "github.com/ipfs/go-ipfs/routing/dht/pb"
+    providers "github.com/ipfs/go-ipfs/routing/dht/providers"
     kb "github.com/ipfs/go-ipfs/routing/kbucket"
     record "github.com/ipfs/go-ipfs/routing/record"
@@ -48,7 +49,7 @@ type IpfsDHT struct {
     datastore ds.Datastore // Local data
     routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
-    providers *ProviderManager
+    providers *providers.ProviderManager
     birth time.Time // When this peer started up
     diaglock sync.Mutex // lock to make diagnostics work better
@@ -84,8 +85,8 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Datastore) *IpfsDHT {
     dht.ctx = ctx
     h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
-    dht.providers = NewProviderManager(dht.ctx, dht.self)
-    dht.proc.AddChild(dht.providers.proc)
+    dht.providers = providers.NewProviderManager(dht.ctx, dht.self, dstore)
+    dht.proc.AddChild(dht.providers.Process())
     goprocessctx.CloseAfterContext(dht.proc, ctx)
     dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
...
-package dht
+package providers
 import (
+    "encoding/base32"
+    "encoding/binary"
+    "fmt"
+    "strings"
     "time"
-    key "github.com/ipfs/go-ipfs/blocks/key"
+    logging "gx/ipfs/QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR/go-log"
     goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
     goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
     peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
+    lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
+    ds "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore"
+    dsq "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore/query"
+    key "github.com/ipfs/go-ipfs/blocks/key"
     context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
 )
+var log = logging.Logger("providers")
+var lruCacheSize = 256
 var ProvideValidity = time.Hour * 24
 var defaultCleanupInterval = time.Hour
 type ProviderManager struct {
     // all non channel fields are meant to be accessed only within
     // the run method
-    providers map[key.Key]*providerSet
+    providers *lru.Cache
     local map[key.Key]struct{}
     lpeer peer.ID
+    dstore ds.Datastore
     getlocal chan chan []key.Key
     newprovs chan *addProv
@@ -45,11 +58,17 @@ type getProv struct {
     resp chan []peer.ID
 }
-func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
+func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Datastore) *ProviderManager {
     pm := new(ProviderManager)
     pm.getprovs = make(chan *getProv)
     pm.newprovs = make(chan *addProv)
-    pm.providers = make(map[key.Key]*providerSet)
+    pm.dstore = dstore
+    cache, err := lru.New(lruCacheSize)
+    if err != nil {
+        panic(err) // only happens if a negative size is passed to the lru constructor
+    }
+    pm.providers = cache
     pm.getlocal = make(chan chan []key.Key)
     pm.local = make(map[key.Key]struct{})
     pm.proc = goprocessctx.WithContext(ctx)
@@ -59,6 +78,174 @@ func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
     return pm
 }
+const providersKeyPrefix = "/providers/"
+func mkProvKey(k key.Key) ds.Key {
+    return ds.NewKey(providersKeyPrefix + base32.StdEncoding.EncodeToString([]byte(k)))
+}
+func (pm *ProviderManager) Process() goprocess.Process {
+    return pm.proc
+}
+func (pm *ProviderManager) providersForKey(k key.Key) ([]peer.ID, error) {
+    pset, err := pm.getProvSet(k)
+    if err != nil {
+        return nil, err
+    }
+    return pset.providers, nil
+}
+func (pm *ProviderManager) getProvSet(k key.Key) (*providerSet, error) {
+    cached, ok := pm.providers.Get(k)
+    if ok {
+        return cached.(*providerSet), nil
+    }
+    pset, err := loadProvSet(pm.dstore, k)
+    if err != nil {
+        return nil, err
+    }
+    if len(pset.providers) > 0 {
+        pm.providers.Add(k, pset)
+    }
+    return pset, nil
+}
+func loadProvSet(dstore ds.Datastore, k key.Key) (*providerSet, error) {
+    res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k).String()})
+    if err != nil {
+        return nil, err
+    }
+    out := newProviderSet()
+    for e := range res.Next() {
+        if e.Error != nil {
+            log.Error("got an error: ", e.Error)
+            continue
+        }
+        parts := strings.Split(e.Key, "/")
+        if len(parts) != 4 {
+            log.Warning("incorrectly formatted key: ", e.Key)
+            continue
+        }
+        decstr, err := base32.StdEncoding.DecodeString(parts[len(parts)-1])
+        if err != nil {
+            log.Error("base32 decoding error: ", err)
+            continue
+        }
+        pid := peer.ID(decstr)
+        t, err := readTimeValue(e.Value)
+        if err != nil {
+            log.Warning("parsing providers record from disk: ", err)
+            continue
+        }
+        out.setVal(pid, t)
+    }
+    return out, nil
+}
+func readTimeValue(i interface{}) (time.Time, error) {
+    data, ok := i.([]byte)
+    if !ok {
+        return time.Time{}, fmt.Errorf("data was not a []byte")
+    }
+    nsec, _ := binary.Varint(data)
+    return time.Unix(0, nsec), nil
+}
+func (pm *ProviderManager) addProv(k key.Key, p peer.ID) error {
+    iprovs, ok := pm.providers.Get(k)
+    if !ok {
+        iprovs = newProviderSet()
+        pm.providers.Add(k, iprovs)
+    }
+    provs := iprovs.(*providerSet)
+    now := time.Now()
+    provs.setVal(p, now)
+    return writeProviderEntry(pm.dstore, k, p, now)
+}
+func writeProviderEntry(dstore ds.Datastore, k key.Key, p peer.ID, t time.Time) error {
+    dsk := mkProvKey(k).ChildString(base32.StdEncoding.EncodeToString([]byte(p)))
+    buf := make([]byte, 16)
+    n := binary.PutVarint(buf, t.UnixNano())
+    return dstore.Put(dsk, buf[:n])
+}
+func (pm *ProviderManager) deleteProvSet(k key.Key) error {
+    pm.providers.Remove(k)
+    res, err := pm.dstore.Query(dsq.Query{
+        KeysOnly: true,
+        Prefix: mkProvKey(k).String(),
+    })
+    if err != nil {
+        return err
+    }
+    entries, err := res.Rest()
+    if err != nil {
+        return err
+    }
+    for _, e := range entries {
+        err := pm.dstore.Delete(ds.NewKey(e.Key))
+        if err != nil {
+            log.Error("deleting provider set: ", err)
+        }
+    }
+    return nil
+}
+func (pm *ProviderManager) getAllProvKeys() ([]key.Key, error) {
+    res, err := pm.dstore.Query(dsq.Query{
+        KeysOnly: true,
+        Prefix: providersKeyPrefix,
+    })
+    if err != nil {
+        return nil, err
+    }
+    entries, err := res.Rest()
+    if err != nil {
+        return nil, err
+    }
+    out := make([]key.Key, 0, len(entries))
+    seen := make(map[key.Key]struct{})
+    for _, e := range entries {
+        parts := strings.Split(e.Key, "/")
+        if len(parts) != 4 {
+            log.Warning("incorrectly formatted provider entry in datastore")
+            continue
+        }
+        decoded, err := base32.StdEncoding.DecodeString(parts[2])
+        if err != nil {
+            log.Warning("error decoding base32 provider key")
+            continue
+        }
+        k := key.Key(decoded)
+        if _, ok := seen[k]; !ok {
+            out = append(out, k)
+            seen[k] = struct{}{}
+        }
+    }
+    return out, nil
+}
 func (pm *ProviderManager) run() {
     tick := time.NewTicker(pm.cleanupInterval)
     for {
@@ -67,22 +254,17 @@ func (pm *ProviderManager) run() {
             if np.val == pm.lpeer {
                 pm.local[np.k] = struct{}{}
             }
-            provs, ok := pm.providers[np.k]
-            if !ok {
-                provs = newProviderSet()
-                pm.providers[np.k] = provs
-            }
-            provs.Add(np.val)
+            err := pm.addProv(np.k, np.val)
+            if err != nil {
+                log.Error("error adding new providers: ", err)
+            }
         case gp := <-pm.getprovs:
-            var parr []peer.ID
-            provs, ok := pm.providers[gp.k]
-            if ok {
-                parr = provs.providers
-            }
-            gp.resp <- parr
+            provs, err := pm.providersForKey(gp.k)
+            if err != nil && err != ds.ErrNotFound {
+                log.Error("error reading providers: ", err)
+            }
+            gp.resp <- provs
         case lc := <-pm.getlocal:
            var keys []key.Key
            for k := range pm.local {
@@ -91,7 +273,17 @@ func (pm *ProviderManager) run() {
            lc <- keys
        case <-tick.C:
-           for k, provs := range pm.providers {
+           keys, err := pm.getAllProvKeys()
+           if err != nil {
+               log.Error("Error loading provider keys: ", err)
+               continue
+           }
+           for _, k := range keys {
+               provs, err := pm.getProvSet(k)
+               if err != nil {
+                   log.Error("error loading known provset: ", err)
+                   continue
+               }
               var filtered []peer.ID
               for p, t := range provs.set {
                   if time.Now().Sub(t) > ProvideValidity {
@@ -104,10 +296,12 @@ func (pm *ProviderManager) run() {
               if len(filtered) > 0 {
                   provs.providers = filtered
               } else {
-                  delete(pm.providers, k)
+                  err := pm.deleteProvSet(k)
+                  if err != nil {
+                      log.Error("error deleting provider set: ", err)
+                  }
               }
           }
        case <-pm.proc.Closing():
            return
        }
@@ -156,10 +350,14 @@ func newProviderSet() *providerSet {
 }
 func (ps *providerSet) Add(p peer.ID) {
+    ps.setVal(p, time.Now())
+}
+func (ps *providerSet) setVal(p peer.ID, t time.Time) {
     _, found := ps.set[p]
     if !found {
         ps.providers = append(ps.providers, p)
     }
-    ps.set[p] = time.Now()
+    ps.set[p] = t
 }
-package dht
+package providers
 import (
+    "fmt"
     "testing"
     "time"
     key "github.com/ipfs/go-ipfs/blocks/key"
     peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
+    ds "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore"
     context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
 )
@@ -13,7 +15,7 @@ import (
 func TestProviderManager(t *testing.T) {
     ctx := context.Background()
     mid := peer.ID("testing")
-    p := NewProviderManager(ctx, mid)
+    p := NewProviderManager(ctx, mid, ds.NewMapDatastore())
     a := key.Key("test")
     p.AddProvider(ctx, a, peer.ID("testingprovider"))
     resp := p.GetProviders(ctx, a)
@@ -23,13 +25,75 @@ func TestProviderManager(t *testing.T) {
     p.proc.Close()
 }
+func TestProvidersDatastore(t *testing.T) {
+    old := lruCacheSize
+    lruCacheSize = 10
+    defer func() { lruCacheSize = old }()
+    ctx := context.Background()
+    mid := peer.ID("testing")
+    p := NewProviderManager(ctx, mid, ds.NewMapDatastore())
+    defer p.proc.Close()
+    friend := peer.ID("friend")
+    var keys []key.Key
+    for i := 0; i < 100; i++ {
+        k := key.Key(fmt.Sprint(i))
+        keys = append(keys, k)
+        p.AddProvider(ctx, k, friend)
+    }
+    for _, k := range keys {
+        resp := p.GetProviders(ctx, k)
+        if len(resp) != 1 {
+            t.Fatal("Could not retrieve provider.")
+        }
+        if resp[0] != friend {
+            t.Fatal("expected provider to be 'friend'")
+        }
+    }
+}
+func TestProvidersSerialization(t *testing.T) {
+    dstore := ds.NewMapDatastore()
+    k := key.Key("my key!")
+    p := peer.ID("my peer")
+    pt := time.Now()
+    err := writeProviderEntry(dstore, k, p, pt)
+    if err != nil {
+        t.Fatal(err)
+    }
+    pset, err := loadProvSet(dstore, k)
+    if err != nil {
+        t.Fatal(err)
+    }
+    lt, ok := pset.set[p]
+    if !ok {
+        t.Fatal("failed to load set correctly")
+    }
+    if !pt.Equal(lt) {
+        t.Fatal("time wasn't serialized correctly")
+    }
+}
 func TestProvidesExpire(t *testing.T) {
-    ProvideValidity = time.Second
-    defaultCleanupInterval = time.Second
+    pval := ProvideValidity
+    cleanup := defaultCleanupInterval
+    ProvideValidity = time.Second / 2
+    defaultCleanupInterval = time.Second / 2
+    defer func() {
+        ProvideValidity = pval
+        defaultCleanupInterval = cleanup
+    }()
     ctx := context.Background()
     mid := peer.ID("testing")
-    p := NewProviderManager(ctx, mid)
+    p := NewProviderManager(ctx, mid, ds.NewMapDatastore())
     peers := []peer.ID{"a", "b"}
     var keys []key.Key
@@ -47,7 +111,7 @@ func TestProvidesExpire(t *testing.T) {
         }
     }
-    time.Sleep(time.Second * 3)
+    time.Sleep(time.Second)
     for i := 0; i < 10; i++ {
         out := p.GetProviders(ctx, keys[i])
         if len(out) > 2 {
@@ -55,7 +119,16 @@ func TestProvidesExpire(t *testing.T) {
         }
     }
-    if len(p.providers) != 0 {
+    if p.providers.Len() != 0 {
         t.Fatal("providers map not cleaned up")
     }
+    allprovs, err := p.getAllProvKeys()
+    if err != nil {
+        t.Fatal(err)
+    }
+    if len(allprovs) != 0 {
+        t.Fatal("expected everything to be cleaned out of the datastore")
+    }
 }
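
For reference, a hedged usage sketch of the new providers package, following the tests above: the manager now takes a datastore at construction time (an in-memory MapDatastore here purely for illustration; a real node passes its persistent datastore, as the DHT constructor now does), and callers keep using AddProvider and GetProviders as before. The gx import paths are copied from this commit.

    package main

    import (
        "fmt"

        key "github.com/ipfs/go-ipfs/blocks/key"
        providers "github.com/ipfs/go-ipfs/routing/dht/providers"

        peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
        ds "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore"
        context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
    )

    func main() {
        ctx := context.Background()
        self := peer.ID("local peer")

        // Provider records are persisted in the datastore (behind an LRU cache),
        // so the manager is handed a ds.Datastore at construction time.
        pm := providers.NewProviderManager(ctx, self, ds.NewMapDatastore())
        defer pm.Process().Close()

        k := key.Key("some block")
        pm.AddProvider(ctx, k, peer.ID("remote peer"))

        for _, p := range pm.GetProviders(ctx, k) {
            fmt.Println("provider:", p)
        }
    }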