Unverified Commit ee93aa83 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #84 from ipfs/fix/reduce-alloc-2

remove allocations round two
parents 6e3e0400 8d357ff2
......@@ -2,6 +2,7 @@ package messagequeue
import (
"context"
"sync"
"time"
bsmsg "github.com/ipfs/go-bitswap/message"
......@@ -43,7 +44,7 @@ type MessageQueue struct {
}
type messageRequest struct {
entries []*bsmsg.Entry
entries []bsmsg.Entry
ses uint64
}
......@@ -65,9 +66,9 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue {
}
// AddMessage adds new entries to an outgoing message for a given session.
func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
func (mq *MessageQueue) AddMessage(entries []bsmsg.Entry, ses uint64) {
select {
case mq.newRequests <- &messageRequest{entries, ses}:
case mq.newRequests <- newMessageRequest(entries, ses):
case <-mq.ctx.Done():
}
}
......@@ -123,8 +124,28 @@ func (mq *MessageQueue) runQueue() {
}
}
// We allocate a bunch of these so use a pool.
var messageRequestPool = sync.Pool{
New: func() interface{} {
return new(messageRequest)
},
}
func newMessageRequest(entries []bsmsg.Entry, session uint64) *messageRequest {
mr := messageRequestPool.Get().(*messageRequest)
mr.entries = entries
mr.ses = session
return mr
}
func returnMessageRequest(mr *messageRequest) {
*mr = messageRequest{}
messageRequestPool.Put(mr)
}
func (mr *messageRequest) handle(mq *MessageQueue) {
mq.addEntries(mr.entries, mr.ses)
returnMessageRequest(mr)
}
func (wr *wantlistRequest) handle(mq *MessageQueue) {
......@@ -140,7 +161,7 @@ func (wr *wantlistRequest) handle(mq *MessageQueue) {
}
}
func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) {
func (mq *MessageQueue) addEntries(entries []bsmsg.Entry, ses uint64) {
for _, e := range entries {
if e.Cancel {
if mq.wl.Remove(e.Cid, ses) {
......
......@@ -19,7 +19,7 @@ var (
// PeerQueue provides a queer of messages to be sent for a single peer.
type PeerQueue interface {
AddMessage(entries []*bsmsg.Entry, ses uint64)
AddMessage(entries []bsmsg.Entry, ses uint64)
Startup()
AddWantlist(initialWants *wantlist.SessionTrackedWantlist)
Shutdown()
......@@ -108,7 +108,7 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
// SendMessage is called to send a message to all or some peers in the pool;
// if targets is nil, it sends to all.
func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
func (pm *PeerManager) SendMessage(entries []bsmsg.Entry, targets []peer.ID, from uint64) {
if len(targets) == 0 {
pm.peerQueuesLk.RLock()
for _, p := range pm.peerQueues {
......
......@@ -15,7 +15,7 @@ import (
type messageSent struct {
p peer.ID
entries []*bsmsg.Entry
entries []bsmsg.Entry
ses uint64
}
......@@ -27,7 +27,7 @@ type fakePeer struct {
func (fp *fakePeer) Startup() {}
func (fp *fakePeer) Shutdown() {}
func (fp *fakePeer) AddMessage(entries []*bsmsg.Entry, ses uint64) {
func (fp *fakePeer) AddMessage(entries []bsmsg.Entry, ses uint64) {
fp.messagesSent <- messageSent{fp.p, entries, ses}
}
func (fp *fakePeer) AddWantlist(initialWants *wantlist.SessionTrackedWantlist) {}
......@@ -44,7 +44,7 @@ func collectAndCheckMessages(
ctx context.Context,
t *testing.T,
messagesSent <-chan messageSent,
entries []*bsmsg.Entry,
entries []bsmsg.Entry,
ses uint64,
timeout time.Duration) []peer.ID {
var peersReceived []peer.ID
......
......@@ -50,11 +50,11 @@ func GenerateWantlist(n int, ses uint64) *wantlist.SessionTrackedWantlist {
}
// GenerateMessageEntries makes fake bitswap message entries.
func GenerateMessageEntries(n int, isCancel bool) []*bsmsg.Entry {
bsmsgs := make([]*bsmsg.Entry, 0, n)
func GenerateMessageEntries(n int, isCancel bool) []bsmsg.Entry {
bsmsgs := make([]bsmsg.Entry, 0, n)
for i := 0; i < n; i++ {
prioritySeq++
msg := &bsmsg.Entry{
msg := bsmsg.Entry{
Entry: wantlist.NewRefEntry(blockGenerator.Next().Cid(), prioritySeq),
Cancel: isCancel,
}
......
......@@ -25,7 +25,7 @@ const (
type PeerHandler interface {
Disconnected(p peer.ID)
Connected(p peer.ID, initialWants *wantlist.SessionTrackedWantlist)
SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64)
SendMessage(entries []bsmsg.Entry, targets []peer.ID, from uint64)
}
type wantMessage interface {
......@@ -187,9 +187,9 @@ func (wm *WantManager) run() {
}
func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []peer.ID, cancel bool, ses uint64) {
entries := make([]*bsmsg.Entry, 0, len(ks))
entries := make([]bsmsg.Entry, 0, len(ks))
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
entries = append(entries, bsmsg.Entry{
Cancel: cancel,
Entry: wantlist.NewRefEntry(k, maxPriority-i),
})
......@@ -202,7 +202,7 @@ func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []p
}
type wantSet struct {
entries []*bsmsg.Entry
entries []bsmsg.Entry
targets []peer.ID
from uint64
}
......
......@@ -19,7 +19,7 @@ type fakePeerHandler struct {
lastWantSet wantSet
}
func (fph *fakePeerHandler) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
func (fph *fakePeerHandler) SendMessage(entries []bsmsg.Entry, targets []peer.ID, from uint64) {
fph.lk.Lock()
fph.lastWantSet = wantSet{entries, targets, from}
fph.lk.Unlock()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment