Commit 693085c9 authored by hannahhoward

refactor(WantManager): extract PeerManager

Separates the functions of tracking wants from tracking peers.
Adds unit tests for both the peer manager and the want manager.
Refactors the internals of both to address synchronization issues
discovered in tests.
parent 4f87322e
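The net effect in Bitswap's constructor: the WantManager now only tracks wants, the new PeerManager owns the per-peer message queues, and the two are connected with SetDelegate. A sketch of the new wiring, using the names from the diff below (the rest of the constructor is elided, so this is illustrative rather than a complete excerpt):

peerQueueFactory := func(p peer.ID) bspm.PeerQueue {
	return bsmq.New(p, network) // one outgoing message queue per peer
}
wm := bswm.New(ctx)                   // tracks the global want list
pm := bspm.New(ctx, peerQueueFactory) // tracks peers and their queues
wm.SetDelegate(pm) // want changes are pushed to the network via the PeerManager
pm.Startup()
wm.Startup()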
......@@ -11,8 +11,10 @@ import (
decision "github.com/ipfs/go-bitswap/decision"
bsmsg "github.com/ipfs/go-bitswap/message"
bsmq "github.com/ipfs/go-bitswap/messagequeue"
bsnet "github.com/ipfs/go-bitswap/network"
notifications "github.com/ipfs/go-bitswap/notifications"
bspm "github.com/ipfs/go-bitswap/peermanager"
bssm "github.com/ipfs/go-bitswap/sessionmanager"
bswm "github.com/ipfs/go-bitswap/wantmanager"
......@@ -85,12 +87,19 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
" data blocks recived").Histogram(metricsBuckets)
sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
" this bitswap").Histogram(metricsBuckets)
notif := notifications.New()
px := process.WithTeardown(func() error {
notif.Shutdown()
return nil
})
peerQueueFactory := func(p peer.ID) bspm.PeerQueue {
return bsmq.New(p, network)
}
bs := &Bitswap{
blockstore: bstore,
notifications: notif,
......@@ -100,14 +109,18 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: bswm.New(ctx, network),
wm: bswm.New(ctx),
pm: bspm.New(ctx, peerQueueFactory),
sm: bssm.New(),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
}
go bs.wm.Run()
bs.wm.SetDelegate(bs.pm)
bs.pm.Startup()
bs.wm.Startup()
network.SetDelegate(bs)
// Start up bitswap's async worker routines
......@@ -128,6 +141,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
type Bitswap struct {
// the peermanager manages sending messages to peers in a way that
// won't block bitswap operation
pm *bspm.PeerManager
// the wantlist tracks global wants for bitswap
wm *bswm.WantManager
// the engine is the bit of logic that decides who to send which blocks to
......@@ -160,8 +176,9 @@ type Bitswap struct {
counters *counters
// Metrics interface metrics
dupMetric metrics.Histogram
allMetric metrics.Histogram
dupMetric metrics.Histogram
allMetric metrics.Histogram
sentHistogram metrics.Histogram
// the sessionmanager manages tracking sessions
sm *bssm.SessionManager
......@@ -427,13 +444,14 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
// Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerConnected(p peer.ID) {
bs.wm.Connected(p)
initialWants := bs.wm.CurrentBroadcastWants()
bs.pm.Connected(p, initialWants)
bs.engine.PeerConnected(p)
}
// Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.wm.Disconnected(p)
bs.pm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}
......
......@@ -202,10 +202,10 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
nump := len(instances) - 1
// assert we're properly connected
for _, inst := range instances {
peers := inst.Exchange.wm.ConnectedPeers()
peers := inst.Exchange.pm.ConnectedPeers()
for i := 0; i < 10 && len(peers) != nump; i++ {
time.Sleep(time.Millisecond * 50)
peers = inst.Exchange.wm.ConnectedPeers()
peers = inst.Exchange.pm.ConnectedPeers()
}
if len(peers) != nump {
t.Fatal("not enough peers connected to instance")
......
package peermanager
import (
"context"
bsmsg "github.com/ipfs/go-bitswap/message"
wantlist "github.com/ipfs/go-bitswap/wantlist"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
)
var log = logging.Logger("bitswap")
var (
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)
type sendMessageParams struct {
entries []*bsmsg.Entry
targets []peer.ID
from uint64
}
type connectParams struct {
peer peer.ID
initialEntries []*wantlist.Entry
}
type peerMessageType int
const (
connect peerMessageType = iota + 1
disconnect
getPeers
sendMessage
)
type peerMessage struct {
messageType peerMessageType
params interface{}
resultsChan chan interface{}
}
type PeerQueue interface {
RefIncrement()
RefDecrement() bool
AddMessage(entries []*bsmsg.Entry, ses uint64)
Startup(ctx context.Context, initialEntries []*wantlist.Entry)
Shutdown()
}
type PeerQueueFactory func(p peer.ID) PeerQueue
type PeerManager struct {
// sync channel for Run loop
peerMessages chan peerMessage
// synchronized by Run loop, only touch inside there
peerQueues map[peer.ID]PeerQueue
createPeerQueue PeerQueueFactory
ctx context.Context
cancel func()
}
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
ctx, cancel := context.WithCancel(ctx)
return &PeerManager{
peerMessages: make(chan peerMessage, 10),
peerQueues: make(map[peer.ID]PeerQueue),
createPeerQueue: createPeerQueue,
ctx: ctx,
cancel: cancel,
}
}
func (pm *PeerManager) ConnectedPeers() []peer.ID {
resp := make(chan interface{})
pm.peerMessages <- peerMessage{getPeers, nil, resp}
peers := <-resp
return peers.([]peer.ID)
}
func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue {
mq, ok := pm.peerQueues[p]
if ok {
mq.RefIncrement()
return nil
}
mq = pm.createPeerQueue(p)
pm.peerQueues[p] = mq
mq.Startup(pm.ctx, initialEntries)
return mq
}
func (pm *PeerManager) stopPeerHandler(p peer.ID) {
pq, ok := pm.peerQueues[p]
if !ok {
// TODO: log error?
return
}
if pq.RefDecrement() {
return
}
pq.Shutdown()
delete(pm.peerQueues, p)
}
func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
select {
case pm.peerMessages <- peerMessage{connect, connectParams{peer: p, initialEntries: initialEntries}, nil}:
case <-pm.ctx.Done():
}
}
func (pm *PeerManager) Disconnected(p peer.ID) {
select {
case pm.peerMessages <- peerMessage{disconnect, p, nil}:
case <-pm.ctx.Done():
}
}
func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
select {
case pm.peerMessages <- peerMessage{
sendMessage,
&sendMessageParams{entries: entries, targets: targets, from: from},
nil,
}:
case <-pm.ctx.Done():
}
}
func (pm *PeerManager) Startup() {
go pm.run()
}
func (pm *PeerManager) Shutdown() {
pm.cancel()
}
// TODO: use goprocess here once I trust it
func (pm *PeerManager) run() {
// NOTE: Do not open any streams or connections from anywhere in this
// event loop. Really, just don't do anything likely to block.
for {
select {
case message := <-pm.peerMessages:
pm.handleMessage(message)
case <-pm.ctx.Done():
return
}
}
}
func (pm *PeerManager) handleMessage(message peerMessage) {
switch message.messageType {
case sendMessage:
ms := message.params.(*sendMessageParams)
if len(ms.targets) == 0 {
for _, p := range pm.peerQueues {
p.AddMessage(ms.entries, ms.from)
}
} else {
for _, t := range ms.targets {
p, ok := pm.peerQueues[t]
if !ok {
log.Infof("tried sending wantlist change to non-partner peer: %s", t)
continue
}
p.AddMessage(ms.entries, ms.from)
}
}
case connect:
p := message.params.(connectParams)
pm.startPeerHandler(p.peer, p.initialEntries)
case disconnect:
disconnectPeer := message.params.(peer.ID)
pm.stopPeerHandler(disconnectPeer)
case getPeers:
peers := make([]peer.ID, 0, len(pm.peerQueues))
for p := range pm.peerQueues {
peers = append(peers, p)
}
message.resultsChan <- peers
}
}
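For orientation, a minimal, hypothetical usage sketch of the PeerManager API above. The nopQueue stand-in and the peer ID are invented for illustration; in Bitswap the queue factory returns a real bsmq.MessageQueue:

package main

import (
	"context"
	"fmt"

	bsmsg "github.com/ipfs/go-bitswap/message"
	bspm "github.com/ipfs/go-bitswap/peermanager"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	peer "github.com/libp2p/go-libp2p-peer"
)

// nopQueue satisfies bspm.PeerQueue but sends nothing; illustration only.
type nopQueue struct{ refcnt int }

func (q *nopQueue) RefIncrement()      { q.refcnt++ }
func (q *nopQueue) RefDecrement() bool { q.refcnt--; return q.refcnt > 0 }
func (q *nopQueue) AddMessage(entries []*bsmsg.Entry, ses uint64)          {}
func (q *nopQueue) Startup(ctx context.Context, initial []*wantlist.Entry) {}
func (q *nopQueue) Shutdown()                                              {}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pm := bspm.New(ctx, func(p peer.ID) bspm.PeerQueue { return &nopQueue{refcnt: 1} })
	pm.Startup()
	defer pm.Shutdown()

	p := peer.ID("example-peer")
	pm.Connected(p, nil)        // start a queue for the peer, with no initial wants
	pm.SendMessage(nil, nil, 0) // empty target list broadcasts to every connected queue
	fmt.Println(len(pm.ConnectedPeers())) // 1
	pm.Disconnected(p) // drops the last reference and shuts the queue down
}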
package peermanager
import (
"context"
"testing"
bsmsg "github.com/ipfs/go-bitswap/message"
wantlist "github.com/ipfs/go-bitswap/wantlist"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blocksutil"
"github.com/libp2p/go-libp2p-peer"
)
var blockGenerator = blocksutil.NewBlockGenerator()
func generateCids(n int) []cid.Cid {
cids := make([]cid.Cid, 0, n)
for i := 0; i < n; i++ {
c := blockGenerator.Next().Cid()
cids = append(cids, c)
}
return cids
}
var peerSeq int
func generatePeers(n int) []peer.ID {
peerIds := make([]peer.ID, 0, n)
for i := 0; i < n; i++ {
peerSeq++
p := peer.ID(peerSeq)
peerIds = append(peerIds, p)
}
return peerIds
}
var nextSession uint64
func generateSessionID() uint64 {
nextSession++
return uint64(nextSession)
}
type messageSent struct {
p peer.ID
entries []*bsmsg.Entry
ses uint64
}
type fakePeer struct {
refcnt int
p peer.ID
messagesSent chan messageSent
}
func containsPeer(peers []peer.ID, p peer.ID) bool {
for _, n := range peers {
if p == n {
return true
}
}
return false
}
func (fp *fakePeer) Startup(ctx context.Context, initialEntries []*wantlist.Entry) {}
func (fp *fakePeer) Shutdown() {}
func (fp *fakePeer) RefIncrement() { fp.refcnt++ }
func (fp *fakePeer) RefDecrement() bool {
fp.refcnt--
return fp.refcnt > 0
}
func (fp *fakePeer) AddMessage(entries []*bsmsg.Entry, ses uint64) {
fp.messagesSent <- messageSent{fp.p, entries, ses}
}
func makePeerQueueFactory(messagesSent chan messageSent) PeerQueueFactory {
return func(p peer.ID) PeerQueue {
return &fakePeer{
p: p,
refcnt: 1,
messagesSent: messagesSent,
}
}
}
func TestAddingAndRemovingPeers(t *testing.T) {
ctx := context.Background()
peerQueueFactory := makePeerQueueFactory(nil)
tp := generatePeers(5)
peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
peerManager := New(ctx, peerQueueFactory)
peerManager.Startup()
peerManager.Connected(peer1, nil)
peerManager.Connected(peer2, nil)
peerManager.Connected(peer3, nil)
connectedPeers := peerManager.ConnectedPeers()
if !containsPeer(connectedPeers, peer1) ||
!containsPeer(connectedPeers, peer2) ||
!containsPeer(connectedPeers, peer3) {
t.Fatal("Peers not connected that should be connected")
}
if containsPeer(connectedPeers, peer4) ||
containsPeer(connectedPeers, peer5) {
t.Fatal("Peers connected that shouldn't be connected")
}
// removing a peer with only one reference
peerManager.Disconnected(peer1)
connectedPeers = peerManager.ConnectedPeers()
if containsPeer(connectedPeers, peer1) {
t.Fatal("Peer should have been disconnected but was not")
}
// connecting a peer twice, then disconnecting once, should stay in queue
peerManager.Connected(peer2, nil)
peerManager.Disconnected(peer2)
connectedPeers = peerManager.ConnectedPeers()
if !containsPeer(connectedPeers, peer2) {
t.Fatal("Peer was disconnected but should not have been")
}
}
......@@ -4,10 +4,7 @@ import (
"context"
"math"
engine "github.com/ipfs/go-bitswap/decision"
bsmsg "github.com/ipfs/go-bitswap/message"
bsmq "github.com/ipfs/go-bitswap/messagequeue"
bsnet "github.com/ipfs/go-bitswap/network"
wantlist "github.com/ipfs/go-bitswap/wantlist"
logging "github.com/ipfs/go-log"
......@@ -19,59 +16,72 @@ import (
var log = logging.Logger("bitswap")
const (
// kMaxPriority is the max priority as defined by the bitswap protocol
kMaxPriority = math.MaxInt32
// maxPriority is the max priority as defined by the bitswap protocol
maxPriority = math.MaxInt32
)
var (
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
// WantSender sends changes out to the network as they get added to the wantlist
// managed by the WantManager
type WantSender interface {
SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64)
}
type wantMessageType int
const (
isWanted wantMessageType = iota + 1
addWants
currentWants
currentBroadcastWants
wantCount
)
type wantMessage struct {
messageType wantMessageType
params interface{}
resultsChan chan interface{}
}
// WantManager manages a global want list. It tracks two separate want lists -
// one for all wants, and one for wants that are specifically broadcast to the
// internet
type WantManager struct {
// sync channels for Run loop
incoming chan *wantSet
connectEvent chan peerStatus // notification channel for peers connecting/disconnecting
peerReqs chan chan []peer.ID // channel to request connected peers on
// channel for requests to the run loop
// to get predictable behavior while running this in a goroutine,
// having only one channel is necessary, so requests are processed serially
messageReqs chan wantMessage
// synchronized by Run loop, only touch inside there
peers map[peer.ID]*bsmq.MessageQueue
wl *wantlist.ThreadSafe
bcwl *wantlist.ThreadSafe
wl *wantlist.ThreadSafe
bcwl *wantlist.ThreadSafe
network bsnet.BitSwapNetwork
ctx context.Context
cancel func()
ctx context.Context
cancel func()
wantSender WantSender
wantlistGauge metrics.Gauge
sentHistogram metrics.Histogram
}
type peerStatus struct {
connect bool
peer peer.ID
}
func New(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
// New initializes a new WantManager
func New(ctx context.Context) *WantManager {
ctx, cancel := context.WithCancel(ctx)
wantlistGauge := metrics.NewCtx(ctx, "wantlist_total",
"Number of items in wantlist.").Gauge()
sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
" this bitswap").Histogram(metricsBuckets)
return &WantManager{
incoming: make(chan *wantSet, 10),
connectEvent: make(chan peerStatus, 10),
peerReqs: make(chan chan []peer.ID),
peers: make(map[peer.ID]*bsmq.MessageQueue),
messageReqs: make(chan wantMessage, 10),
wl: wantlist.NewThreadSafe(),
bcwl: wantlist.NewThreadSafe(),
network: network,
ctx: ctx,
cancel: cancel,
wantlistGauge: wantlistGauge,
sentHistogram: sentHistogram,
}
}
// SetDelegate specifies who will send want changes out to the internet
func (wm *WantManager) SetDelegate(wantSender WantSender) {
wm.wantSender = wantSender
}
// WantBlocks adds the given cids to the wantlist, tracked by the given session
func (wm *WantManager) WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) {
log.Infof("want blocks: %s", ks)
......@@ -94,158 +104,119 @@ func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []p
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
Cancel: cancel,
Entry: wantlist.NewRefEntry(k, kMaxPriority-i),
Entry: wantlist.NewRefEntry(k, maxPriority-i),
})
}
select {
case wm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}:
case wm.messageReqs <- wantMessage{
messageType: addWants,
params: &wantSet{entries: entries, targets: targets, from: ses},
}:
case <-wm.ctx.Done():
case <-ctx.Done():
}
}
func (wm *WantManager) ConnectedPeers() []peer.ID {
resp := make(chan []peer.ID)
wm.peerReqs <- resp
return <-resp
}
func (wm *WantManager) SendBlocks(ctx context.Context, env *engine.Envelope) {
// Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack
defer env.Sent()
msgSize := 0
msg := bsmsg.New(false)
for _, block := range env.Message.Blocks() {
msgSize += len(block.RawData())
msg.AddBlock(block)
log.Infof("Sending block %s to %s", block, env.Peer)
}
wm.sentHistogram.Observe(float64(msgSize))
err := wm.network.SendMessage(ctx, env.Peer, msg)
if err != nil {
log.Infof("sendblock error: %s", err)
}
}
func (wm *WantManager) startPeerHandler(p peer.ID) *bsmq.MessageQueue {
mq, ok := wm.peers[p]
if ok {
mq.RefIncrement()
return nil
}
mq = bsmq.New(p, wm.network)
wm.peers[p] = mq
mq.Startup(wm.ctx, wm.bcwl.Entries())
return mq
}
func (wm *WantManager) stopPeerHandler(p peer.ID) {
pq, ok := wm.peers[p]
if !ok {
// TODO: log error?
return
}
if pq.RefDecrement() {
return
}
pq.Shutdown()
delete(wm.peers, p)
}
func (wm *WantManager) Connected(p peer.ID) {
select {
case wm.connectEvent <- peerStatus{peer: p, connect: true}:
case <-wm.ctx.Done():
}
func (wm *WantManager) Startup() {
go wm.run()
}
func (wm *WantManager) Disconnected(p peer.ID) {
select {
case wm.connectEvent <- peerStatus{peer: p, connect: false}:
case <-wm.ctx.Done():
}
func (wm *WantManager) Shutdown() {
wm.cancel()
}
// TODO: use goprocess here once I trust it
func (wm *WantManager) Run() {
func (wm *WantManager) run() {
// NOTE: Do not open any streams or connections from anywhere in this
// event loop. Really, just don't do anything likely to block.
for {
select {
case ws := <-wm.incoming:
// is this a broadcast or not?
brdc := len(ws.targets) == 0
// add changes to our wantlist
for _, e := range ws.entries {
if e.Cancel {
if brdc {
wm.bcwl.Remove(e.Cid, ws.from)
}
if wm.wl.Remove(e.Cid, ws.from) {
wm.wantlistGauge.Dec()
}
} else {
if brdc {
wm.bcwl.AddEntry(e.Entry, ws.from)
}
if wm.wl.AddEntry(e.Entry, ws.from) {
wm.wantlistGauge.Inc()
}
case message := <-wm.messageReqs:
wm.handleMessage(message)
case <-wm.ctx.Done():
return
}
}
}
func (wm *WantManager) handleMessage(message wantMessage) {
switch message.messageType {
case addWants:
ws := message.params.(*wantSet)
// is this a broadcast or not?
brdc := len(ws.targets) == 0
// add changes to our wantlist
for _, e := range ws.entries {
if e.Cancel {
if brdc {
wm.bcwl.Remove(e.Cid, ws.from)
}
}
// broadcast those wantlist changes
if len(ws.targets) == 0 {
for _, p := range wm.peers {
p.AddMessage(ws.entries, ws.from)
if wm.wl.Remove(e.Cid, ws.from) {
wm.wantlistGauge.Dec()
}
} else {
for _, t := range ws.targets {
p, ok := wm.peers[t]
if !ok {
log.Infof("tried sending wantlist change to non-partner peer: %s", t)
continue
}
p.AddMessage(ws.entries, ws.from)
if brdc {
wm.bcwl.AddEntry(e.Entry, ws.from)
}
if wm.wl.AddEntry(e.Entry, ws.from) {
wm.wantlistGauge.Inc()
}
}
case p := <-wm.connectEvent:
if p.connect {
wm.startPeerHandler(p.peer)
} else {
wm.stopPeerHandler(p.peer)
}
case req := <-wm.peerReqs:
peers := make([]peer.ID, 0, len(wm.peers))
for p := range wm.peers {
peers = append(peers, p)
}
req <- peers
case <-wm.ctx.Done():
return
}
// broadcast those wantlist changes
wm.wantSender.SendMessage(ws.entries, ws.targets, ws.from)
case isWanted:
c := message.params.(cid.Cid)
_, isWanted := wm.wl.Contains(c)
message.resultsChan <- isWanted
case currentWants:
message.resultsChan <- wm.wl.Entries()
case currentBroadcastWants:
message.resultsChan <- wm.bcwl.Entries()
case wantCount:
message.resultsChan <- wm.wl.Len()
}
}
func (wm *WantManager) IsWanted(c cid.Cid) bool {
_, isWanted := wm.wl.Contains(c)
return isWanted
resp := make(chan interface{})
wm.messageReqs <- wantMessage{
messageType: isWanted,
params: c,
resultsChan: resp,
}
result := <-resp
return result.(bool)
}
func (wm *WantManager) CurrentWants() []*wantlist.Entry {
return wm.wl.Entries()
resp := make(chan interface{})
wm.messageReqs <- wantMessage{
messageType: currentWants,
resultsChan: resp,
}
result := <-resp
return result.([]*wantlist.Entry)
}
func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry {
resp := make(chan interface{})
wm.messageReqs <- wantMessage{
messageType: currentBroadcastWants,
resultsChan: resp,
}
result := <-resp
return result.([]*wantlist.Entry)
}
func (wm *WantManager) WantCount() int {
return wm.wl.Len()
resp := make(chan interface{})
wm.messageReqs <- wantMessage{
messageType: wantCount,
resultsChan: resp,
}
result := <-resp
return result.(int)
}
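Similarly, a minimal, hypothetical sketch of driving the WantManager above. The loggingSender here only prints what it is asked to send; in Bitswap the delegate is the PeerManager:

package main

import (
	"context"
	"fmt"

	bsmsg "github.com/ipfs/go-bitswap/message"
	bswm "github.com/ipfs/go-bitswap/wantmanager"
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
	peer "github.com/libp2p/go-libp2p-peer"
)

// loggingSender satisfies bswm.WantSender; it prints instead of sending.
type loggingSender struct{}

func (loggingSender) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
	fmt.Printf("session %d: %d entries to %d peers\n", from, len(entries), len(targets))
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	wm := bswm.New(ctx)
	wm.SetDelegate(loggingSender{})
	wm.Startup()
	defer wm.Shutdown()

	k := blocksutil.NewBlockGenerator().Next().Cid()

	// no target peers means the want is broadcast and also tracked as a broadcast want
	wm.WantBlocks(ctx, []cid.Cid{k}, nil, 1)
	fmt.Println(wm.IsWanted(k), wm.WantCount()) // true 1

	// cancelling from the same session removes it again
	wm.CancelWants(ctx, []cid.Cid{k}, nil, 1)
	fmt.Println(wm.IsWanted(k), wm.WantCount()) // false 0
}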
package wantmanager
import (
"context"
"reflect"
"sync"
"testing"
bsmsg "github.com/ipfs/go-bitswap/message"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blocksutil"
"github.com/libp2p/go-libp2p-peer"
)
var blockGenerator = blocksutil.NewBlockGenerator()
func generateCids(n int) []cid.Cid {
cids := make([]cid.Cid, 0, n)
for i := 0; i < n; i++ {
c := blockGenerator.Next().Cid()
cids = append(cids, c)
}
return cids
}
var peerSeq int
func generatePeers(n int) []peer.ID {
peerIds := make([]peer.ID, 0, n)
for i := 0; i < n; i++ {
peerSeq++
p := peer.ID(peerSeq)
peerIds = append(peerIds, p)
}
return peerIds
}
var nextSession uint64
func generateSessionID() uint64 {
nextSession++
return uint64(nextSession)
}
type fakeWantSender struct {
lk sync.RWMutex
lastWantSet wantSet
}
func (fws *fakeWantSender) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
fws.lk.Lock()
fws.lastWantSet = wantSet{entries, targets, from}
fws.lk.Unlock()
}
func (fws *fakeWantSender) getLastWantSet() wantSet {
fws.lk.Lock()
defer fws.lk.Unlock()
return fws.lastWantSet
}
func setupTestFixturesAndInitialWantList() (
context.Context, *fakeWantSender, *WantManager, []cid.Cid, []cid.Cid, []peer.ID, uint64, uint64) {
ctx := context.Background()
// setup fixtures
wantSender := &fakeWantSender{}
wantManager := New(ctx)
keys := generateCids(10)
otherKeys := generateCids(5)
peers := generatePeers(10)
session := generateSessionID()
otherSession := generateSessionID()
// startup wantManager
wantManager.SetDelegate(wantSender)
wantManager.Startup()
// add initial wants
wantManager.WantBlocks(
ctx,
keys,
peers,
session)
return ctx, wantSender, wantManager, keys, otherKeys, peers, session, otherSession
}
func TestInitialWantsAddedCorrectly(t *testing.T) {
_, wantSender, wantManager, keys, _, peers, session, _ :=
setupTestFixturesAndInitialWantList()
bcwl := wantManager.CurrentBroadcastWants()
wl := wantManager.CurrentWants()
if len(bcwl) > 0 {
t.Fatal("should not create broadcast wants when peers are specified")
}
if len(wl) != len(keys) {
t.Fatal("did not add correct number of wants to want lsit")
}
generatedWantSet := wantSender.getLastWantSet()
if len(generatedWantSet.entries) != len(keys) {
t.Fatal("incorrect wants sent")
}
for _, entry := range generatedWantSet.entries {
if entry.Cancel {
t.Fatal("did not send only non-cancel messages")
}
}
if generatedWantSet.from != session {
t.Fatal("incorrect session used in sending")
}
if !reflect.DeepEqual(generatedWantSet.targets, peers) {
t.Fatal("did not setup peers correctly")
}
wantManager.Shutdown()
}
func TestCancellingWants(t *testing.T) {
ctx, wantSender, wantManager, keys, _, peers, session, _ :=
setupTestFixturesAndInitialWantList()
wantManager.CancelWants(ctx, keys, peers, session)
wl := wantManager.CurrentWants()
if len(wl) != 0 {
t.Fatal("did not remove blocks from want list")
}
generatedWantSet := wantSender.getLastWantSet()
if len(generatedWantSet.entries) != len(keys) {
t.Fatal("incorrect wants sent")
}
for _, entry := range generatedWantSet.entries {
if !entry.Cancel {
t.Fatal("did not send only cancel messages")
}
}
if generatedWantSet.from != session {
t.Fatal("incorrect session used in sending")
}
if !reflect.DeepEqual(generatedWantSet.targets, peers) {
t.Fatal("did not setup peers correctly")
}
wantManager.Shutdown()
}
func TestCancellingWantsFromAnotherSessionHasNoEffect(t *testing.T) {
ctx, _, wantManager, keys, _, peers, _, otherSession :=
setupTestFixturesAndInitialWantList()
// cancelling wants from another session has no effect
wantManager.CancelWants(ctx, keys, peers, otherSession)
wl := wantManager.CurrentWants()
if len(wl) != len(keys) {
t.Fatal("should not cancel wants unless they match session that made them")
}
wantManager.Shutdown()
}
func TestAddingWantsWithNoPeersAddsToBroadcastAndRegularWantList(t *testing.T) {
ctx, _, wantManager, keys, otherKeys, _, session, _ :=
setupTestFixturesAndInitialWantList()
wantManager.WantBlocks(ctx, otherKeys, nil, session)
bcwl := wantManager.CurrentBroadcastWants()
wl := wantManager.CurrentWants()
if len(bcwl) != len(otherKeys) {
t.Fatal("want requests with no peers should get added to broadcast list")
}
if len(wl) != len(otherKeys)+len(keys) {
t.Fatal("want requests with no peers should get added to regular want list")
}
wantManager.Shutdown()
}
func TestAddingRequestFromSecondSessionPreventsCancel(t *testing.T) {
ctx, wantSender, wantManager, keys, _, peers, session, otherSession :=
setupTestFixturesAndInitialWantList()
// add a second session requesting the first key
firstKeys := append([]cid.Cid(nil), keys[0])
wantManager.WantBlocks(ctx, firstKeys, peers, otherSession)
wl := wantManager.CurrentWants()
if len(wl) != len(keys) {
t.Fatal("wants from other sessions should not get added seperately")
}
generatedWantSet := wantSender.getLastWantSet()
if len(generatedWantSet.entries) != len(firstKeys) &&
generatedWantSet.from != otherSession &&
generatedWantSet.entries[0].Cid != firstKeys[0] &&
generatedWantSet.entries[0].Cancel != false {
t.Fatal("should send additional message requesting want for new session")
}
// cancel block from first session
wantManager.CancelWants(ctx, firstKeys, peers, session)
wl = wantManager.CurrentWants()
// want should still be on want list
if len(wl) != len(keys) {
t.Fatal("wants should not be removed until all sessions cancel wants")
}
// cancel other block from first session
secondKeys := append([]cid.Cid(nil), keys[1])
wantManager.CancelWants(ctx, secondKeys, peers, session)
wl = wantManager.CurrentWants()
// want should not be on want list, because it was only tracked by one session
if len(wl) != len(keys)-1 {
t.Fatal("wants should be removed if all sessions have cancelled")
}
wantManager.Shutdown()
}
......@@ -6,8 +6,8 @@ import (
"sync"
"time"
engine "github.com/ipfs/go-bitswap/decision"
bsmsg "github.com/ipfs/go-bitswap/message"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
process "github.com/jbenet/goprocess"
......@@ -74,7 +74,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
}
bs.engine.MessageSent(envelope.Peer, outgoing)
bs.wm.SendBlocks(ctx, envelope)
bs.sendBlocks(ctx, envelope)
bs.counterLk.Lock()
for _, block := range envelope.Message.Blocks() {
bs.counters.blocksSent++
......@@ -90,6 +90,26 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
}
}
func (bs *Bitswap) sendBlocks(ctx context.Context, env *engine.Envelope) {
// Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack
defer env.Sent()
msgSize := 0
msg := bsmsg.New(false)
for _, block := range env.Message.Blocks() {
msgSize += len(block.RawData())
msg.AddBlock(block)
log.Infof("Sending block %s to %s", block, env.Peer)
}
bs.sentHistogram.Observe(float64(msgSize))
err := bs.network.SendMessage(ctx, env.Peer, msg)
if err != nil {
log.Infof("sendblock error: %s", err)
}
}
func (bs *Bitswap) provideWorker(px process.Process) {
limit := make(chan struct{}, provideWorkerMax)
......