Commit 32d0c188 authored by hannahhoward

fix(wantlist): remove races on setup

fix race conditions while setting up wantlists by creating peer queues on demand

BREAKING CHANGE: PeerManager SendMessage signature changed; it now takes the current broadcast wantlist as a leading initialEntries parameter

fix #51
parent 722239f1
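
For integrators, the breaking change means every SendMessage call site gains a leading initialEntries argument: the current broadcast wantlist is now passed with each send so that peer queues created on demand can be seeded with it. A minimal caller-side sketch, reusing the import paths and the PeerHandler interface from the diff below; the broadcast helper is illustrative only, not part of this commit:

```go
package example

import (
	bsmsg "github.com/ipfs/go-bitswap/message"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	peer "github.com/libp2p/go-libp2p-peer"
)

// PeerHandler mirrors the interface the WantManager expects after this commit.
type PeerHandler interface {
	Disconnected(p peer.ID)
	Connected(p peer.ID, initialEntries []*wantlist.Entry)
	SendMessage(initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, targets []peer.ID, from uint64)
}

// broadcast is a hypothetical caller showing how an existing call adapts.
func broadcast(ph PeerHandler, bcwl []*wantlist.Entry, changes []*bsmsg.Entry, session uint64) {
	// before this commit: ph.SendMessage(changes, nil, session)
	ph.SendMessage(bcwl, changes, nil, session)
}
```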
@@ -132,7 +132,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}
bs.wm.SetDelegate(bs.pm)
bs.pm.Startup()
bs.wm.Startup()
bs.pqm.Startup()
network.SetDelegate(bs)
@@ -361,14 +360,13 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
// Connected/Disconnected warns bitswap about peer connections.
func (bs *Bitswap) PeerConnected(p peer.ID) {
initialWants := bs.wm.CurrentBroadcastWants()
bs.pm.Connected(p, initialWants)
bs.wm.Connected(p)
bs.engine.PeerConnected(p)
}
// Connected/Disconnected warns bitswap about peer connections.
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.pm.Disconnected(p)
bs.wm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}
......
@@ -2,6 +2,7 @@ package peermanager
import (
"context"
"sync"
bsmsg "github.com/ipfs/go-bitswap/message"
wantlist "github.com/ipfs/go-bitswap/wantlist"
@@ -34,150 +35,56 @@ type peerMessage interface {
// PeerManager manages a pool of peers and sends messages to peers in the pool.
type PeerManager struct {
// sync channel for Run loop
peerMessages chan peerMessage
// synchronized by Run loop, only touch inside there
peerQueues map[peer.ID]PeerQueue
peerQueues map[peer.ID]PeerQueue
lk sync.RWMutex
createPeerQueue PeerQueueFactory
ctx context.Context
cancel func()
}
// New creates a new PeerManager, given a context and a peerQueueFactory.
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
ctx, cancel := context.WithCancel(ctx)
return &PeerManager{
peerMessages: make(chan peerMessage, 10),
peerQueues: make(map[peer.ID]PeerQueue),
createPeerQueue: createPeerQueue,
ctx: ctx,
cancel: cancel,
}
}
// ConnectedPeers returns a list of peers this PeerManager is managing.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
resp := make(chan []peer.ID, 1)
select {
case pm.peerMessages <- &getPeersMessage{resp}:
case <-pm.ctx.Done():
return nil
}
select {
case peers := <-resp:
return peers
case <-pm.ctx.Done():
return nil
}
}
// Connected is called to add a new peer to the pool, and send it an initial set
// of wants.
func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
select {
case pm.peerMessages <- &connectPeerMessage{p, initialEntries}:
case <-pm.ctx.Done():
}
}
// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
select {
case pm.peerMessages <- &disconnectPeerMessage{p}:
case <-pm.ctx.Done():
}
}
// SendMessage is called to send a message to all or some peers in the pool;
// if targets is nil, it sends to all.
func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
select {
case pm.peerMessages <- &sendPeerMessage{entries: entries, targets: targets, from: from}:
case <-pm.ctx.Done():
}
}
// Startup enables the run loop for the PeerManager - no processing will occur
// if startup is not called.
func (pm *PeerManager) Startup() {
go pm.run()
}
// Shutdown shuts down processing for the PeerManager.
func (pm *PeerManager) Shutdown() {
pm.cancel()
}
func (pm *PeerManager) run() {
for {
select {
case message := <-pm.peerMessages:
message.handle(pm)
case <-pm.ctx.Done():
return
}
}
}
pm.lk.RLock()
defer pm.lk.RUnlock()
type sendPeerMessage struct {
entries []*bsmsg.Entry
targets []peer.ID
from uint64
}
func (s *sendPeerMessage) handle(pm *PeerManager) {
pm.sendMessage(s)
}
type connectPeerMessage struct {
p peer.ID
initialEntries []*wantlist.Entry
}
func (c *connectPeerMessage) handle(pm *PeerManager) {
pm.startPeerHandler(c.p, c.initialEntries)
}
type disconnectPeerMessage struct {
p peer.ID
}
func (dc *disconnectPeerMessage) handle(pm *PeerManager) {
pm.stopPeerHandler(dc.p)
}
type getPeersMessage struct {
peerResp chan<- []peer.ID
}
func (gp *getPeersMessage) handle(pm *PeerManager) {
pm.getPeers(gp.peerResp)
}
func (pm *PeerManager) getPeers(peerResp chan<- []peer.ID) {
peers := make([]peer.ID, 0, len(pm.peerQueues))
for p := range pm.peerQueues {
peers = append(peers, p)
}
peerResp <- peers
return peers
}
func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue {
// Connected is called to add a new peer to the pool, and send it an initial set
// of wants.
func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
pm.lk.Lock()
defer pm.lk.Unlock()
mq, ok := pm.peerQueues[p]
if ok {
mq.RefIncrement()
return nil
return
}
mq = pm.createPeerQueue(p)
pm.peerQueues[p] = mq
mq.Startup(pm.ctx, initialEntries)
return mq
}
func (pm *PeerManager) stopPeerHandler(p peer.ID) {
// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
pm.lk.Lock()
defer pm.lk.Unlock()
pq, ok := pm.peerQueues[p]
if !ok {
// TODO: log error?
@@ -192,19 +99,28 @@ func (pm *PeerManager) stopPeerHandler(p peer.ID) {
delete(pm.peerQueues, p)
}
func (pm *PeerManager) sendMessage(ms *sendPeerMessage) {
if len(ms.targets) == 0 {
// SendMessage is called to send a message to all or some peers in the pool;
// if targets is nil, it sends to all.
func (pm *PeerManager) SendMessage(initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
pm.lk.Lock()
defer pm.lk.Unlock()
if len(targets) == 0 {
for _, p := range pm.peerQueues {
p.AddMessage(ms.entries, ms.from)
p.AddMessage(entries, from)
}
} else {
for _, t := range ms.targets {
for _, t := range targets {
p, ok := pm.peerQueues[t]
if !ok {
log.Infof("tried sending wantlist change to non-partner peer: %s", t)
continue
p = pm.createPeerQueue(t)
pm.peerQueues[t] = p
p.Startup(pm.ctx, initialEntries)
// this is a "0 reference" queue because we haven't actually connected to it
// sending the first message will cause it to connect
p.RefDecrement()
}
p.AddMessage(ms.entries, ms.from)
p.AddMessage(entries, from)
}
}
}
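
The "0 reference" comment above is the core of the race fix: instead of logging and dropping a send to a peer whose connect event has not arrived yet, SendMessage now creates the queue itself, under the same mutex that Connected and Disconnected take. A condensed sketch of that get-or-create pattern; pool, stubQueue, and getOrCreate are stand-in names and not part of this commit:

```go
package example

import (
	"sync"

	peer "github.com/libp2p/go-libp2p-peer"
)

// stubQueue stands in for PeerQueue; refs counts outstanding Connected calls.
type stubQueue struct {
	refs int
}

// pool is a minimal stand-in for the PeerManager's peerQueues map plus its lock.
type pool struct {
	lk     sync.Mutex
	queues map[peer.ID]*stubQueue
}

// getOrCreate returns the peer's queue, creating it on a miss. Because lookup
// and creation happen under one lock, a concurrent send can never observe
// "no queue yet" and silently drop a wantlist change.
func (po *pool) getOrCreate(p peer.ID, startRefs int) *stubQueue {
	po.lk.Lock()
	defer po.lk.Unlock()
	if q, ok := po.queues[p]; ok {
		q.refs += startRefs
		return q
	}
	q := &stubQueue{refs: startRefs}
	po.queues[p] = q
	return q
}
```

In these terms, Connected would call getOrCreate(p, 1), Disconnected would decrement and delete the entry at zero, and SendMessage to an unknown target would call getOrCreate(t, 0), which is the zero-reference queue described in the comment above.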
@@ -79,7 +79,6 @@ func TestAddingAndRemovingPeers(t *testing.T) {
tp := testutil.GeneratePeers(5)
peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
peerManager := New(ctx, peerQueueFactory)
peerManager.Startup()
peerManager.Connected(peer1, nil)
peerManager.Connected(peer2, nil)
@@ -118,14 +117,13 @@ func TestAddingAndRemovingPeers(t *testing.T) {
func TestSendingMessagesToPeers(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan messageSent)
messagesSent := make(chan messageSent, 16)
peerQueueFactory := makePeerQueueFactory(messagesSent)
tp := testutil.GeneratePeers(5)
peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
peerManager := New(ctx, peerQueueFactory)
peerManager.Startup()
peerManager.Connected(peer1, nil)
peerManager.Connected(peer2, nil)
@@ -134,7 +132,7 @@ func TestSendingMessagesToPeers(t *testing.T) {
entries := testutil.GenerateMessageEntries(5, false)
ses := testutil.GenerateSessionID()
peerManager.SendMessage(entries, nil, ses)
peerManager.SendMessage(nil, entries, nil, ses)
peersReceived := collectAndCheckMessages(
ctx, t, messagesSent, entries, ses, 10*time.Millisecond)
@@ -155,11 +153,11 @@ func TestSendingMessagesToPeers(t *testing.T) {
var peersToSendTo []peer.ID
peersToSendTo = append(peersToSendTo, peer1, peer3, peer4)
peerManager.SendMessage(entries, peersToSendTo, ses)
peerManager.SendMessage(nil, entries, peersToSendTo, ses)
peersReceived = collectAndCheckMessages(
ctx, t, messagesSent, entries, ses, 10*time.Millisecond)
if len(peersReceived) != 2 {
if len(peersReceived) != 3 {
t.Fatal("Incorrect number of peers received messages")
}
@@ -173,7 +171,7 @@ func TestSendingMessagesToPeers(t *testing.T) {
t.Fatal("Peers received message but should not have")
}
if testutil.ContainsPeer(peersReceived, peer4) {
t.Fatal("Peers targeted received message but was not connected")
if !testutil.ContainsPeer(peersReceived, peer4) {
t.Fatal("Peer should have autoconnected on message send")
}
}
@@ -20,10 +20,12 @@ const (
maxPriority = math.MaxInt32
)
// WantSender sends changes out to the network as they get added to the wantlist
// PeerHandler sends changes out to the network as they get added to the wantlist
// managed by the WantManager.
type WantSender interface {
SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64)
type PeerHandler interface {
Disconnected(p peer.ID)
Connected(p peer.ID, initialEntries []*wantlist.Entry)
SendMessage(initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, targets []peer.ID, from uint64)
}
type wantMessage interface {
@@ -46,7 +48,7 @@ type WantManager struct {
ctx context.Context
cancel func()
wantSender WantSender
peerHandler PeerHandler
wantlistGauge metrics.Gauge
}
@@ -66,8 +68,8 @@ func New(ctx context.Context) *WantManager {
}
// SetDelegate specifies who will send want changes out to the internet.
func (wm *WantManager) SetDelegate(wantSender WantSender) {
wm.wantSender = wantSender
func (wm *WantManager) SetDelegate(peerHandler PeerHandler) {
wm.peerHandler = peerHandler
}
// WantBlocks adds the given cids to the wantlist, tracked by the given session.
@@ -145,6 +147,22 @@ func (wm *WantManager) WantCount() int {
}
}
// Connected is called when a new peer is connected
func (wm *WantManager) Connected(p peer.ID) {
select {
case wm.wantMessages <- &connectedMessage{p}:
case <-wm.ctx.Done():
}
}
// Disconnected is called when a peer is disconnected
func (wm *WantManager) Disconnected(p peer.ID) {
select {
case wm.wantMessages <- &disconnectedMessage{p}:
case <-wm.ctx.Done():
}
}
// Startup starts processing for the WantManager.
func (wm *WantManager) Startup() {
go wm.run()
@@ -214,7 +232,7 @@ func (ws *wantSet) handle(wm *WantManager) {
}
// broadcast those wantlist changes
wm.wantSender.SendMessage(ws.entries, ws.targets, ws.from)
wm.peerHandler.SendMessage(wm.bcwl.Entries(), ws.entries, ws.targets, ws.from)
}
type isWantedMessage struct {
@@ -250,3 +268,19 @@ type wantCountMessage struct {
func (wcm *wantCountMessage) handle(wm *WantManager) {
wcm.resp <- wm.wl.Len()
}
type connectedMessage struct {
p peer.ID
}
func (cm *connectedMessage) handle(wm *WantManager) {
wm.peerHandler.Connected(cm.p, wm.bcwl.Entries())
}
type disconnectedMessage struct {
p peer.ID
}
func (dm *disconnectedMessage) handle(wm *WantManager) {
wm.peerHandler.Disconnected(dm.p)
}
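
connectedMessage and disconnectedMessage follow the WantManager's existing actor pattern: external callers only enqueue onto wantMessages, and the single run loop is the only goroutine that reads the broadcast wantlist (bcwl) before handing it to the PeerHandler. A small self-contained sketch of that pattern, with illustrative names (miniManager, connectedMsg) that are not from this commit:

```go
package example

import (
	"context"

	peer "github.com/libp2p/go-libp2p-peer"
)

type miniMessage interface {
	handle(m *miniManager)
}

type miniManager struct {
	msgs      chan miniMessage
	broadcast []string // stand-in for the broadcast wantlist (bcwl)
	onConnect func(p peer.ID, broadcast []string)
}

// run is the only goroutine that touches miniManager state.
func (m *miniManager) run(ctx context.Context) {
	for {
		select {
		case msg := <-m.msgs:
			msg.handle(m)
		case <-ctx.Done():
			return
		}
	}
}

type connectedMsg struct{ p peer.ID }

func (c *connectedMsg) handle(m *miniManager) {
	// Safe: we are on the run loop, so reading the broadcast list cannot race
	// with concurrent wantlist updates handled by the same loop.
	m.onConnect(c.p, m.broadcast)
}

// Connected is the externally visible entry point; it only enqueues a message.
func (m *miniManager) Connected(ctx context.Context, p peer.ID) {
	select {
	case m.msgs <- &connectedMsg{p}:
	case <-ctx.Done():
	}
}
```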
@@ -7,35 +7,41 @@ import (
"testing"
"github.com/ipfs/go-bitswap/testutil"
wantlist "github.com/ipfs/go-bitswap/wantlist"
bsmsg "github.com/ipfs/go-bitswap/message"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-peer"
)
type fakeWantSender struct {
lk sync.RWMutex
lastWantSet wantSet
type fakePeerHandler struct {
lk sync.RWMutex
lastWantSet wantSet
initialEntries []*wantlist.Entry
}
func (fws *fakeWantSender) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
fws.lk.Lock()
fws.lastWantSet = wantSet{entries, targets, from}
fws.lk.Unlock()
func (fph *fakePeerHandler) SendMessage(initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
fph.lk.Lock()
fph.lastWantSet = wantSet{entries, targets, from}
fph.initialEntries = initialEntries
fph.lk.Unlock()
}
func (fws *fakeWantSender) getLastWantSet() wantSet {
fws.lk.Lock()
defer fws.lk.Unlock()
return fws.lastWantSet
func (fph *fakePeerHandler) Connected(p peer.ID, initialEntries []*wantlist.Entry) {}
func (fph *fakePeerHandler) Disconnected(p peer.ID) {}
func (fph *fakePeerHandler) getLastWantSet() wantSet {
fph.lk.Lock()
defer fph.lk.Unlock()
return fph.lastWantSet
}
func setupTestFixturesAndInitialWantList() (
context.Context, *fakeWantSender, *WantManager, []cid.Cid, []cid.Cid, []peer.ID, uint64, uint64) {
context.Context, *fakePeerHandler, *WantManager, []cid.Cid, []cid.Cid, []peer.ID, uint64, uint64) {
ctx := context.Background()
// setup fixtures
wantSender := &fakeWantSender{}
wantSender := &fakePeerHandler{}
wantManager := New(ctx)
keys := testutil.GenerateCids(10)
otherKeys := testutil.GenerateCids(5)
......