Unverified Commit 39076f4d authored by Alex Cruikshank's avatar Alex Cruikshank Committed by GitHub

move block allocation into message queue (#140)

Co-authored-by: default avataracruikshank <acruikshank@example.com>
parent 319ab7e6
......@@ -141,7 +141,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
asyncLoader := asyncloader.New(ctx, loader, storer)
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
responseAssembler := responseassembler.New(ctx, allocator, peerManager)
responseAssembler := responseassembler.New(ctx, peerManager)
peerTaskQueue := peertaskqueue.New()
responseManager := responsemanager.New(ctx, loader, responseAssembler, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests)
graphSync := &GraphSync{
......
......@@ -43,6 +43,7 @@ type MessageNetwork interface {
}
type Allocator interface {
AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
ReleasePeerMemory(p peer.ID) error
ReleaseBlockMemory(p peer.ID, amount uint64) error
}
......@@ -80,8 +81,16 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, allocator Alloc
}
}
// BuildMessage allows you to work modify the next message that is sent in the queue
func (mq *MessageQueue) BuildMessage(size uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
// AllocateAndBuildMessage allows you to work modify the next message that is sent in the queue.
// If blkSize > 0, message building may block until enough memory has been freed from the queues to allocate the message.
func (mq *MessageQueue) AllocateAndBuildMessage(size uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
if size > 0 {
select {
case <-mq.allocator.AllocateBlockMemory(mq.p, size):
case <-mq.ctx.Done():
return
}
}
if mq.buildMessage(size, buildMessageFn, notifees) {
mq.signalWork()
}
......
......@@ -77,7 +77,7 @@ func TestStartupAndShutdown(t *testing.T) {
root := testutil.GenerateCids(1)[0]
waitGroup.Add(1)
messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
}, []notifications.Notifee{})
......@@ -116,7 +116,7 @@ func TestShutdownDuringMessageSend(t *testing.T) {
// setup a message and advance as far as beginning to send it
waitGroup.Add(1)
messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
}, []notifications.Notifee{})
waitGroup.Wait()
......@@ -168,7 +168,7 @@ func TestProcessingNotification(t *testing.T) {
status := graphsync.RequestCompletedFull
expectedTopic := "testTopic"
notifee, verifier := testutil.NewTestNotifee(expectedTopic, 5)
messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddResponseCode(responseID, status)
b.AddExtensionData(responseID, extension)
}, []notifications.Notifee{notifee})
......@@ -219,7 +219,7 @@ func TestDedupingMessages(t *testing.T) {
selector := ssb.Matcher().Node()
root := testutil.GenerateCids(1)[0]
messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
}, []notifications.Notifee{})
// wait for send attempt
......@@ -233,7 +233,7 @@ func TestDedupingMessages(t *testing.T) {
selector3 := ssb.ExploreIndex(0, ssb.Matcher()).Node()
root3 := testutil.GenerateCids(1)[0]
messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddRequest(gsmsg.NewRequest(id2, root2, selector2, priority2))
b.AddRequest(gsmsg.NewRequest(id3, root3, selector3, priority3))
}, []notifications.Notifee{})
......@@ -268,7 +268,7 @@ func TestDedupingMessages(t *testing.T) {
}
}
func TestResponseAssemblerSendsVeryLargeBlocksResponses(t *testing.T) {
func TestSendsVeryLargeBlocksResponses(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
......@@ -288,7 +288,7 @@ func TestResponseAssemblerSendsVeryLargeBlocksResponses(t *testing.T) {
// generate large blocks before proceeding
blks := testutil.GenerateBlocksOfSize(5, 1000000)
messageQueue.BuildMessage(uint64(len(blks[0].RawData())), func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(uint64(len(blks[0].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[0])
}, []notifications.Notifee{})
waitGroup.Wait()
......@@ -300,13 +300,13 @@ func TestResponseAssemblerSendsVeryLargeBlocksResponses(t *testing.T) {
require.True(t, blks[0].Cid().Equals(msgBlks[0].Cid()))
// Send 3 very large blocks
messageQueue.BuildMessage(uint64(len(blks[1].RawData())), func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(uint64(len(blks[1].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[1])
}, []notifications.Notifee{})
messageQueue.BuildMessage(uint64(len(blks[2].RawData())), func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(uint64(len(blks[2].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[2])
}, []notifications.Notifee{})
messageQueue.BuildMessage(uint64(len(blks[3].RawData())), func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(uint64(len(blks[3].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[3])
}, []notifications.Notifee{})
......@@ -325,3 +325,60 @@ func TestResponseAssemblerSendsVeryLargeBlocksResponses(t *testing.T) {
require.Len(t, msgBlks, 1, "number of blks in first message was not 1")
require.True(t, blks[3].Cid().Equals(msgBlks[0].Cid()))
}
func TestSendsResponsesMemoryPressure(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
p := testutil.GeneratePeers(1)[0]
messagesSent := make(chan gsmsg.GraphSyncMessage, 0)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
var waitGroup sync.WaitGroup
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
// use allocator with very small limit
allocator := allocator2.NewAllocator(1000, 1000)
messageQueue := New(ctx, p, messageNetwork, allocator)
messageQueue.Startup()
waitGroup.Add(1)
// start sending block that exceeds memory limit
blks := testutil.GenerateBlocksOfSize(2, 999)
messageQueue.AllocateAndBuildMessage(uint64(len(blks[0].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[0])
}, []notifications.Notifee{})
finishes := make(chan string, 2)
go func() {
// attempt to send second block. Should block until memory is released
messageQueue.AllocateAndBuildMessage(uint64(len(blks[1].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[1])
}, []notifications.Notifee{})
finishes <- "sent message"
}()
// assert transaction does not complete within 200ms because it is waiting on memory
ctx2, cancel2 := context.WithTimeout(ctx, 200*time.Millisecond)
select {
case <-finishes:
t.Fatal("transaction failed to wait on memory")
case <-ctx2.Done():
}
// Allow first message to complete sending
<-messagesSent
// assert message is now queued within 200ms
ctx2, cancel2 = context.WithTimeout(ctx, 200*time.Millisecond)
defer cancel2()
select {
case <-finishes:
cancel2()
case <-ctx2.Done():
t.Fatal("timeout waiting for transaction to complete")
}
}
......@@ -12,7 +12,7 @@ import (
// PeerQueue is a process that sends messages to a peer
type PeerQueue interface {
PeerProcess
BuildMessage(blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee)
AllocateAndBuildMessage(blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee)
}
// PeerQueueFactory provides a function that will create a PeerQueue.
......@@ -33,7 +33,8 @@ func NewMessageManager(ctx context.Context, createPeerQueue PeerQueueFactory) *P
}
// BuildMessage allows you to modify the next message that is sent for the given peer
func (pmm *PeerMessageManager) BuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
// If blkSize > 0, message building may block until enough memory has been freed from the queues to allocate the message.
func (pmm *PeerMessageManager) AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
pq := pmm.GetProcess(p).(PeerQueue)
pq.BuildMessage(blkSize, buildMessageFn, notifees)
pq.AllocateAndBuildMessage(blkSize, buildMessageFn, notifees)
}
......@@ -29,7 +29,7 @@ type fakePeer struct {
messagesSent chan messageSent
}
func (fp *fakePeer) BuildMessage(blkSize uint64, buildMessage func(b *gsmsg.Builder), notifees []notifications.Notifee) {
func (fp *fakePeer) AllocateAndBuildMessage(blkSize uint64, buildMessage func(b *gsmsg.Builder), notifees []notifications.Notifee) {
builder := gsmsg.NewBuilder(gsmsg.Topic(0))
buildMessage(builder)
message, err := builder.Build()
......@@ -76,14 +76,14 @@ func TestSendingMessagesToPeers(t *testing.T) {
peerManager := NewMessageManager(ctx, peerQueueFactory)
request := gsmsg.NewRequest(id, root, selector, priority)
peerManager.BuildMessage(tp[0], 0, func(b *gsmsg.Builder) {
peerManager.AllocateAndBuildMessage(tp[0], 0, func(b *gsmsg.Builder) {
b.AddRequest(request)
}, []notifications.Notifee{})
peerManager.BuildMessage(tp[1], 0, func(b *gsmsg.Builder) {
peerManager.AllocateAndBuildMessage(tp[1], 0, func(b *gsmsg.Builder) {
b.AddRequest(request)
}, []notifications.Notifee{})
cancelRequest := gsmsg.CancelRequest(id)
peerManager.BuildMessage(tp[0], 0, func(b *gsmsg.Builder) {
peerManager.AllocateAndBuildMessage(tp[0], 0, func(b *gsmsg.Builder) {
b.AddRequest(cancelRequest)
}, []notifications.Notifee{})
......
......@@ -47,7 +47,7 @@ type inProgressRequestStatus struct {
// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
BuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee)
AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee)
}
// AsyncLoader is an interface for loading links asynchronously, returning
......@@ -566,7 +566,7 @@ const requestNetworkError = "request_network_error"
func (rm *RequestManager) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) {
sub := notifications.NewTopicDataSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners})
failNotifee := notifications.Notifee{Data: requestNetworkError, Subscriber: sub}
rm.peerHandler.BuildMessage(p, 0, func(builder *gsmsg.Builder) {
rm.peerHandler.AllocateAndBuildMessage(p, 0, func(builder *gsmsg.Builder) {
builder.AddRequest(request)
}, []notifications.Notifee{failNotifee})
}
......
......@@ -35,7 +35,7 @@ type fakePeerHandler struct {
requestRecordChan chan requestRecord
}
func (fph *fakePeerHandler) BuildMessage(p peer.ID, blkSize uint64,
func (fph *fakePeerHandler) AllocateAndBuildMessage(p peer.ID, blkSize uint64,
requestBuilder func(b *gsmsg.Builder), notifees []notifications.Notifee) {
builder := gsmsg.NewBuilder(gsmsg.Topic(0))
requestBuilder(builder)
......
......@@ -51,32 +51,26 @@ type ResponseBuilder interface {
}
// PeerMessageHandler is an interface that can queue a response for a given peer to go out over the network
// If blkSize > 0, message building may block until enough memory has been freed from the queues to allocate the message.
type PeerMessageHandler interface {
BuildMessage(p peer.ID, blkSize uint64, buildResponseFn func(*gsmsg.Builder), notifees []notifications.Notifee)
}
// Allocator is an interface that can manage memory allocated for blocks
type Allocator interface {
AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildResponseFn func(*gsmsg.Builder), notifees []notifications.Notifee)
}
// ResponseAssembler manages assembling responses to go out over the network
// in libp2p messages
type ResponseAssembler struct {
*peermanager.PeerManager
allocator Allocator
peerHandler PeerMessageHandler
ctx context.Context
}
// New generates a new ResponseAssembler for sending responses
func New(ctx context.Context, allocator Allocator, peerHandler PeerMessageHandler) *ResponseAssembler {
func New(ctx context.Context, peerHandler PeerMessageHandler) *ResponseAssembler {
return &ResponseAssembler{
PeerManager: peermanager.New(ctx, func(ctx context.Context, p peer.ID) peermanager.PeerHandler {
return newTracker()
}),
ctx: ctx,
allocator: allocator,
peerHandler: peerHandler,
}
}
......@@ -110,14 +104,7 @@ func (ra *ResponseAssembler) execute(p peer.ID, operations []responseOperation,
for _, op := range operations {
size += op.size()
}
if size > 0 {
select {
case <-ra.allocator.AllocateBlockMemory(p, size):
case <-ra.ctx.Done():
return
}
}
ra.peerHandler.BuildMessage(p, size, func(builder *gsmsg.Builder) {
ra.peerHandler.AllocateAndBuildMessage(p, size, func(builder *gsmsg.Builder) {
for _, op := range operations {
op.build(builder)
}
......
......@@ -16,7 +16,6 @@ import (
"github.com/ipfs/go-graphsync"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/responsemanager/allocator"
"github.com/ipfs/go-graphsync/testutil"
)
......@@ -38,8 +37,7 @@ func TestResponseAssemblerSendsResponses(t *testing.T) {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
fph := newFakePeerHandler(ctx, t)
allocator := allocator.NewAllocator(1<<30, 1<<30)
responseAssembler := New(ctx, allocator, fph)
responseAssembler := New(ctx, fph)
var bd1, bd2 graphsync.BlockData
......@@ -125,8 +123,7 @@ func TestResponseAssemblerSendsExtensionData(t *testing.T) {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
fph := newFakePeerHandler(ctx, t)
allocator := allocator.NewAllocator(1<<30, 1<<30)
responseAssembler := New(ctx, allocator, fph)
responseAssembler := New(ctx, fph)
require.NoError(t, responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error {
b.SendResponse(links[0], blks[0].RawData())
......@@ -171,8 +168,7 @@ func TestResponseAssemblerSendsResponsesInTransaction(t *testing.T) {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
fph := newFakePeerHandler(ctx, t)
allocator := allocator.NewAllocator(1<<30, 1<<30)
responseAssembler := New(ctx, allocator, fph)
responseAssembler := New(ctx, fph)
notifee, _ := testutil.NewTestNotifee("transaction", 10)
err := responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error {
bd := b.SendResponse(links[0], blks[0].RawData())
......@@ -210,8 +206,7 @@ func TestResponseAssemblerIgnoreBlocks(t *testing.T) {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
fph := newFakePeerHandler(ctx, t)
allocator := allocator.NewAllocator(1<<30, 1<<30)
responseAssembler := New(ctx, allocator, fph)
responseAssembler := New(ctx, fph)
responseAssembler.IgnoreBlocks(p, requestID1, links)
......@@ -278,8 +273,7 @@ func TestResponseAssemblerDupKeys(t *testing.T) {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
fph := newFakePeerHandler(ctx, t)
allocator := allocator.NewAllocator(1<<30, 1<<30)
responseAssembler := New(ctx, allocator, fph)
responseAssembler := New(ctx, fph)
responseAssembler.DedupKey(p, requestID1, "applesauce")
responseAssembler.DedupKey(p, requestID3, "applesauce")
......@@ -342,67 +336,6 @@ func TestResponseAssemblerDupKeys(t *testing.T) {
fph.AssertResponses(expectedResponses{requestID3: graphsync.PartialResponse})
}
func TestResponseAssemblerSendsResponsesMemoryPressure(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
p := testutil.GeneratePeers(1)[0]
requestID1 := graphsync.RequestID(rand.Int31())
blks := testutil.GenerateBlocksOfSize(5, 100)
links := make([]ipld.Link, 0, len(blks))
for _, block := range blks {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
fph := newFakePeerHandler(ctx, t)
allocator := newFakeAllocator()
responseAssembler := New(ctx, allocator, fph)
finishes := make(chan string, 2)
go func() {
err := responseAssembler.Transaction(p, requestID1, func(responseBuilder ResponseBuilder) error {
bd := responseBuilder.SendResponse(links[0], blks[0].RawData())
assertSentOnWire(t, bd, blks[0])
bd = responseBuilder.SendResponse(links[1], blks[1].RawData())
assertSentOnWire(t, bd, blks[1])
bd = responseBuilder.SendResponse(links[2], blks[2].RawData())
assertSentOnWire(t, bd, blks[2])
bd = responseBuilder.SendResponse(links[3], blks[3].RawData())
assertSentOnWire(t, bd, blks[3])
responseBuilder.FinishRequest()
return nil
})
require.NoError(t, err)
finishes <- "sent message"
}()
// assert transaction does not complete within 200ms because it is waiting on memory
ctx2, cancel2 := context.WithTimeout(ctx, 200*time.Millisecond)
select {
case <-finishes:
t.Fatal("transaction failed to wait on memory")
cancel2()
case <-ctx2.Done():
}
// simulate the release of memory
allocator.response <- nil
// assert transaction now completes within 200ms
ctx2, cancel2 = context.WithTimeout(ctx, 200*time.Millisecond)
defer cancel2()
select {
case <-finishes:
cancel()
case <-ctx2.Done():
t.Fatal("timeout waiting for transaction to complete")
}
fph.AssertBlocks(blks[0], blks[1], blks[2], blks[3])
fph.AssertResponses(expectedResponses{
requestID1: graphsync.RequestCompletedFull,
})
}
func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) {
for _, response := range responses {
if response.RequestID() == requestID {
......@@ -496,7 +429,7 @@ func (fph *fakePeerHandler) RefuteResponses() {
require.Empty(fph.t, fph.lastResponses)
}
func (fph *fakePeerHandler) BuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
func (fph *fakePeerHandler) AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
builder := gsmsg.NewBuilder(gsmsg.Topic(0))
buildMessageFn(builder)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment