Commit e82011a8 authored by Brian Tiger Chow's avatar Brian Tiger Chow

fix(bitswap.decision.Engine) enqueue only the freshest messages

Before, the engine worker would pop a task and block on send to the
bitswap worker even if the bitswap worker wasn't to receive. Since the
task could have been invalidated during this blocking send, a small
number of stale (already acquired) blocks would be send to partners.

Now, tasks are only popped off of the queue when bitswap is ready to
send them over the wire. This is accomplished by removing the
outboxChanBuffer and implementing a two-phase communication sequence.
parent a70a16c9
......@@ -277,10 +277,13 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
case <-ctx.Done():
log.Debugf("exiting")
return
case envelope := <-bs.engine.Outbox():
log.Debugf("message to %s sending...", envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
log.Debugf("message to %s sent", envelope.Peer)
case nextEnvelope := <-bs.engine.Outbox():
select {
case <-ctx.Done():
return
case envelope := <-nextEnvelope:
bs.send(ctx, envelope.Peer, envelope.Message)
}
}
}
}
......
......@@ -44,7 +44,8 @@ import (
var log = eventlog.Logger("engine")
const (
sizeOutboxChan = 4
// outboxChanBuffer must be 0 to prevent stale messages from being sent
outboxChanBuffer = 0
)
// Envelope contains a message for a Peer
......@@ -68,8 +69,9 @@ type Engine struct {
// that case, no lock would be required.
workSignal chan struct{}
// outbox contains outgoing messages to peers
outbox chan Envelope
// outbox contains outgoing messages to peers. This is owned by the
// taskWorker goroutine
outbox chan (<-chan Envelope)
bs bstore.Blockstore
......@@ -83,7 +85,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
ledgerMap: make(map[peer.ID]*ledger),
bs: bs,
peerRequestQueue: newPRQ(),
outbox: make(chan Envelope, sizeOutboxChan),
outbox: make(chan (<-chan Envelope), outboxChanBuffer),
workSignal: make(chan struct{}),
}
go e.taskWorker(ctx)
......@@ -91,45 +93,55 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
}
func (e *Engine) taskWorker(ctx context.Context) {
log := log.Prefix("bitswap.Engine.taskWorker")
defer close(e.outbox) // because taskWorker uses the channel exclusively
for {
oneTimeUse := make(chan Envelope, 1) // buffer to prevent blocking
select {
case <-ctx.Done():
return
case e.outbox <- oneTimeUse:
}
// receiver is ready for an outoing envelope. let's prepare one. first,
// we must acquire a task from the PQ...
envelope, err := e.nextEnvelope(ctx)
if err != nil {
close(oneTimeUse)
return // ctx cancelled
}
oneTimeUse <- *envelope // buffered. won't block
close(oneTimeUse)
}
}
// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
for {
nextTask := e.peerRequestQueue.Pop()
if nextTask == nil {
// No tasks in the list?
// Wait until there are!
for nextTask == nil {
select {
case <-ctx.Done():
log.Debugf("exiting: %s", ctx.Err())
return
return nil, ctx.Err()
case <-e.workSignal:
log.Debugf("woken up")
nextTask = e.peerRequestQueue.Pop()
}
continue
}
log := log.Prefix("%s", nextTask)
log.Debugf("processing")
// with a task in hand, we're ready to prepare the envelope...
block, err := e.bs.Get(nextTask.Entry.Key)
if err != nil {
log.Warning("engine: task exists to send block, but block is not in blockstore")
continue
}
// construct message here so we can make decisions about any additional
// information we may want to include at this time.
m := bsmsg.New()
m := bsmsg.New() // TODO: maybe add keys from our wantlist?
m.AddBlock(block)
// TODO: maybe add keys from our wantlist?
log.Debugf("sending...")
select {
case <-ctx.Done():
return
case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
log.Debugf("sent")
}
return &Envelope{Peer: nextTask.Target, Message: m}, nil
}
}
func (e *Engine) Outbox() <-chan Envelope {
// Outbox returns a channel of one-time use Envelope channels.
func (e *Engine) Outbox() <-chan (<-chan Envelope) {
return e.outbox
}
......
package decision
import (
"errors"
"fmt"
"math"
"strings"
"sync"
......@@ -104,7 +106,8 @@ func TestOutboxClosedWhenEngineClosed(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
for _ = range e.Outbox() {
for nextEnvelope := range e.Outbox() {
<-nextEnvelope
}
wg.Done()
}()
......@@ -116,6 +119,10 @@ func TestOutboxClosedWhenEngineClosed(t *testing.T) {
}
func TestPartnerWantsThenCancels(t *testing.T) {
numRounds := 10
if testing.Short() {
numRounds = 1
}
alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
vowels := strings.Split("aeiou", "")
......@@ -129,23 +136,31 @@ func TestPartnerWantsThenCancels(t *testing.T) {
},
}
for _, testcase := range testcases {
set := testcase[0]
cancels := testcase[1]
keeps := stringsComplement(set, cancels)
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := NewEngine(context.Background(), bs)
partner := testutil.RandPeerIDFatal(t)
for _, letter := range set {
block := blocks.NewBlock([]byte(letter))
bs.Put(block)
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
for _, letter := range alphabet {
block := blocks.NewBlock([]byte(letter))
if err := bs.Put(block); err != nil {
t.Fatal(err)
}
partnerWants(e, set, partner)
partnerCancels(e, cancels, partner)
assertPoppedInOrder(t, e, keeps)
}
for i := 0; i < numRounds; i++ {
for _, testcase := range testcases {
set := testcase[0]
cancels := testcase[1]
keeps := stringsComplement(set, cancels)
e := NewEngine(context.Background(), bs)
partner := testutil.RandPeerIDFatal(t)
partnerWants(e, set, partner)
partnerCancels(e, cancels, partner)
if err := checkHandledInOrder(t, e, keeps); err != nil {
t.Logf("run #%d of %d", i, numRounds)
t.Fatal(err)
}
}
}
}
func partnerWants(e *Engine, keys []string, partner peer.ID) {
......@@ -166,15 +181,17 @@ func partnerCancels(e *Engine, keys []string, partner peer.ID) {
e.MessageReceived(partner, cancels)
}
func assertPoppedInOrder(t *testing.T, e *Engine, keys []string) {
func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
for _, k := range keys {
envelope := <-e.Outbox()
next := <-e.Outbox()
envelope := <-next
received := envelope.Message.Blocks()[0]
expected := blocks.NewBlock([]byte(k))
if received.Key() != expected.Key() {
t.Fatal("received", string(received.Data), "expected", string(expected.Data))
return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data)))
}
}
return nil
}
func stringsComplement(set, subset []string) []string {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment