Commit 4951001b authored by hannahhoward's avatar hannahhoward

feat(sessions): optimize peers

Order optimized peers by most recent to receive a block
parent ce22eba3
......@@ -3,18 +3,28 @@ package sessionpeermanager
import (
"context"
"fmt"
"math/rand"
cid "github.com/ipfs/go-cid"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
peer "github.com/libp2p/go-libp2p-peer"
)
const (
maxOptimizedPeers = 25
reservePeers = 2
)
// PeerNetwork is an interface for finding providers and managing connections
type PeerNetwork interface {
ConnectionManager() ifconnmgr.ConnManager
FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID
}
type peerMessage interface {
handle(spm *SessionPeerManager)
}
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
type SessionPeerManager struct {
......@@ -22,12 +32,12 @@ type SessionPeerManager struct {
network PeerNetwork
tag string
newPeers chan peer.ID
peerReqs chan chan []peer.ID
peerMessages chan peerMessage
// do not touch outside of run loop
activePeers map[peer.ID]struct{}
activePeersArr []peer.ID
activePeers map[peer.ID]bool
unoptimizedPeersArr []peer.ID
optimizedPeersArr []peer.ID
}
// New creates a new SessionPeerManager
......@@ -35,9 +45,8 @@ func New(ctx context.Context, id uint64, network PeerNetwork) *SessionPeerManage
spm := &SessionPeerManager{
ctx: ctx,
network: network,
newPeers: make(chan peer.ID, 16),
peerReqs: make(chan chan []peer.ID),
activePeers: make(map[peer.ID]struct{}),
peerMessages: make(chan peerMessage, 16),
activePeers: make(map[peer.ID]bool),
}
spm.tag = fmt.Sprint("bs-ses-", id)
......@@ -53,7 +62,7 @@ func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
// at the moment, we're just adding peers here
// in the future, we'll actually use this to record metrics
select {
case spm.newPeers <- p:
case spm.peerMessages <- &peerResponseMessage{p}:
case <-spm.ctx.Done():
}
}
......@@ -70,7 +79,7 @@ func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
// ordered by optimization, or only a subset
resp := make(chan []peer.ID)
select {
case spm.peerReqs <- resp:
case spm.peerMessages <- &peerReqMessage{resp}:
case <-spm.ctx.Done():
return nil
}
......@@ -93,7 +102,7 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
// - ensure two 'findprovs' calls for the same block don't run concurrently
// - share peers between sessions based on interest set
for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
spm.newPeers <- p
spm.peerMessages <- &peerFoundMessage{p}
}
}(c)
}
......@@ -101,29 +110,100 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
func (spm *SessionPeerManager) run(ctx context.Context) {
for {
select {
case p := <-spm.newPeers:
spm.addActivePeer(p)
case resp := <-spm.peerReqs:
resp <- spm.activePeersArr
case pm := <-spm.peerMessages:
pm.handle(spm)
case <-ctx.Done():
spm.handleShutdown()
return
}
}
}
func (spm *SessionPeerManager) addActivePeer(p peer.ID) {
if _, ok := spm.activePeers[p]; !ok {
spm.activePeers[p] = struct{}{}
spm.activePeersArr = append(spm.activePeersArr, p)
func (spm *SessionPeerManager) tagPeer(p peer.ID) {
cmgr := spm.network.ConnectionManager()
cmgr.TagPeer(p, spm.tag, 10)
}
func (spm *SessionPeerManager) insertOptimizedPeer(p peer.ID) {
if len(spm.optimizedPeersArr) >= (maxOptimizedPeers - reservePeers) {
tailPeer := spm.optimizedPeersArr[len(spm.optimizedPeersArr)-1]
spm.optimizedPeersArr = spm.optimizedPeersArr[:len(spm.optimizedPeersArr)-1]
spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, tailPeer)
}
spm.optimizedPeersArr = append([]peer.ID{p}, spm.optimizedPeersArr...)
}
type peerFoundMessage struct {
p peer.ID
}
func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
p := pfm.p
if _, ok := spm.activePeers[p]; !ok {
spm.activePeers[p] = false
spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
spm.tagPeer(p)
}
}
type peerResponseMessage struct {
p peer.ID
}
func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
p := prm.p
isOptimized, ok := spm.activePeers[p]
if !ok {
spm.activePeers[p] = true
spm.tagPeer(p)
} else {
if isOptimized {
if spm.optimizedPeersArr[0] == p {
return
}
for i := 0; i < len(spm.optimizedPeersArr); i++ {
if spm.optimizedPeersArr[i] == p {
spm.optimizedPeersArr = append(spm.optimizedPeersArr[:i], spm.optimizedPeersArr[i+1:]...)
break
}
}
} else {
spm.activePeers[p] = true
for i := 0; i < len(spm.unoptimizedPeersArr); i++ {
if spm.unoptimizedPeersArr[i] == p {
spm.unoptimizedPeersArr[i] = spm.unoptimizedPeersArr[len(spm.unoptimizedPeersArr)-1]
spm.unoptimizedPeersArr = spm.unoptimizedPeersArr[:len(spm.unoptimizedPeersArr)-1]
break
}
}
}
}
spm.insertOptimizedPeer(p)
}
type peerReqMessage struct {
resp chan<- []peer.ID
}
func (prm *peerReqMessage) handle(spm *SessionPeerManager) {
randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
if maxPeers > maxOptimizedPeers {
maxPeers = maxOptimizedPeers
}
extraPeers := make([]peer.ID, maxPeers-len(spm.optimizedPeersArr))
for i := range extraPeers {
extraPeers[i] = spm.unoptimizedPeersArr[randomOrder[i]]
}
prm.resp <- append(spm.optimizedPeersArr, extraPeers...)
}
func (spm *SessionPeerManager) handleShutdown() {
cmgr := spm.network.ConnectionManager()
for _, p := range spm.activePeersArr {
for p := range spm.activePeers {
cmgr.UntagPeer(p, spm.tag)
}
}
......@@ -3,6 +3,7 @@ package sessionpeermanager
import (
"context"
"sync"
"math/rand"
"testing"
"time"
......@@ -120,6 +121,69 @@ func TestRecordingReceivedBlocks(t *testing.T) {
}
}
func TestOrderingPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
peers := testutil.GeneratePeers(100)
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{peers, fcm}
c := testutil.GenerateCids(1)
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpn)
// add all peers to session
sessionPeerManager.FindMorePeers(ctx, c[0])
// record broadcast
sessionPeerManager.RecordPeerRequests(nil, c)
// record receives
peer1 := peers[rand.Intn(100)]
peer2 := peers[rand.Intn(100)]
peer3 := peers[rand.Intn(100)]
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer1, c[0])
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer2, c[0])
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer3, c[0])
sessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(sessionPeers) != maxOptimizedPeers {
t.Fatal("Should not return more than the max of optimized peers")
}
// should prioritize peers which have received blocks
if (sessionPeers[0] != peer3) || (sessionPeers[1] != peer2) || (sessionPeers[2] != peer1) {
t.Fatal("Did not prioritize peers that received blocks")
}
// Receive a second time from same node
sessionPeerManager.RecordPeerResponse(peer3, c[0])
// call again
nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(nextSessionPeers) != maxOptimizedPeers {
t.Fatal("Should not return more than the max of optimized peers")
}
// should not duplicate
if (nextSessionPeers[0] != peer3) || (nextSessionPeers[1] != peer2) || (nextSessionPeers[2] != peer1) {
t.Fatal("Did dedup peers which received multiple blocks")
}
// should randomize other peers
totalSame := 0
for i := 3; i < maxOptimizedPeers; i++ {
if sessionPeers[i] == nextSessionPeers[i] {
totalSame++
}
}
if totalSame >= maxOptimizedPeers-3 {
t.Fatal("should not return the same random peers each time")
}
}
func TestUntaggingPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment