Commit 777c0d9a authored by Steven Allen's avatar Steven Allen

feat: debounce wants manually

This:

* Makes it easy to send immediately if we wait too long and/or if we have enough
to send.
* Is significantly more efficient than the debounce library as it doesn't spin
off a bunch of "after" timers.

fixes #245
parent 20be0848
......@@ -5,8 +5,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/bep/debounce v1.2.0 h1:wXds8Kq8qRfwAOpAxHrJDbCXgC5aHSzgQb/0gKsHQqo=
github.com/bep/debounce v1.2.0/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c h1:aEbSeNALREWXk0G7UdNhR3ayBV7tZ4M2PNmnrCAph6Q=
......@@ -94,8 +92,6 @@ github.com/ipfs/go-cid v0.0.2 h1:tuuKaZPU1M6HcejsO3AcYWW8sZ8MTvyxfc4uqB4eFE8=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.3 h1:UIAh32wymBpStoe83YCzwVQQ5Oy/H0FdxvUS6DJDzms=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.4 h1:UlfXKrZx1DjZoBhQHmNHLC1fK1dUJDN20Y28A7s+gJ8=
github.com/ipfs/go-cid v0.0.4/go.mod h1:4LLaPOQwmk5z9LBgQnpkivrx8BJjUyGwTXCd5Xfj6+M=
github.com/ipfs/go-cid v0.0.5 h1:o0Ix8e/ql7Zb5UVUJEUfjsWCIY8t48++9lR8qi6oiJU=
github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog=
github.com/ipfs/go-datastore v0.0.1 h1:AW/KZCScnBWlSb5JbnEnLKFWXL224LBEh/9KXXOrUms=
......@@ -344,8 +340,6 @@ github.com/multiformats/go-multihash v0.0.5 h1:1wxmCvTXAifAepIMyF39vZinRw5sbqjPs
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
github.com/multiformats/go-multihash v0.0.8 h1:wrYcW5yxSi3dU07n5jnuS5PrNwyHy0zRHGVoUugWvXg=
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
github.com/multiformats/go-multihash v0.0.10 h1:lMoNbh2Ssd9PUF74Nz008KGzGPlfeV6wH3rit5IIGCM=
github.com/multiformats/go-multihash v0.0.10/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
github.com/multiformats/go-multihash v0.0.13 h1:06x+mk/zj1FoMsgNejLpy6QTvJqlSt/BhLEy87zidlc=
github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-multistream v0.1.0 h1:UpO6jrsjqs46mqAK3n6wKRYFhugss9ArzbyUzU+4wkQ=
......
......@@ -6,8 +6,6 @@ import (
"sync"
"time"
debounce "github.com/bep/debounce"
bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
bsnet "github.com/ipfs/go-bitswap/network"
......@@ -34,6 +32,11 @@ const (
maxPriority = math.MaxInt32
// sendMessageDebounce is the debounce duration when calling sendMessage()
sendMessageDebounce = time.Millisecond
// when we reach sendMessaageCuttoff wants/cancels, we'll send the message immediately.
sendMessageCuttoff = 100
// when we debounce for more than sendMessageMaxDelay, we'll send the
// message immediately.
sendMessageMaxDelay = 100 * time.Millisecond
)
// MessageNetwork is any network that can connect peers and generate a message
......@@ -54,9 +57,8 @@ type MessageQueue struct {
maxMessageSize int
sendErrorBackoff time.Duration
signalWorkReady func()
outgoingWork chan struct{}
done chan struct{}
outgoingWork chan time.Time
done chan struct{}
// Take lock whenever any of these variables are modified
wllock sync.Mutex
......@@ -165,17 +167,13 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
bcstWants: newRecallWantList(),
peerWants: newRecallWantList(),
cancels: cid.NewSet(),
outgoingWork: make(chan struct{}, 1),
outgoingWork: make(chan time.Time, 1),
done: make(chan struct{}),
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
priority: maxPriority,
}
// Apply debounce to the work ready signal (which triggers sending a message)
debounced := debounce.New(sendMessageDebounce)
mq.signalWorkReady = func() { debounced(mq.onWorkReady) }
return mq
}
......@@ -285,11 +283,42 @@ func (mq *MessageQueue) onShutdown() {
func (mq *MessageQueue) runQueue() {
defer mq.onShutdown()
// Create a timer for debouncing scheduled work.
scheduleWork := time.NewTimer(0)
if !scheduleWork.Stop() {
<-scheduleWork.C
}
var workScheduled time.Time
for {
select {
case <-mq.rebroadcastTimer.C:
mq.rebroadcastWantlist()
case <-mq.outgoingWork:
case when := <-mq.outgoingWork:
// If we have work scheduled, cancel the timer. If we
// don't, record when the work was scheduled.
// We send the time on the channel so we accurately
// track delay.
if workScheduled.IsZero() {
workScheduled = when
} else if !scheduleWork.Stop() {
<-scheduleWork.C
}
// If we have too many updates and/or we've waited too
// long, send immediately.
if mq.pendingWorkCount() > sendMessageCuttoff ||
time.Since(workScheduled) >= sendMessageMaxDelay {
mq.sendIfReady()
workScheduled = time.Time{}
} else {
// Otherwise, extend the timer.
scheduleWork.Reset(sendMessageDebounce)
}
case <-scheduleWork.C:
// We have work scheduled and haven't seen any updates
// in sendMessageDebounce. Send immediately.
workScheduled = time.Time{}
mq.sendIfReady()
case <-mq.done:
if mq.sender != nil {
......@@ -335,9 +364,9 @@ func (mq *MessageQueue) transferRebroadcastWants() bool {
return true
}
func (mq *MessageQueue) onWorkReady() {
func (mq *MessageQueue) signalWorkReady() {
select {
case mq.outgoingWork <- struct{}{}:
case mq.outgoingWork <- time.Now():
default:
}
}
......@@ -443,10 +472,14 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(msg bsmsg.BitSwapMessage) {
// }
func (mq *MessageQueue) hasPendingWork() bool {
return mq.pendingWorkCount() > 0
}
func (mq *MessageQueue) pendingWorkCount() int {
mq.wllock.Lock()
defer mq.wllock.Unlock()
return mq.bcstWants.pending.Len() > 0 || mq.peerWants.pending.Len() > 0 || mq.cancels.Len() > 0
return mq.bcstWants.pending.Len() + mq.peerWants.pending.Len() + mq.cancels.Len()
}
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment