Unverified Commit 7fd8d7c2 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #94 from ipfs/bugs/runqueue-performance

fix(messagequeue): Remove second run loop
parents c4f88fa3 576388c6
......@@ -33,14 +33,15 @@ type MessageQueue struct {
p peer.ID
network MessageNetwork
newRequests chan request
outgoingMessages chan bsmsg.BitSwapMessage
done chan struct{}
newRequests chan request
outgoingWork chan struct{}
done chan struct{}
// do not touch out of run loop
wl *wantlist.SessionTrackedWantlist
nextMessage bsmsg.BitSwapMessage
sender bsnet.MessageSender
wl *wantlist.SessionTrackedWantlist
nextMessage bsmsg.BitSwapMessage
nextMessageLk sync.RWMutex
sender bsnet.MessageSender
}
type messageRequest struct {
......@@ -55,32 +56,44 @@ type wantlistRequest struct {
// New creats a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue {
return &MessageQueue{
ctx: ctx,
wl: wantlist.NewSessionTrackedWantlist(),
network: network,
p: p,
newRequests: make(chan request, 16),
outgoingMessages: make(chan bsmsg.BitSwapMessage),
done: make(chan struct{}),
ctx: ctx,
wl: wantlist.NewSessionTrackedWantlist(),
network: network,
p: p,
newRequests: make(chan request, 16),
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
}
}
// AddMessage adds new entries to an outgoing message for a given session.
func (mq *MessageQueue) AddMessage(entries []bsmsg.Entry, ses uint64) {
if !mq.addEntries(entries, ses) {
return
}
select {
case mq.newRequests <- newMessageRequest(entries, ses):
case <-mq.ctx.Done():
case mq.outgoingWork <- struct{}{}:
default:
}
}
// AddWantlist adds a complete session tracked want list to a message queue
func (mq *MessageQueue) AddWantlist(initialWants *wantlist.SessionTrackedWantlist) {
wl := wantlist.NewSessionTrackedWantlist()
initialWants.CopyWants(wl)
mq.nextMessageLk.Lock()
defer mq.nextMessageLk.Unlock()
select {
case mq.newRequests <- &wantlistRequest{wl}:
case <-mq.ctx.Done():
initialWants.CopyWants(mq.wl)
if initialWants.Len() > 0 {
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
for _, e := range initialWants.Entries() {
mq.nextMessage.AddEntry(e.Cid, e.Priority)
}
select {
case mq.outgoingWork <- struct{}{}:
default:
}
}
}
......@@ -88,7 +101,6 @@ func (mq *MessageQueue) AddWantlist(initialWants *wantlist.SessionTrackedWantlis
// based on the given initial wantlist.
func (mq *MessageQueue) Startup() {
go mq.runQueue()
go mq.sendMessages()
}
// Shutdown stops the processing of messages for a message queue.
......@@ -97,19 +109,10 @@ func (mq *MessageQueue) Shutdown() {
}
func (mq *MessageQueue) runQueue() {
outgoingMessages := func() chan bsmsg.BitSwapMessage {
if mq.nextMessage == nil {
return nil
}
return mq.outgoingMessages
}
for {
select {
case newRequest := <-mq.newRequests:
newRequest.handle(mq)
case outgoingMessages() <- mq.nextMessage:
mq.nextMessage = nil
case <-mq.outgoingWork:
mq.sendMessage()
case <-mq.done:
if mq.sender != nil {
mq.sender.Close()
......@@ -124,77 +127,45 @@ func (mq *MessageQueue) runQueue() {
}
}
// We allocate a bunch of these so use a pool.
var messageRequestPool = sync.Pool{
New: func() interface{} {
return new(messageRequest)
},
}
func newMessageRequest(entries []bsmsg.Entry, session uint64) *messageRequest {
mr := messageRequestPool.Get().(*messageRequest)
mr.entries = entries
mr.ses = session
return mr
}
func returnMessageRequest(mr *messageRequest) {
*mr = messageRequest{}
messageRequestPool.Put(mr)
}
func (mr *messageRequest) handle(mq *MessageQueue) {
mq.addEntries(mr.entries, mr.ses)
returnMessageRequest(mr)
}
func (wr *wantlistRequest) handle(mq *MessageQueue) {
initialWants := wr.wl
initialWants.CopyWants(mq.wl)
if initialWants.Len() > 0 {
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
for _, e := range initialWants.Entries() {
mq.nextMessage.AddEntry(e.Cid, e.Priority)
}
func (mq *MessageQueue) addEntries(entries []bsmsg.Entry, ses uint64) bool {
var work bool
mq.nextMessageLk.Lock()
defer mq.nextMessageLk.Unlock()
// if we have no message held allocate a new one
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
}
func (mq *MessageQueue) addEntries(entries []bsmsg.Entry, ses uint64) {
for _, e := range entries {
if e.Cancel {
if mq.wl.Remove(e.Cid, ses) {
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
work = true
mq.nextMessage.Cancel(e.Cid)
}
} else {
if mq.wl.Add(e.Cid, e.Priority, ses) {
if mq.nextMessage == nil {
mq.nextMessage = bsmsg.New(false)
}
work = true
mq.nextMessage.AddEntry(e.Cid, e.Priority)
}
}
}
return work
}
func (mq *MessageQueue) sendMessages() {
for {
select {
case nextMessage := <-mq.outgoingMessages:
mq.sendMessage(nextMessage)
case <-mq.done:
return
case <-mq.ctx.Done():
return
}
}
func (mq *MessageQueue) extractOutgoingMessage() bsmsg.BitSwapMessage {
// grab outgoing message
mq.nextMessageLk.Lock()
message := mq.nextMessage
mq.nextMessage = nil
mq.nextMessageLk.Unlock()
return message
}
func (mq *MessageQueue) sendMessage(message bsmsg.BitSwapMessage) {
func (mq *MessageQueue) sendMessage() {
message := mq.extractOutgoingMessage()
if message == nil || message.Empty() {
return
}
err := mq.initializeSender()
if err != nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment