Commit 92414049 authored by hannahhoward's avatar hannahhoward

feat(peermanager): create peer manager

Create a peer manager to manage message queues for peers
parent 239b5d8a
package peermanager
import (
"context"
"sync"
"time"
gsmsg "github.com/ipfs/go-graphsync/message"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
)
const (
defaultCleanupInterval = time.Minute
)
var log = logging.Logger("graphsync")
var (
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)
// PeerQueue provides a queer of messages to be sent for a single peer.
type PeerQueue interface {
AddRequest(id gsmsg.GraphSyncRequestID,
selector []byte,
priority gsmsg.GraphSyncPriority)
Cancel(id gsmsg.GraphSyncRequestID)
Startup()
Shutdown()
}
// PeerQueueFactory provides a function that will create a PeerQueue.
type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue
type peerQueueInstance struct {
refcnt int
pq PeerQueue
}
// PeerManager manages a pool of peers and sends messages to peers in the pool.
type PeerManager struct {
peerQueues map[peer.ID]*peerQueueInstance
peerQueuesLk sync.RWMutex
createPeerQueue PeerQueueFactory
ctx context.Context
}
// New creates a new PeerManager, given a context and a peerQueueFactory.
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
return &PeerManager{
peerQueues: make(map[peer.ID]*peerQueueInstance),
createPeerQueue: createPeerQueue,
ctx: ctx,
}
}
// ConnectedPeers returns a list of peers this PeerManager is managing.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
pm.peerQueuesLk.RLock()
defer pm.peerQueuesLk.RUnlock()
peers := make([]peer.ID, 0, len(pm.peerQueues))
for p := range pm.peerQueues {
peers = append(peers, p)
}
return peers
}
// Connected is called to add a new peer to the pool
func (pm *PeerManager) Connected(p peer.ID) {
pm.peerQueuesLk.Lock()
pq := pm.getOrCreate(p)
pq.refcnt++
pm.peerQueuesLk.Unlock()
}
// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
pm.peerQueuesLk.Lock()
pq, ok := pm.peerQueues[p]
if !ok {
pm.peerQueuesLk.Unlock()
return
}
pq.refcnt--
if pq.refcnt > 0 {
pm.peerQueuesLk.Unlock()
return
}
delete(pm.peerQueues, p)
pm.peerQueuesLk.Unlock()
pq.pq.Shutdown()
}
// SendRequest sends the given request to the given peer.
func (pm *PeerManager) SendRequest(
p peer.ID,
id gsmsg.GraphSyncRequestID,
selector []byte,
priority gsmsg.GraphSyncPriority) {
pm.peerQueuesLk.Lock()
pqi := pm.getOrCreate(p)
pm.peerQueuesLk.Unlock()
pqi.pq.AddRequest(id, selector, priority)
}
// CancelRequest cancels the given request id on the given peer.
func (pm *PeerManager) CancelRequest(
p peer.ID,
id gsmsg.GraphSyncRequestID) {
pm.peerQueuesLk.Lock()
pqi := pm.getOrCreate(p)
pm.peerQueuesLk.Unlock()
pqi.pq.Cancel(id)
}
func (pm *PeerManager) getOrCreate(p peer.ID) *peerQueueInstance {
pqi, ok := pm.peerQueues[p]
if !ok {
pq := pm.createPeerQueue(pm.ctx, p)
pq.Startup()
pqi = &peerQueueInstance{0, pq}
pm.peerQueues[p] = pqi
}
return pqi
}
package peermanager
import (
"context"
"math/rand"
"reflect"
"testing"
"time"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/testutil"
"github.com/libp2p/go-libp2p-peer"
)
type messageSent struct {
p peer.ID
message gsmsg.GraphSyncMessage
}
type fakePeer struct {
p peer.ID
messagesSent chan messageSent
}
func (fp *fakePeer) Startup() {}
func (fp *fakePeer) Shutdown() {}
func (fp *fakePeer) AddRequest(id gsmsg.GraphSyncRequestID,
selector []byte,
priority gsmsg.GraphSyncPriority) {
message := gsmsg.New()
message.AddRequest(id, selector, priority)
fp.messagesSent <- messageSent{fp.p, message}
}
func (fp *fakePeer) Cancel(id gsmsg.GraphSyncRequestID) {
message := gsmsg.New()
message.Cancel(id)
fp.messagesSent <- messageSent{fp.p, message}
}
func makePeerQueueFactory(messagesSent chan messageSent) PeerQueueFactory {
return func(ctx context.Context, p peer.ID) PeerQueue {
return &fakePeer{
p: p,
messagesSent: messagesSent,
}
}
}
func TestAddingAndRemovingPeers(t *testing.T) {
ctx := context.Background()
peerQueueFactory := makePeerQueueFactory(nil)
tp := testutil.GeneratePeers(5)
peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
peerManager := New(ctx, peerQueueFactory)
peerManager.Connected(peer1)
peerManager.Connected(peer2)
peerManager.Connected(peer3)
connectedPeers := peerManager.ConnectedPeers()
if !testutil.ContainsPeer(connectedPeers, peer1) ||
!testutil.ContainsPeer(connectedPeers, peer2) ||
!testutil.ContainsPeer(connectedPeers, peer3) {
t.Fatal("Peers not connected that should be connected")
}
if testutil.ContainsPeer(connectedPeers, peer4) ||
testutil.ContainsPeer(connectedPeers, peer5) {
t.Fatal("Peers connected that shouldn't be connected")
}
// removing a peer with only one reference
peerManager.Disconnected(peer1)
connectedPeers = peerManager.ConnectedPeers()
if testutil.ContainsPeer(connectedPeers, peer1) {
t.Fatal("Peer should have been disconnected but was not")
}
// connecting a peer twice, then disconnecting once, should stay in queue
peerManager.Connected(peer2)
peerManager.Disconnected(peer2)
connectedPeers = peerManager.ConnectedPeers()
if !testutil.ContainsPeer(connectedPeers, peer2) {
t.Fatal("Peer was disconnected but should not have been")
}
}
func TestSendingMessagesToPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
messagesSent := make(chan messageSent, 5)
peerQueueFactory := makePeerQueueFactory(messagesSent)
tp := testutil.GeneratePeers(5)
id := gsmsg.GraphSyncRequestID(rand.Int31())
priority := gsmsg.GraphSyncPriority(rand.Int31())
selector := testutil.RandomBytes(100)
peerManager := New(ctx, peerQueueFactory)
peerManager.SendRequest(tp[0], id, selector, priority)
peerManager.SendRequest(tp[1], id, selector, priority)
peerManager.CancelRequest(tp[0], id)
select {
case <-ctx.Done():
t.Fatal("did not send first message")
case firstMessage := <-messagesSent:
if firstMessage.p != tp[0] {
t.Fatal("First message sent to wrong peer")
}
request := firstMessage.message.Requests()[0]
if request.ID() != id ||
request.IsCancel() != false ||
request.Priority() != priority ||
!reflect.DeepEqual(request.Selector(), selector) {
t.Fatal("did not send correct first message")
}
}
select {
case <-ctx.Done():
t.Fatal("did not send second message")
case secondMessage := <-messagesSent:
if secondMessage.p != tp[1] {
t.Fatal("Second message sent to wrong peer")
}
request := secondMessage.message.Requests()[0]
if request.ID() != id ||
request.IsCancel() != false ||
request.Priority() != priority ||
!reflect.DeepEqual(request.Selector(), selector) {
t.Fatal("did not send correct second message")
}
}
select {
case <-ctx.Done():
t.Fatal("did not send third message")
case thirdMessage := <-messagesSent:
if thirdMessage.p != tp[0] {
t.Fatal("Third message sent to wrong peer")
}
request := thirdMessage.message.Requests()[0]
if request.ID() != id ||
request.IsCancel() != true {
t.Fatal("third message was not a cancel")
}
}
connectedPeers := peerManager.ConnectedPeers()
if len(connectedPeers) != 2 ||
!testutil.ContainsPeer(connectedPeers, tp[0]) ||
!testutil.ContainsPeer(connectedPeers, tp[1]) {
t.Fatal("did not connect all peers that were sent messages")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment