Commit 9b54f912 authored by hannahhoward's avatar hannahhoward

feat(peermanager): limit use of mutex

Constrain use of mutex to actual operations on the peerQueues map via utility functions
parent 703d46a6
......@@ -37,8 +37,10 @@ type peerMessage interface {
// PeerManager manages a pool of peers and sends messages to peers in the pool.
type PeerManager struct {
peerQueues map[peer.ID]PeerQueue
lk sync.RWMutex
// peerQueues -- interact through internal utility functions get/set/remove/iterate
peerQueues map[peer.ID]PeerQueue
peerQueuesLk sync.RWMutex
createPeerQueue PeerQueueFactory
ctx context.Context
}
......@@ -54,24 +56,19 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
// ConnectedPeers returns a list of peers this PeerManager is managing.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
pm.lk.RLock()
defer pm.lk.RUnlock()
peers := make([]peer.ID, 0, len(pm.peerQueues))
for p := range pm.peerQueues {
pm.iterate(func(p peer.ID, _ PeerQueue) {
peers = append(peers, p)
}
})
return peers
}
// Connected is called to add a new peer to the pool, and send it an initial set
// of wants.
func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
pm.lk.Lock()
defer pm.lk.Unlock()
mq, ok := pm.get(p)
mq, ok := pm.peerQueues[p]
if ok {
if mq.RefCount() == 0 {
mq.AddWantlist(initialEntries)
......@@ -81,17 +78,17 @@ func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
}
mq = pm.createPeerQueue(p)
pm.peerQueues[p] = mq
pm.set(p, mq)
mq.Startup(pm.ctx)
mq.AddWantlist(initialEntries)
}
// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
pm.lk.Lock()
defer pm.lk.Unlock()
pq, ok := pm.get(p)
pq, ok := pm.peerQueues[p]
if !ok {
// TODO: log error?
return
......@@ -102,25 +99,23 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
}
pq.Shutdown()
delete(pm.peerQueues, p)
pm.remove(p)
}
// SendMessage is called to send a message to all or some peers in the pool;
// if targets is nil, it sends to all.
func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
pm.lk.Lock()
defer pm.lk.Unlock()
if len(targets) == 0 {
for _, p := range pm.peerQueues {
pm.iterate(func(_ peer.ID, p PeerQueue) {
p.AddMessage(entries, from)
}
})
} else {
for _, t := range targets {
p, ok := pm.peerQueues[t]
p, ok := pm.get(t)
if !ok {
p = pm.createPeerQueue(t)
pm.peerQueues[t] = p
pm.set(t, p)
p.Startup(pm.ctx)
// this is a "0 reference" queue because we haven't actually connected to it
// sending the first message will cause it to connect
......@@ -130,3 +125,30 @@ func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, fr
}
}
}
func (pm *PeerManager) get(p peer.ID) (PeerQueue, bool) {
pm.peerQueuesLk.RLock()
pq, ok := pm.peerQueues[p]
pm.peerQueuesLk.RUnlock()
return pq, ok
}
func (pm *PeerManager) set(p peer.ID, pq PeerQueue) {
pm.peerQueuesLk.Lock()
pm.peerQueues[p] = pq
pm.peerQueuesLk.Unlock()
}
func (pm *PeerManager) remove(p peer.ID) {
pm.peerQueuesLk.Lock()
delete(pm.peerQueues, p)
pm.peerQueuesLk.Unlock()
}
func (pm *PeerManager) iterate(iterateFn func(peer.ID, PeerQueue)) {
pm.peerQueuesLk.RLock()
for p, pq := range pm.peerQueues {
iterateFn(p, pq)
}
pm.peerQueuesLk.RUnlock()
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment