Commit 9ed150a7 authored by hannahhoward's avatar hannahhoward

refactor(Managers): Further cleanup

Finishing adding comments to WantManager and PeerManager, refactor message structure for type
safety, add sending messages test
parent 693085c9
......@@ -16,32 +16,7 @@ var (
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)
type sendMessageParams struct {
entries []*bsmsg.Entry
targets []peer.ID
from uint64
}
type connectParams struct {
peer peer.ID
initialEntries []*wantlist.Entry
}
type peerMessageType int
const (
connect peerMessageType = iota + 1
disconnect
getPeers
sendMessage
)
type peerMessage struct {
messageType peerMessageType
params interface{}
resultsChan chan interface{}
}
// PeerQueue provides a queer of messages to be sent for a single peer
type PeerQueue interface {
RefIncrement()
RefDecrement() bool
......@@ -50,8 +25,14 @@ type PeerQueue interface {
Shutdown()
}
// PeerQueueFactory provides a function that will create a PeerQueue
type PeerQueueFactory func(p peer.ID) PeerQueue
type peerMessage interface {
handle(pm *PeerManager)
}
// PeerManager manages a pool of peers and sends messages to peers in the pool
type PeerManager struct {
// sync channel for Run loop
peerMessages chan peerMessage
......@@ -64,6 +45,7 @@ type PeerManager struct {
cancel func()
}
// New creates a new PeerManager, given a context and a peerQueueFactory
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
ctx, cancel := context.WithCancel(ctx)
return &PeerManager{
......@@ -75,118 +57,145 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
}
}
// ConnectedPeers returns a list of peers this PeerManager is managing
func (pm *PeerManager) ConnectedPeers() []peer.ID {
resp := make(chan interface{})
pm.peerMessages <- peerMessage{getPeers, nil, resp}
peers := <-resp
return peers.([]peer.ID)
}
func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue {
mq, ok := pm.peerQueues[p]
if ok {
mq.RefIncrement()
return nil
}
mq = pm.createPeerQueue(p)
pm.peerQueues[p] = mq
mq.Startup(pm.ctx, initialEntries)
return mq
}
func (pm *PeerManager) stopPeerHandler(p peer.ID) {
pq, ok := pm.peerQueues[p]
if !ok {
// TODO: log error?
return
}
if pq.RefDecrement() {
return
}
pq.Shutdown()
delete(pm.peerQueues, p)
resp := make(chan []peer.ID)
pm.peerMessages <- &getPeersMessage{resp}
return <-resp
}
// Connected is called to add a new peer to the pool, and send it an initial set
// of wants
func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
select {
case pm.peerMessages <- peerMessage{connect, connectParams{peer: p, initialEntries: initialEntries}, nil}:
case pm.peerMessages <- &connectPeerMessage{p, initialEntries}:
case <-pm.ctx.Done():
}
}
// Disconnected is called to remove a peer from the pool
func (pm *PeerManager) Disconnected(p peer.ID) {
select {
case pm.peerMessages <- peerMessage{disconnect, p, nil}:
case pm.peerMessages <- &disconnectPeerMessage{p}:
case <-pm.ctx.Done():
}
}
// SendMessage is called to send a message to all or some peers in the pool
// if targets is nil, it sends to all
func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
select {
case pm.peerMessages <- peerMessage{
sendMessage,
&sendMessageParams{entries: entries, targets: targets, from: from},
nil,
}:
case pm.peerMessages <- &sendPeerMessage{entries: entries, targets: targets, from: from}:
case <-pm.ctx.Done():
}
}
// Startup enables the run loop for the PeerManager - no processing will occur
// if startup is not called
func (pm *PeerManager) Startup() {
go pm.run()
}
// Shutdown shutsdown processing for the PeerManager
func (pm *PeerManager) Shutdown() {
pm.cancel()
}
// TODO: use goprocess here once i trust it
func (pm *PeerManager) run() {
// NOTE: Do not open any streams or connections from anywhere in this
// event loop. Really, just don't do anything likely to block.
for {
select {
case message := <-pm.peerMessages:
pm.handleMessage(message)
message.handle(pm)
case <-pm.ctx.Done():
return
}
}
}
func (pm *PeerManager) handleMessage(message peerMessage) {
type sendPeerMessage struct {
entries []*bsmsg.Entry
targets []peer.ID
from uint64
}
switch message.messageType {
case sendMessage:
ms := message.params.(*sendMessageParams)
if len(ms.targets) == 0 {
for _, p := range pm.peerQueues {
p.AddMessage(ms.entries, ms.from)
}
} else {
for _, t := range ms.targets {
p, ok := pm.peerQueues[t]
if !ok {
log.Infof("tried sending wantlist change to non-partner peer: %s", t)
continue
}
p.AddMessage(ms.entries, ms.from)
}
func (s *sendPeerMessage) handle(pm *PeerManager) {
pm.sendMessage(s)
}
type connectPeerMessage struct {
p peer.ID
initialEntries []*wantlist.Entry
}
func (c *connectPeerMessage) handle(pm *PeerManager) {
pm.startPeerHandler(c.p, c.initialEntries)
}
type disconnectPeerMessage struct {
p peer.ID
}
func (dc *disconnectPeerMessage) handle(pm *PeerManager) {
pm.stopPeerHandler(dc.p)
}
type getPeersMessage struct {
peerResp chan<- []peer.ID
}
func (gp *getPeersMessage) handle(pm *PeerManager) {
pm.getPeers(gp.peerResp)
}
func (pm *PeerManager) getPeers(peerResp chan<- []peer.ID) {
peers := make([]peer.ID, 0, len(pm.peerQueues))
for p := range pm.peerQueues {
peers = append(peers, p)
}
peerResp <- peers
}
func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue {
mq, ok := pm.peerQueues[p]
if ok {
mq.RefIncrement()
return nil
}
mq = pm.createPeerQueue(p)
pm.peerQueues[p] = mq
mq.Startup(pm.ctx, initialEntries)
return mq
}
func (pm *PeerManager) stopPeerHandler(p peer.ID) {
pq, ok := pm.peerQueues[p]
if !ok {
// TODO: log error?
return
}
if pq.RefDecrement() {
return
}
pq.Shutdown()
delete(pm.peerQueues, p)
}
func (pm *PeerManager) sendMessage(ms *sendPeerMessage) {
if len(ms.targets) == 0 {
for _, p := range pm.peerQueues {
p.AddMessage(ms.entries, ms.from)
}
case connect:
p := message.params.(connectParams)
pm.startPeerHandler(p.peer, p.initialEntries)
case disconnect:
disconnectPeer := message.params.(peer.ID)
pm.stopPeerHandler(disconnectPeer)
case getPeers:
peers := make([]peer.ID, 0, len(pm.peerQueues))
for p := range pm.peerQueues {
peers = append(peers, p)
} else {
for _, t := range ms.targets {
p, ok := pm.peerQueues[t]
if !ok {
log.Infof("tried sending wantlist change to non-partner peer: %s", t)
continue
}
p.AddMessage(ms.entries, ms.from)
}
message.resultsChan <- peers
}
}
......@@ -2,24 +2,30 @@ package peermanager
import (
"context"
"reflect"
"testing"
"time"
bsmsg "github.com/ipfs/go-bitswap/message"
wantlist "github.com/ipfs/go-bitswap/wantlist"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blocksutil"
"github.com/libp2p/go-libp2p-peer"
)
var blockGenerator = blocksutil.NewBlockGenerator()
var prioritySeq int
func generateCids(n int) []cid.Cid {
cids := make([]cid.Cid, 0, n)
func generateEntries(n int, isCancel bool) []*bsmsg.Entry {
bsmsgs := make([]*bsmsg.Entry, 0, n)
for i := 0; i < n; i++ {
c := blockGenerator.Next().Cid()
cids = append(cids, c)
prioritySeq++
msg := &bsmsg.Entry{
Entry: wantlist.NewRefEntry(blockGenerator.Next().Cid(), prioritySeq),
Cancel: isCancel,
}
bsmsgs = append(bsmsgs, msg)
}
return cids
return bsmsgs
}
var peerSeq int
......@@ -83,6 +89,32 @@ func makePeerQueueFactory(messagesSent chan messageSent) PeerQueueFactory {
}
}
func collectAndCheckMessages(
ctx context.Context,
t *testing.T,
messagesSent <-chan messageSent,
entries []*bsmsg.Entry,
ses uint64,
timeout time.Duration) []peer.ID {
var peersReceived []peer.ID
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for {
select {
case nextMessage := <-messagesSent:
if nextMessage.ses != ses {
t.Fatal("Message enqueued with wrong session")
}
if !reflect.DeepEqual(nextMessage.entries, entries) {
t.Fatal("Message enqueued with wrong wants")
}
peersReceived = append(peersReceived, nextMessage.p)
case <-timeoutCtx.Done():
return peersReceived
}
}
}
func TestAddingAndRemovingPeers(t *testing.T) {
ctx := context.Background()
peerQueueFactory := makePeerQueueFactory(nil)
......@@ -126,3 +158,65 @@ func TestAddingAndRemovingPeers(t *testing.T) {
t.Fatal("Peer was disconnected but should not have been")
}
}
func TestSendingMessagesToPeers(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan messageSent)
peerQueueFactory := makePeerQueueFactory(messagesSent)
tp := generatePeers(5)
peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
peerManager := New(ctx, peerQueueFactory)
peerManager.Startup()
peerManager.Connected(peer1, nil)
peerManager.Connected(peer2, nil)
peerManager.Connected(peer3, nil)
entries := generateEntries(5, false)
ses := generateSessionID()
peerManager.SendMessage(entries, nil, ses)
peersReceived := collectAndCheckMessages(
ctx, t, messagesSent, entries, ses, 200*time.Millisecond)
if len(peersReceived) != 3 {
t.Fatal("Incorrect number of peers received messages")
}
if !containsPeer(peersReceived, peer1) ||
!containsPeer(peersReceived, peer2) ||
!containsPeer(peersReceived, peer3) {
t.Fatal("Peers should have received message but did not")
}
if containsPeer(peersReceived, peer4) ||
containsPeer(peersReceived, peer5) {
t.Fatal("Peers received message but should not have")
}
var peersToSendTo []peer.ID
peersToSendTo = append(peersToSendTo, peer1, peer3, peer4)
peerManager.SendMessage(entries, peersToSendTo, ses)
peersReceived = collectAndCheckMessages(
ctx, t, messagesSent, entries, ses, 200*time.Millisecond)
if len(peersReceived) != 2 {
t.Fatal("Incorrect number of peers received messages")
}
if !containsPeer(peersReceived, peer1) ||
!containsPeer(peersReceived, peer3) {
t.Fatal("Peers should have received message but did not")
}
if containsPeer(peersReceived, peer2) ||
containsPeer(peersReceived, peer5) {
t.Fatal("Peers received message but should not have")
}
if containsPeer(peersReceived, peer4) {
t.Fatal("Peers targeted received message but was not connected")
}
}
......@@ -26,20 +26,8 @@ type WantSender interface {
SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64)
}
type wantMessageType int
const (
isWanted wantMessageType = iota + 1
addWants
currentWants
currentBroadcastWants
wantCount
)
type wantMessage struct {
messageType wantMessageType
params interface{}
resultsChan chan interface{}
type wantMessage interface {
handle(wm *WantManager)
}
// WantManager manages a global want list. It tracks two seperate want lists -
......@@ -49,7 +37,7 @@ type WantManager struct {
// channel requests to the run loop
// to get predictable behavior while running this in a go routine
// having only one channel is neccesary, so requests are processed serially
messageReqs chan wantMessage
wantMessages chan wantMessage
// synchronized by Run loop, only touch inside there
wl *wantlist.ThreadSafe
......@@ -62,13 +50,13 @@ type WantManager struct {
wantlistGauge metrics.Gauge
}
// New initializes a new WantManager
// New initializes a new WantManager for a given context
func New(ctx context.Context) *WantManager {
ctx, cancel := context.WithCancel(ctx)
wantlistGauge := metrics.NewCtx(ctx, "wantlist_total",
"Number of items in wantlist.").Gauge()
return &WantManager{
messageReqs: make(chan wantMessage, 10),
wantMessages: make(chan wantMessage, 10),
wl: wantlist.NewThreadSafe(),
bcwl: wantlist.NewThreadSafe(),
ctx: ctx,
......@@ -93,34 +81,40 @@ func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []pe
wm.addEntries(context.Background(), ks, peers, true, ses)
}
type wantSet struct {
entries []*bsmsg.Entry
targets []peer.ID
from uint64
// IsWanted returns whether a CID is currently wanted
func (wm *WantManager) IsWanted(c cid.Cid) bool {
resp := make(chan bool)
wm.wantMessages <- &isWantedMessage{c, resp}
return <-resp
}
func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []peer.ID, cancel bool, ses uint64) {
entries := make([]*bsmsg.Entry, 0, len(ks))
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
Cancel: cancel,
Entry: wantlist.NewRefEntry(k, maxPriority-i),
})
}
select {
case wm.messageReqs <- wantMessage{
messageType: addWants,
params: &wantSet{entries: entries, targets: targets, from: ses},
}:
case <-wm.ctx.Done():
case <-ctx.Done():
}
// CurrentWants returns the list of current wants
func (wm *WantManager) CurrentWants() []*wantlist.Entry {
resp := make(chan []*wantlist.Entry)
wm.wantMessages <- &currentWantsMessage{resp}
return <-resp
}
// CurrentBroadcastWants returns the current list of wants that are broadcasts
func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry {
resp := make(chan []*wantlist.Entry)
wm.wantMessages <- &currentBroadcastWantsMessage{resp}
return <-resp
}
// WantCount returns the total count of wants
func (wm *WantManager) WantCount() int {
resp := make(chan int)
wm.wantMessages <- &wantCountMessage{resp}
return <-resp
}
// Startup starts processing for the WantManager
func (wm *WantManager) Startup() {
go wm.run()
}
// Shutdown ends processing for the want manager
func (wm *WantManager) Shutdown() {
wm.cancel()
}
......@@ -130,93 +124,93 @@ func (wm *WantManager) run() {
// event loop. Really, just don't do anything likely to block.
for {
select {
case message := <-wm.messageReqs:
wm.handleMessage(message)
case message := <-wm.wantMessages:
message.handle(wm)
case <-wm.ctx.Done():
return
}
}
}
func (wm *WantManager) handleMessage(message wantMessage) {
switch message.messageType {
case addWants:
ws := message.params.(*wantSet)
// is this a broadcast or not?
brdc := len(ws.targets) == 0
// add changes to our wantlist
for _, e := range ws.entries {
if e.Cancel {
if brdc {
wm.bcwl.Remove(e.Cid, ws.from)
}
if wm.wl.Remove(e.Cid, ws.from) {
wm.wantlistGauge.Dec()
}
} else {
if brdc {
wm.bcwl.AddEntry(e.Entry, ws.from)
}
if wm.wl.AddEntry(e.Entry, ws.from) {
wm.wantlistGauge.Inc()
}
func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []peer.ID, cancel bool, ses uint64) {
entries := make([]*bsmsg.Entry, 0, len(ks))
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
Cancel: cancel,
Entry: wantlist.NewRefEntry(k, maxPriority-i),
})
}
select {
case wm.wantMessages <- &wantSet{entries: entries, targets: targets, from: ses}:
case <-wm.ctx.Done():
case <-ctx.Done():
}
}
type wantSet struct {
entries []*bsmsg.Entry
targets []peer.ID
from uint64
}
func (ws *wantSet) handle(wm *WantManager) {
// is this a broadcast or not?
brdc := len(ws.targets) == 0
// add changes to our wantlist
for _, e := range ws.entries {
if e.Cancel {
if brdc {
wm.bcwl.Remove(e.Cid, ws.from)
}
}
// broadcast those wantlist changes
wm.wantSender.SendMessage(ws.entries, ws.targets, ws.from)
case isWanted:
c := message.params.(cid.Cid)
_, isWanted := wm.wl.Contains(c)
message.resultsChan <- isWanted
case currentWants:
message.resultsChan <- wm.wl.Entries()
case currentBroadcastWants:
message.resultsChan <- wm.bcwl.Entries()
case wantCount:
message.resultsChan <- wm.wl.Len()
if wm.wl.Remove(e.Cid, ws.from) {
wm.wantlistGauge.Dec()
}
} else {
if brdc {
wm.bcwl.AddEntry(e.Entry, ws.from)
}
if wm.wl.AddEntry(e.Entry, ws.from) {
wm.wantlistGauge.Inc()
}
}
}
// broadcast those wantlist changes
wm.wantSender.SendMessage(ws.entries, ws.targets, ws.from)
}
func (wm *WantManager) IsWanted(c cid.Cid) bool {
resp := make(chan interface{})
wm.messageReqs <- wantMessage{
messageType: isWanted,
params: c,
resultsChan: resp,
}
result := <-resp
return result.(bool)
type isWantedMessage struct {
c cid.Cid
resp chan<- bool
}
func (wm *WantManager) CurrentWants() []*wantlist.Entry {
resp := make(chan interface{})
wm.messageReqs <- wantMessage{
messageType: currentWants,
resultsChan: resp,
}
result := <-resp
return result.([]*wantlist.Entry)
func (iwm *isWantedMessage) handle(wm *WantManager) {
_, isWanted := wm.wl.Contains(iwm.c)
iwm.resp <- isWanted
}
func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry {
resp := make(chan interface{})
wm.messageReqs <- wantMessage{
messageType: currentBroadcastWants,
resultsChan: resp,
}
result := <-resp
return result.([]*wantlist.Entry)
type currentWantsMessage struct {
resp chan<- []*wantlist.Entry
}
func (wm *WantManager) WantCount() int {
resp := make(chan interface{})
wm.messageReqs <- wantMessage{
messageType: wantCount,
resultsChan: resp,
}
result := <-resp
return result.(int)
func (cwm *currentWantsMessage) handle(wm *WantManager) {
cwm.resp <- wm.wl.Entries()
}
type currentBroadcastWantsMessage struct {
resp chan<- []*wantlist.Entry
}
func (cbcwm *currentBroadcastWantsMessage) handle(wm *WantManager) {
cbcwm.resp <- wm.bcwl.Entries()
}
type wantCountMessage struct {
resp chan<- int
}
func (wcm *wantCountMessage) handle(wm *WantManager) {
wcm.resp <- wm.wl.Len()
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment