Unverified Commit 80303a4b authored by vyzo's avatar vyzo Committed by GitHub

Merge pull request #78 from libp2p/feat/segmented-map

segment the memory peerstore + granular locks
parents 96639ef5 7fac6b6d
......@@ -56,17 +56,13 @@ type Peerstore interface {
KeyBook
PeerMetadata
Metrics
ProtoBook
// PeerInfo returns a peer.PeerInfo struct for given peer.ID.
// This is a small slice of the information Peerstore has on
// that peer, useful to other services.
PeerInfo(peer.ID) PeerInfo
GetProtocols(peer.ID) ([]string, error)
AddProtocols(peer.ID, ...string) error
SetProtocols(peer.ID, ...string) error
SupportsProtocols(peer.ID, ...string) ([]string, error)
// Peers returns all of the peer IDs stored across all inner stores.
Peers() peer.IDSlice
}
......@@ -142,3 +138,11 @@ type KeyBook interface {
// PeersWithKeys returns all the peer IDs stored in the KeyBook
PeersWithKeys() peer.IDSlice
}
// ProtoBook tracks the protocols supported by peers.
//
// The method notes below describe what the memory-backed and
// datastore-backed implementations in this change do.
type ProtoBook interface {
// GetProtocols returns every protocol recorded for the peer. The
// implementations build the result by ranging over a map, so the order
// of the returned slice is unspecified.
GetProtocols(peer.ID) ([]string, error)
// AddProtocols adds the given protocols to whatever is already recorded
// for the peer.
AddProtocols(peer.ID, ...string) error
// SetProtocols replaces the peer's recorded protocols with exactly the
// given ones.
SetProtocols(peer.ID, ...string) error
// SupportsProtocols returns the subset of the given protocols that is
// recorded for the peer, in the order the arguments were supplied.
SupportsProtocols(peer.ID, ...string) ([]string, error)
}
......@@ -3,37 +3,30 @@ package peerstore
import (
"fmt"
"io"
"sync"
peer "github.com/libp2p/go-libp2p-peer"
)
var _ Peerstore = (*peerstore)(nil)
const maxInternedProtocols = 512
const maxInternedProtocolSize = 256
type peerstore struct {
Metrics
KeyBook
AddrBook
ProtoBook
PeerMetadata
// lock for protocol information, separate from datastore lock
protolock sync.RWMutex
internedProtocols map[string]string
}
// NewPeerstore creates a data structure that stores peer data, backed by the
// supplied implementations of KeyBook, AddrBook and PeerMetadata.
func NewPeerstore(kb KeyBook, ab AddrBook, md PeerMetadata) Peerstore {
func NewPeerstore(kb KeyBook, ab AddrBook, pb ProtoBook, md PeerMetadata) Peerstore {
return &peerstore{
KeyBook: kb,
AddrBook: ab,
PeerMetadata: md,
Metrics: NewMetrics(),
internedProtocols: make(map[string]string),
KeyBook: kb,
AddrBook: ab,
ProtoBook: pb,
PeerMetadata: md,
Metrics: NewMetrics(),
}
}
......@@ -49,6 +42,7 @@ func (ps *peerstore) Close() (err error) {
weakClose("keybook", ps.KeyBook)
weakClose("addressbook", ps.AddrBook)
weakClose("protobook", ps.ProtoBook)
weakClose("peermetadata", ps.PeerMetadata)
if len(errs) > 0 {
......@@ -80,101 +74,6 @@ func (ps *peerstore) PeerInfo(p peer.ID) PeerInfo {
}
}
func (ps *peerstore) internProtocol(s string) string {
if len(s) > maxInternedProtocolSize {
return s
}
if interned, ok := ps.internedProtocols[s]; ok {
return interned
}
if len(ps.internedProtocols) >= maxInternedProtocols {
ps.internedProtocols = make(map[string]string, maxInternedProtocols)
}
ps.internedProtocols[s] = s
return s
}
func (ps *peerstore) SetProtocols(p peer.ID, protos ...string) error {
ps.protolock.Lock()
defer ps.protolock.Unlock()
protomap := make(map[string]struct{}, len(protos))
for _, proto := range protos {
protomap[ps.internProtocol(proto)] = struct{}{}
}
return ps.Put(p, "protocols", protomap)
}
func (ps *peerstore) AddProtocols(p peer.ID, protos ...string) error {
ps.protolock.Lock()
defer ps.protolock.Unlock()
protomap, err := ps.getProtocolMap(p)
if err != nil {
return err
}
for _, proto := range protos {
protomap[ps.internProtocol(proto)] = struct{}{}
}
return ps.Put(p, "protocols", protomap)
}
func (ps *peerstore) getProtocolMap(p peer.ID) (map[string]struct{}, error) {
iprotomap, err := ps.Get(p, "protocols")
switch err {
default:
return nil, err
case ErrNotFound:
return make(map[string]struct{}), nil
case nil:
cast, ok := iprotomap.(map[string]struct{})
if !ok {
return nil, fmt.Errorf("stored protocol set was not a map")
}
return cast, nil
}
}
func (ps *peerstore) GetProtocols(p peer.ID) ([]string, error) {
ps.protolock.RLock()
defer ps.protolock.RUnlock()
pmap, err := ps.getProtocolMap(p)
if err != nil {
return nil, err
}
out := make([]string, 0, len(pmap))
for k := range pmap {
out = append(out, k)
}
return out, nil
}
func (ps *peerstore) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) {
ps.protolock.RLock()
defer ps.protolock.RUnlock()
pmap, err := ps.getProtocolMap(p)
if err != nil {
return nil, err
}
out := make([]string, 0, len(protos))
for _, proto := range protos {
if _, ok := pmap[proto]; ok {
out = append(out, proto)
}
}
return out, nil
}
func PeerInfos(ps Peerstore, peers peer.IDSlice) []PeerInfo {
pi := make([]PeerInfo, len(peers))
for i, p := range peers {
......
......@@ -63,7 +63,9 @@ func NewPeerstore(ctx context.Context, store ds.Batching, opts Options) (pstore.
return nil, err
}
ps := pstore.NewPeerstore(keyBook, addrBook, peerMetadata)
protoBook := NewProtoBook(peerMetadata)
ps := pstore.NewPeerstore(keyBook, addrBook, protoBook, peerMetadata)
return ps, nil
}
......
package pstoreds
import (
"fmt"
"sync"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
)
// protoSegment is one lock shard of the datastore-backed protocol book.
// It carries no data of its own: the protocol sets live in PeerMetadata, and
// the embedded RWMutex only serializes access for the peers mapped to it.
type protoSegment struct {
sync.RWMutex
}
// protoSegments is the fixed table of 256 shards, one per possible value of
// the byte used by get to select a shard.
type protoSegments [256]*protoSegment
// get returns the segment responsible for peer p, selected by the last byte
// of the peer ID.
//
// An empty peer ID has no last byte; indexing it would panic with an
// out-of-range error. Using an empty ID is not meaningful, but it must not
// crash the caller, so it is mapped to segment 0.
func (s *protoSegments) get(p peer.ID) *protoSegment {
	if len(p) == 0 {
		return s[0]
	}
	return s[byte(p[len(p)-1])]
}
// dsProtoBook is a ProtoBook that keeps each peer's protocol set in a
// PeerMetadata store under the "protocols" key.
type dsProtoBook struct {
// segments holds the per-shard locks taken around every metadata access.
segments protoSegments
// meta is the backing store the protocol sets are read from and written to.
meta pstore.PeerMetadata
}
// Compile-time check that *dsProtoBook satisfies pstore.ProtoBook.
var _ pstore.ProtoBook = (*dsProtoBook)(nil)
// NewProtoBook returns a ProtoBook that stores protocol sets in meta.
// All 256 lock segments are allocated up front so that get never returns nil.
func NewProtoBook(meta pstore.PeerMetadata) pstore.ProtoBook {
	book := &dsProtoBook{meta: meta}
	for idx := range book.segments {
		book.segments[idx] = &protoSegment{}
	}
	return book
}
// SetProtocols replaces the protocol set stored for peer p with exactly the
// given protocols. Duplicates in protos collapse into a single entry. The
// peer's segment is write-locked for the duration of the call, and any error
// from the metadata store is returned unchanged.
func (pb *dsProtoBook) SetProtocols(p peer.ID, protos ...string) error {
	seg := pb.segments.get(p)
	seg.Lock()
	defer seg.Unlock()

	set := make(map[string]struct{}, len(protos))
	for i := range protos {
		set[protos[i]] = struct{}{}
	}

	return pb.meta.Put(p, "protocols", set)
}
// AddProtocols merges the given protocols into the set already stored for
// peer p and writes the result back to the metadata store. The peer's
// segment is write-locked so the read-modify-write is not interleaved with
// other calls for peers in the same segment.
func (pb *dsProtoBook) AddProtocols(p peer.ID, protos ...string) error {
	seg := pb.segments.get(p)
	seg.Lock()
	defer seg.Unlock()

	known, err := pb.getProtocolMap(p)
	if err != nil {
		return err
	}

	for i := range protos {
		known[protos[i]] = struct{}{}
	}

	return pb.meta.Put(p, "protocols", known)
}
// GetProtocols returns all protocols stored for peer p. The slice is built
// by ranging over a map, so its order is unspecified. A peer with no stored
// protocols yields an empty, non-nil slice.
func (pb *dsProtoBook) GetProtocols(p peer.ID) ([]string, error) {
	seg := pb.segments.get(p)
	seg.RLock()
	defer seg.RUnlock()

	known, err := pb.getProtocolMap(p)
	if err != nil {
		return nil, err
	}

	names := make([]string, 0, len(known))
	for name := range known {
		names = append(names, name)
	}

	return names, nil
}
// SupportsProtocols returns those of the given protocols that are stored for
// peer p, preserving the order in which they were passed. Protocols that are
// not stored for the peer are dropped from the result.
func (pb *dsProtoBook) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) {
	seg := pb.segments.get(p)
	seg.RLock()
	defer seg.RUnlock()

	known, err := pb.getProtocolMap(p)
	if err != nil {
		return nil, err
	}

	supported := make([]string, 0, len(protos))
	for i := range protos {
		if _, has := known[protos[i]]; !has {
			continue
		}
		supported = append(supported, protos[i])
	}

	return supported, nil
}
// getProtocolMap loads the protocol set stored for peer p under the
// "protocols" metadata key.
//
// A missing entry (pstore.ErrNotFound) is not an error: a fresh empty set is
// returned so callers can add to it. Any other store error is passed through.
// A stored value of an unexpected type is reported as an error.
//
// Callers are expected to hold the peer's segment lock.
func (pb *dsProtoBook) getProtocolMap(p peer.ID) (map[string]struct{}, error) {
	raw, err := pb.meta.Get(p, "protocols")
	if err == pstore.ErrNotFound {
		return make(map[string]struct{}), nil
	}
	if err != nil {
		return nil, err
	}

	if set, isSet := raw.(map[string]struct{}); isSet {
		return set, nil
	}
	return nil, fmt.Errorf("stored protocol set was not a map")
}
......@@ -26,15 +26,24 @@ func (e *expiringAddr) ExpiredBy(t time.Time) bool {
return t.After(e.Expires)
}
var _ pstore.AddrBook = (*memoryAddrBook)(nil)
type addrSegments [256]*addrSegment
type addrSegment struct {
sync.RWMutex
// memoryAddrBook manages addresses.
type memoryAddrBook struct {
addrmu sync.RWMutex
// Use pointers to save memory. Maps always leave some fraction of their
// space unused. storing the *values* directly in the map will
// drastically increase the space waste. In our case, by 6x.
addrs map[peer.ID]map[string]*expiringAddr
}
// get returns the address segment responsible for peer p, selected by the
// last byte of the peer ID.
//
// An empty peer ID has no last byte; indexing it would panic with an
// out-of-range error. Using an empty ID is not meaningful, but it must not
// crash the caller, so it is mapped to segment 0.
func (s *addrSegments) get(p peer.ID) *addrSegment {
	if len(p) == 0 {
		return s[0]
	}
	return s[byte(p[len(p)-1])]
}
// memoryAddrBook manages addresses.
type memoryAddrBook struct {
segments addrSegments
ctx context.Context
cancel func()
......@@ -42,11 +51,18 @@ type memoryAddrBook struct {
subManager *AddrSubManager
}
var _ pstore.AddrBook = (*memoryAddrBook)(nil)
func NewAddrBook() pstore.AddrBook {
ctx, cancel := context.WithCancel(context.Background())
ab := &memoryAddrBook{
addrs: make(map[peer.ID]map[string]*expiringAddr),
segments: func() (ret addrSegments) {
for i, _ := range ret {
ret[i] = &addrSegment{addrs: make(map[peer.ID]map[string]*expiringAddr)}
}
return ret
}(),
subManager: NewAddrSubManager(),
ctx: ctx,
cancel: cancel,
......@@ -79,29 +95,32 @@ func (mab *memoryAddrBook) Close() error {
// gc garbage collects the in-memory address book.
func (mab *memoryAddrBook) gc() {
mab.addrmu.Lock()
defer mab.addrmu.Unlock()
now := time.Now()
for p, amap := range mab.addrs {
for k, addr := range amap {
if addr.ExpiredBy(now) {
delete(amap, k)
for _, s := range mab.segments {
s.Lock()
for p, amap := range s.addrs {
for k, addr := range amap {
if addr.ExpiredBy(now) {
delete(amap, k)
}
}
if len(amap) == 0 {
delete(s.addrs, p)
}
}
if len(amap) == 0 {
delete(mab.addrs, p)
}
s.Unlock()
}
}
func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice {
mab.addrmu.RLock()
defer mab.addrmu.RUnlock()
pids := make(peer.IDSlice, 0, len(mab.addrs))
for pid := range mab.addrs {
pids = append(pids, pid)
var pids peer.IDSlice
for _, s := range mab.segments {
s.RLock()
for pid, _ := range s.addrs {
pids = append(pids, pid)
}
s.RUnlock()
}
return pids
}
......@@ -115,18 +134,19 @@ func (mab *memoryAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati
// (time-to-live), after which the address is no longer valid.
// If the manager has a longer TTL, the operation is a no-op for that address
func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
mab.addrmu.Lock()
defer mab.addrmu.Unlock()
// if ttl is zero, exit. nothing to do.
if ttl <= 0 {
return
}
amap := mab.addrs[p]
s := mab.segments.get(p)
s.Lock()
defer s.Unlock()
amap := s.addrs[p]
if amap == nil {
amap = make(map[string]*expiringAddr, len(addrs))
mab.addrs[p] = amap
s.addrs[p] = amap
}
exp := time.Now().Add(ttl)
for _, addr := range addrs {
......@@ -152,13 +172,14 @@ func (mab *memoryAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati
// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
// This is used when we receive the best estimate of the validity of an address.
func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
mab.addrmu.Lock()
defer mab.addrmu.Unlock()
s := mab.segments.get(p)
s.Lock()
defer s.Unlock()
amap := mab.addrs[p]
amap := s.addrs[p]
if amap == nil {
amap = make(map[string]*expiringAddr, len(addrs))
mab.addrs[p] = amap
s.addrs[p] = amap
}
exp := time.Now().Add(ttl)
......@@ -172,7 +193,6 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
if ttl > 0 {
amap[addrstr] = &expiringAddr{Addr: addr, Expires: exp, TTL: ttl}
mab.subManager.BroadcastAddr(p, addr)
} else {
delete(amap, addrstr)
......@@ -183,10 +203,11 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
// UpdateAddrs updates the addresses associated with the given peer that have
// the given oldTTL to have the given newTTL.
func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
mab.addrmu.Lock()
defer mab.addrmu.Unlock()
s := mab.segments.get(p)
s.Lock()
defer s.Unlock()
amap, found := mab.addrs[p]
amap, found := s.addrs[p]
if !found {
return
}
......@@ -203,10 +224,11 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t
// Addresses returns all known (and valid) addresses for a given
func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
mab.addrmu.RLock()
defer mab.addrmu.RUnlock()
s := mab.segments.get(p)
s.RLock()
defer s.RUnlock()
amap, found := mab.addrs[p]
amap, found := s.addrs[p]
if !found {
return nil
}
......@@ -224,19 +246,21 @@ func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
// ClearAddrs removes all previously stored addresses
func (mab *memoryAddrBook) ClearAddrs(p peer.ID) {
mab.addrmu.Lock()
defer mab.addrmu.Unlock()
s := mab.segments.get(p)
s.Lock()
defer s.Unlock()
delete(mab.addrs, p)
delete(s.addrs, p)
}
// AddrStream returns a channel on which all new addresses discovered for a
// given peer ID will be published.
func (mab *memoryAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
mab.addrmu.RLock()
defer mab.addrmu.RUnlock()
s := mab.segments.get(p)
s.RLock()
defer s.RUnlock()
baseaddrslice := mab.addrs[p]
baseaddrslice := s.addrs[p]
initial := make([]ma.Multiaddr, 0, len(baseaddrslice))
for _, a := range baseaddrslice {
initial = append(initial, a.Addr)
......
......@@ -7,5 +7,6 @@ func NewPeerstore() pstore.Peerstore {
return pstore.NewPeerstore(
NewKeyBook(),
NewAddrBook(),
NewProtoBook(),
NewPeerMetadata())
}
package pstoremem
import (
"sync"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
)
// Limits for the protocol-name intern table used by memoryProtoBook.
const (
// maxInternedProtocols is the table size at which internProtocol discards
// the whole table and starts over.
maxInternedProtocols = 512
// maxInternedProtocolSize is the longest protocol string, in bytes, that
// internProtocol will intern; longer strings are returned as-is.
maxInternedProtocolSize = 256
)
// protoSegment is one shard of the in-memory protocol book. The embedded
// RWMutex is taken by the memoryProtoBook methods before they touch protocols.
type protoSegment struct {
sync.RWMutex
// protocols maps a peer ID to the set of protocol strings recorded for it.
protocols map[peer.ID]map[string]struct{}
}
// protoSegments is the fixed table of 256 shards, one per possible value of
// the byte used by get to select a shard.
type protoSegments [256]*protoSegment
// get returns the segment responsible for peer p, selected by the last byte
// of the peer ID.
//
// An empty peer ID has no last byte; indexing it would panic with an
// out-of-range error. Using an empty ID is not meaningful, but it must not
// crash the caller, so it is mapped to segment 0.
func (s *protoSegments) get(p peer.ID) *protoSegment {
	if len(p) == 0 {
		return s[0]
	}
	return s[byte(p[len(p)-1])]
}
// memoryProtoBook is an in-memory ProtoBook. Per-peer protocol sets are
// spread over lock segments, and protocol strings are interned through a
// shared table.
type memoryProtoBook struct {
// segments holds the per-shard protocol maps and their locks.
segments protoSegments
// lk guards interned.
lk sync.RWMutex
// interned maps a protocol string to the copy handed out by internProtocol.
interned map[string]string
}
// Compile-time check that *memoryProtoBook satisfies pstore.ProtoBook.
var _ pstore.ProtoBook = (*memoryProtoBook)(nil)
// NewProtoBook returns an empty in-memory ProtoBook. Every one of the 256
// segments is allocated with its own peer map, and the intern table is
// pre-sized to maxInternedProtocols entries.
func NewProtoBook() pstore.ProtoBook {
	book := &memoryProtoBook{
		interned: make(map[string]string, maxInternedProtocols),
	}
	for idx := range book.segments {
		book.segments[idx] = &protoSegment{
			protocols: make(map[peer.ID]map[string]struct{}),
		}
	}
	return book
}
// internProtocol returns the table's copy of proto, adding proto to the
// table if it is not there yet. Strings longer than maxInternedProtocolSize
// are returned untouched and never enter the table.
func (pb *memoryProtoBook) internProtocol(proto string) string {
	if len(proto) > maxInternedProtocolSize {
		return proto
	}

	// Look the string up under the read lock first.
	pb.lk.RLock()
	canonical, found := pb.interned[proto]
	pb.lk.RUnlock()
	if found {
		return canonical
	}

	// Not in the table: take the write lock to insert it.
	pb.lk.Lock()
	defer pb.lk.Unlock()

	// Another caller may have inserted it between the two locks.
	if canonical, found = pb.interned[proto]; found {
		return canonical
	}

	// The table is full: drop it wholesale and start a new one.
	if len(pb.interned) >= maxInternedProtocols {
		pb.interned = make(map[string]string, maxInternedProtocols)
	}

	pb.interned[proto] = proto
	return proto
}
// SetProtocols replaces the protocol set recorded for peer p with exactly
// the given protocols, interning each string on the way in. Duplicates in
// protos collapse into a single entry. It always returns nil.
func (pb *memoryProtoBook) SetProtocols(p peer.ID, protos ...string) error {
	seg := pb.segments.get(p)
	seg.Lock()
	defer seg.Unlock()

	set := make(map[string]struct{}, len(protos))
	for i := range protos {
		set[pb.internProtocol(protos[i])] = struct{}{}
	}
	seg.protocols[p] = set

	return nil
}
// AddProtocols adds the given protocols to the set recorded for peer p,
// creating the set if the peer has none yet (even when protos is empty).
// Each string is interned before it is stored. It always returns nil.
func (pb *memoryProtoBook) AddProtocols(p peer.ID, protos ...string) error {
	seg := pb.segments.get(p)
	seg.Lock()
	defer seg.Unlock()

	known, exists := seg.protocols[p]
	if !exists {
		known = make(map[string]struct{})
		seg.protocols[p] = known
	}

	for i := range protos {
		known[pb.internProtocol(protos[i])] = struct{}{}
	}

	return nil
}
// GetProtocols returns all protocols recorded for peer p. The slice is built
// by ranging over a map, so its order is unspecified. A peer with no
// recorded protocols yields an empty, non-nil slice. The error is always nil.
func (pb *memoryProtoBook) GetProtocols(p peer.ID) ([]string, error) {
	s := pb.segments.get(p)
	s.RLock()
	defer s.RUnlock()

	// Size the result by this peer's protocol count. The previous code used
	// len(s.protocols), which is the number of peers in the segment and so
	// over- or under-allocated depending on unrelated state.
	protomap := s.protocols[p]
	out := make([]string, 0, len(protomap))
	for k := range protomap {
		out = append(out, k)
	}

	return out, nil
}
// SupportsProtocols returns those of the given protocols that are recorded
// for peer p, preserving the order in which they were passed. An unknown
// peer yields an empty, non-nil slice. The error is always nil.
func (pb *memoryProtoBook) SupportsProtocols(p peer.ID, protos ...string) ([]string, error) {
	seg := pb.segments.get(p)
	seg.RLock()
	defer seg.RUnlock()

	// A missing peer gives a nil map here; lookups on it simply miss.
	known := seg.protocols[p]

	supported := make([]string, 0, len(protos))
	for i := range protos {
		if _, has := known[protos[i]]; !has {
			continue
		}
		supported = append(supported, protos[i])
	}

	return supported, nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment