Commit 1b72b920 authored by Raúl Kripalani's avatar Raúl Kripalani

datastore-backed impls of KeyBook and PeerMetadata.

parent dbbd2fd2
...@@ -68,6 +68,13 @@ type Peerstore interface { ...@@ -68,6 +68,13 @@ type Peerstore interface {
Peers() peer.IDSlice Peers() peer.IDSlice
} }
// PeerMetadata can handle values of any type. Serializing values is
// up to the implementation. Dynamic type introspection may not be
// supported, in which case explicitly enlisting types in the
// serializer may be required.
//
// Refer to the docs of the underlying implementation for more
// information.
type PeerMetadata interface { type PeerMetadata interface {
// Get/Put is a simple registry for other peer-related key/value pairs. // Get/Put is a simple registry for other peer-related key/value pairs.
// if we find something we use often, it should become its own set of // if we find something we use often, it should become its own set of
......
...@@ -7,13 +7,15 @@ import ( ...@@ -7,13 +7,15 @@ import (
"time" "time"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
query "github.com/ipfs/go-datastore/query" query "github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash" mh "github.com/multiformats/go-multihash"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore" pstore "github.com/libp2p/go-libp2p-peerstore"
pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem" pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem"
) )
...@@ -28,6 +30,10 @@ var ( ...@@ -28,6 +30,10 @@ var (
ErrTTLDatastore = errors.New("datastore must provide TTL support") ErrTTLDatastore = errors.New("datastore must provide TTL support")
) )
// Peer addresses are stored under the following db key pattern:
// /peers/addr/<b58 of peer id>/<hash of maddr>
var abBase = ds.NewKey("/peers/addrs")
var _ pstore.AddrBook = (*dsAddrBook)(nil) var _ pstore.AddrBook = (*dsAddrBook)(nil)
// dsAddrBook is an address book backed by a Datastore with both an // dsAddrBook is an address book backed by a Datastore with both an
...@@ -101,7 +107,7 @@ func keysAndAddrs(p peer.ID, addrs []ma.Multiaddr) ([]ds.Key, []ma.Multiaddr, er ...@@ -101,7 +107,7 @@ func keysAndAddrs(p peer.ID, addrs []ma.Multiaddr) ([]ds.Key, []ma.Multiaddr, er
var ( var (
keys = make([]ds.Key, len(addrs)) keys = make([]ds.Key, len(addrs))
clean = make([]ma.Multiaddr, len(addrs)) clean = make([]ma.Multiaddr, len(addrs))
parentKey = ds.NewKey(peer.IDB58Encode(p)) parentKey = abBase.ChildString(peer.IDB58Encode(p))
i = 0 i = 0
) )
...@@ -287,7 +293,7 @@ func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time. ...@@ -287,7 +293,7 @@ func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.
func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time.Duration) error { func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time.Duration) error {
var ( var (
prefix = ds.NewKey(p.Pretty()) prefix = abBase.ChildString(peer.IDB58Encode(p))
q = query.Query{Prefix: prefix.String(), KeysOnly: false} q = query.Query{Prefix: prefix.String(), KeysOnly: false}
results query.Results results query.Results
err error err error
...@@ -330,7 +336,7 @@ func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time. ...@@ -330,7 +336,7 @@ func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time.
// Addrs returns all of the non-expired addresses for a given peer. // Addrs returns all of the non-expired addresses for a given peer.
func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
var ( var (
prefix = ds.NewKey(p.Pretty()) prefix = abBase.ChildString(peer.IDB58Encode(p))
q = query.Query{Prefix: prefix.String(), KeysOnly: false, ReturnExpirations: true} q = query.Query{Prefix: prefix.String(), KeysOnly: false, ReturnExpirations: true}
results query.Results results query.Results
err error err error
...@@ -385,42 +391,11 @@ func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { ...@@ -385,42 +391,11 @@ func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
// Peers returns all of the peer IDs for which the AddrBook has addresses. // Peers returns all of the peer IDs for which the AddrBook has addresses.
func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice { func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice {
var ( ids, err := uniquePeerIds(mgr.ds, abBase, func(result query.Result) string {
q = query.Query{KeysOnly: true} return ds.RawKey(result.Key).Parent().Name()
results query.Results })
err error
)
txn, err := mgr.ds.NewTransaction(true)
if err != nil { if err != nil {
log.Error(err) log.Errorf("error while retrieving peers with addresses: %v", err)
return peer.IDSlice{}
}
defer txn.Discard()
if results, err = txn.Query(q); err != nil {
log.Error(err)
return peer.IDSlice{}
}
defer results.Close()
idset := make(map[string]struct{})
for result := range results.Next() {
key := ds.RawKey(result.Key)
idset[key.Parent().Name()] = struct{}{}
}
if len(idset) == 0 {
return peer.IDSlice{}
}
ids := make(peer.IDSlice, len(idset))
i := 0
for id := range idset {
pid, _ := peer.IDB58Decode(id)
ids[i] = pid
i++
} }
return ids return ids
} }
...@@ -436,7 +411,7 @@ func (mgr *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Mult ...@@ -436,7 +411,7 @@ func (mgr *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Mult
func (mgr *dsAddrBook) ClearAddrs(p peer.ID) { func (mgr *dsAddrBook) ClearAddrs(p peer.ID) {
var ( var (
err error err error
prefix = ds.NewKey(p.Pretty()) prefix = abBase.ChildString(peer.IDB58Encode(p))
deleteFn func() error deleteFn func() error
) )
......
...@@ -143,6 +143,10 @@ func TestBadgerDsAddrBook(t *testing.T) { ...@@ -143,6 +143,10 @@ func TestBadgerDsAddrBook(t *testing.T) {
}) })
} }
func TestBadgerDsKeyBook(t *testing.T) {
pt.TestKeyBook(t, keyBookFactory(t, DefaultOpts()))
}
func BenchmarkBadgerDsPeerstore(b *testing.B) { func BenchmarkBadgerDsPeerstore(b *testing.B) {
caching := DefaultOpts() caching := DefaultOpts()
caching.CacheSize = 1024 caching.CacheSize = 1024
...@@ -159,15 +163,15 @@ func badgerStore(t testing.TB) (ds.TxnDatastore, func()) { ...@@ -159,15 +163,15 @@ func badgerStore(t testing.TB) (ds.TxnDatastore, func()) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ds, err := badger.NewDatastore(dataPath, nil) store, err := badger.NewDatastore(dataPath, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
closer := func() { closer := func() {
ds.Close() store.Close()
os.RemoveAll(dataPath) os.RemoveAll(dataPath)
} }
return ds, closer return store, closer
} }
func peerstoreFactory(tb testing.TB, opts Options) pt.PeerstoreFactory { func peerstoreFactory(tb testing.TB, opts Options) pt.PeerstoreFactory {
...@@ -195,3 +199,11 @@ func addressBookFactory(tb testing.TB, opts Options) pt.AddrBookFactory { ...@@ -195,3 +199,11 @@ func addressBookFactory(tb testing.TB, opts Options) pt.AddrBookFactory {
return ab, closeFunc return ab, closeFunc
} }
} }
func keyBookFactory(tb testing.TB, opts Options) pt.KeyBookFactory {
return func() (pstore.KeyBook, func()) {
store, closeFunc := badgerStore(tb)
kb, _ := NewKeyBook(context.Background(), store, opts)
return kb, closeFunc
}
}
package pstoreds
import (
"context"
"errors"
ds "github.com/ipfs/go-datastore"
query "github.com/ipfs/go-datastore/query"
ic "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
)
// Public and private keys are stored under the following db key pattern:
// /peers/keys/<b58 of peer id>/{pub, priv}
var (
kbBase = ds.NewKey("/peers/keys")
pubSuffix = ds.NewKey("/pub")
privSuffix = ds.NewKey("/priv")
)
type dsKeyBook struct {
ds ds.TxnDatastore
}
var _ pstore.KeyBook = (*dsKeyBook)(nil)
func NewKeyBook(_ context.Context, store ds.TxnDatastore, _ Options) (pstore.KeyBook, error) {
return &dsKeyBook{store}, nil
}
func (kb *dsKeyBook) PubKey(p peer.ID) ic.PubKey {
key := kbBase.ChildString(peer.IDB58Encode(p)).Child(pubSuffix)
var pk ic.PubKey
if value, err := kb.ds.Get(key); err == nil {
pk, err = ic.UnmarshalPublicKey(value)
if err != nil {
log.Errorf("error when unmarshalling pubkey from datastore for peer %s: %s\n", p.Pretty(), err)
}
} else if err == ds.ErrNotFound {
pk, err = p.ExtractPublicKey()
if err != nil {
log.Errorf("error when extracting pubkey from peer ID for peer %s: %s\n", p.Pretty(), err)
return nil
}
pkb, err := pk.Bytes()
if err != nil {
log.Errorf("error when turning extracted pubkey into bytes for peer %s: %s\n", p.Pretty(), err)
return nil
}
err = kb.ds.Put(key, pkb)
if err != nil {
log.Errorf("error when adding extracted pubkey to peerstore for peer %s: %s\n", p.Pretty(), err)
return nil
}
} else {
log.Errorf("error when fetching pubkey from datastore for peer %s: %s\n", p.Pretty(), err)
}
return pk
}
func (kb *dsKeyBook) AddPubKey(p peer.ID, pk ic.PubKey) error {
// check it's correct.
if !p.MatchesPublicKey(pk) {
return errors.New("peer ID does not match public key")
}
key := kbBase.ChildString(peer.IDB58Encode(p)).Child(pubSuffix)
val, err := pk.Bytes()
if err != nil {
log.Errorf("error while converting pubkey byte string for peer %s: %s\n", p.Pretty(), err)
return err
}
err = kb.ds.Put(key, val)
if err != nil {
log.Errorf("error while updating pubkey in datastore for peer %s: %s\n", p.Pretty(), err)
}
return err
}
func (kb *dsKeyBook) PrivKey(p peer.ID) ic.PrivKey {
key := kbBase.ChildString(peer.IDB58Encode(p)).Child(privSuffix)
value, err := kb.ds.Get(key)
if err != nil {
log.Errorf("error while fetching privkey from datastore for peer %s: %s\n", p.Pretty(), err)
return nil
}
sk, err := ic.UnmarshalPrivateKey(value)
if err != nil {
return nil
}
return sk
}
func (kb *dsKeyBook) AddPrivKey(p peer.ID, sk ic.PrivKey) error {
if sk == nil {
return errors.New("private key is nil")
}
// check it's correct.
if !p.MatchesPrivateKey(sk) {
return errors.New("peer ID does not match private key")
}
key := kbBase.ChildString(peer.IDB58Encode(p)).Child(privSuffix)
val, err := sk.Bytes()
if err != nil {
log.Errorf("error while converting privkey byte string for peer %s: %s\n", p.Pretty(), err)
return err
}
err = kb.ds.Put(key, val)
if err != nil {
log.Errorf("error while updating privkey in datastore for peer %s: %s\n", p.Pretty(), err)
}
return err
}
func (kb *dsKeyBook) PeersWithKeys() peer.IDSlice {
ids, err := uniquePeerIds(kb.ds, kbBase, func(result query.Result) string {
return ds.RawKey(result.Key).Parent().Name()
})
if err != nil {
log.Errorf("error while retrieving peers with keys: %v", err)
}
return ids
}
package pstoreds
import (
"bytes"
"context"
"encoding/gob"
ds "github.com/ipfs/go-datastore"
pool "github.com/libp2p/go-buffer-pool"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
)
// Metadata is stored under the following db key pattern:
// /peers/metadata/<b58 peer id>/<key>
var pmBase = ds.NewKey("/peers/metadata")
type dsPeerMetadata struct {
ds ds.Datastore
}
var _ pstore.PeerMetadata = (*dsPeerMetadata)(nil)
func init() {
// Gob registers basic types by default.
//
// Register complex types used by the peerstore itself.
gob.Register(make(map[string]struct{}))
}
// NewPeerMetadata creates a metadata store backed by a persistent db. It uses gob for serialisation.
//
// See `init()` to learn which types are registered by default. Modules wishing to store
// values of other types will need to `gob.Register()` them explicitly, or else callers
// will receive runtime errors.
func NewPeerMetadata(_ context.Context, store ds.Datastore, _ Options) (pstore.PeerMetadata, error) {
return &dsPeerMetadata{store}, nil
}
func (pm *dsPeerMetadata) Get(p peer.ID, key string) (interface{}, error) {
k := pmBase.ChildString(peer.IDB58Encode(p)).ChildString(key)
value, err := pm.ds.Get(k)
if err != nil {
if err == ds.ErrNotFound {
err = pstore.ErrNotFound
}
return nil, err
}
var res interface{}
if err := gob.NewDecoder(bytes.NewReader(value)).Decode(&res); err != nil {
return nil, err
}
return res, nil
}
func (pm *dsPeerMetadata) Put(p peer.ID, key string, val interface{}) error {
k := pmBase.ChildString(peer.IDB58Encode(p)).ChildString(key)
var buf pool.Buffer
if err := gob.NewEncoder(&buf).Encode(&val); err != nil {
return err
}
return pm.ds.Put(k, buf.Bytes())
}
...@@ -5,9 +5,10 @@ import ( ...@@ -5,9 +5,10 @@ import (
"time" "time"
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore" pstore "github.com/libp2p/go-libp2p-peerstore"
pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem"
) )
// Configuration object for the peerstore. // Configuration object for the peerstore.
...@@ -42,6 +43,59 @@ func NewPeerstore(ctx context.Context, store ds.TxnDatastore, opts Options) (pst ...@@ -42,6 +43,59 @@ func NewPeerstore(ctx context.Context, store ds.TxnDatastore, opts Options) (pst
return nil, err return nil, err
} }
ps := pstore.NewPeerstore(pstoremem.NewKeyBook(), addrBook, pstoremem.NewPeerMetadata()) keyBook, err := NewKeyBook(ctx, store, opts)
if err != nil {
return nil, err
}
peerMetadata, err := NewPeerMetadata(ctx, store, opts)
if err != nil {
return nil, err
}
ps := pstore.NewPeerstore(keyBook, addrBook, peerMetadata)
return ps, nil return ps, nil
} }
// uniquePeerIds extracts and returns unique peer IDs from database keys.
func uniquePeerIds(ds ds.TxnDatastore, prefix ds.Key, extractor func(result query.Result) string) (peer.IDSlice, error) {
var (
q = query.Query{Prefix: prefix.String(), KeysOnly: true}
results query.Results
err error
)
txn, err := ds.NewTransaction(true)
if err != nil {
return peer.IDSlice{}, err
}
defer txn.Discard()
if results, err = txn.Query(q); err != nil {
log.Error(err)
return peer.IDSlice{}, err
}
defer results.Close()
idset := make(map[string]struct{})
for result := range results.Next() {
k := extractor(result)
idset[k] = struct{}{}
//key := ds.RawKey(result.Key)
//idset[key.Parent().Name()] = struct{}{}
}
if len(idset) == 0 {
return peer.IDSlice{}, nil
}
ids := make(peer.IDSlice, len(idset))
i := 0
for id := range idset {
pid, _ := peer.IDB58Decode(id)
ids[i] = pid
i++
}
return ids, nil
}
...@@ -10,11 +10,12 @@ import ( ...@@ -10,11 +10,12 @@ import (
type memoryPeerMetadata struct { type memoryPeerMetadata struct {
// store other data, like versions // store other data, like versions
//ds ds.ThreadSafeDatastore //ds ds.ThreadSafeDatastore
// TODO: use a datastore for this
ds map[string]interface{} ds map[string]interface{}
dslock sync.Mutex dslock sync.Mutex
} }
var _ pstore.PeerMetadata = (*memoryPeerMetadata)(nil)
func NewPeerMetadata() pstore.PeerMetadata { func NewPeerMetadata() pstore.PeerMetadata {
return &memoryPeerMetadata{ return &memoryPeerMetadata{
ds: make(map[string]interface{}), ds: make(map[string]interface{}),
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment