Commit 69ff295f authored by hannahhoward's avatar hannahhoward

feat(messagequeue): Add block sending

Add the ability to send blocks and receive notification when complete to messagequeue
parent 58ce371b
...@@ -5,6 +5,8 @@ import ( ...@@ -5,6 +5,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/ipfs/go-block-format"
gsmsg "github.com/ipfs/go-graphsync/message" gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network" gsnet "github.com/ipfs/go-graphsync/network"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
...@@ -34,6 +36,7 @@ type MessageQueue struct { ...@@ -34,6 +36,7 @@ type MessageQueue struct {
// internal do not touch outside go routines // internal do not touch outside go routines
nextMessage gsmsg.GraphSyncMessage nextMessage gsmsg.GraphSyncMessage
nextMessageLk sync.RWMutex nextMessageLk sync.RWMutex
processedNotifiers []chan struct{}
sender gsnet.MessageSender sender gsnet.MessageSender
} }
...@@ -56,7 +59,7 @@ func (mq *MessageQueue) AddRequest( ...@@ -56,7 +59,7 @@ func (mq *MessageQueue) AddRequest(
if mq.mutateNextMessage(func(nextMessage gsmsg.GraphSyncMessage) { if mq.mutateNextMessage(func(nextMessage gsmsg.GraphSyncMessage) {
nextMessage.AddRequest(id, selector, priority) nextMessage.AddRequest(id, selector, priority)
}) { }, nil) {
mq.signalWork() mq.signalWork()
} }
} }
...@@ -66,9 +69,24 @@ func (mq *MessageQueue) Cancel(id gsmsg.GraphSyncRequestID) { ...@@ -66,9 +69,24 @@ func (mq *MessageQueue) Cancel(id gsmsg.GraphSyncRequestID) {
if mq.mutateNextMessage(func(nextMessage gsmsg.GraphSyncMessage) { if mq.mutateNextMessage(func(nextMessage gsmsg.GraphSyncMessage) {
nextMessage.Cancel(id) nextMessage.Cancel(id)
}) { }, nil) {
mq.signalWork()
}
}
// AddBlocks adds the given blocks to the next message and returns a channel
// that sends a notification when the blocks are read. If ignored by the consumer
// sending will not block.
func (mq *MessageQueue) AddBlocks(blks []blocks.Block) <-chan struct{} {
notificationChannel := make(chan struct{}, 1)
if mq.mutateNextMessage(func(nextMessage gsmsg.GraphSyncMessage) {
for _, block := range blks {
nextMessage.AddBlock(block)
}
}, notificationChannel) {
mq.signalWork() mq.signalWork()
} }
return notificationChannel
} }
// Startup starts the processing of messages, and creates an initial message // Startup starts the processing of messages, and creates an initial message
...@@ -101,13 +119,16 @@ func (mq *MessageQueue) runQueue() { ...@@ -101,13 +119,16 @@ func (mq *MessageQueue) runQueue() {
} }
} }
func (mq *MessageQueue) mutateNextMessage(mutator func(gsmsg.GraphSyncMessage)) bool { func (mq *MessageQueue) mutateNextMessage(mutator func(gsmsg.GraphSyncMessage), processedNotifier chan struct{}) bool {
mq.nextMessageLk.Lock() mq.nextMessageLk.Lock()
defer mq.nextMessageLk.Unlock() defer mq.nextMessageLk.Unlock()
if mq.nextMessage == nil { if mq.nextMessage == nil {
mq.nextMessage = gsmsg.New() mq.nextMessage = gsmsg.New()
} }
mutator(mq.nextMessage) mutator(mq.nextMessage)
if processedNotifier != nil {
mq.processedNotifiers = append(mq.processedNotifiers, processedNotifier)
}
return !mq.nextMessage.Empty() return !mq.nextMessage.Empty()
} }
...@@ -123,6 +144,14 @@ func (mq *MessageQueue) extractOutgoingMessage() gsmsg.GraphSyncMessage { ...@@ -123,6 +144,14 @@ func (mq *MessageQueue) extractOutgoingMessage() gsmsg.GraphSyncMessage {
mq.nextMessageLk.Lock() mq.nextMessageLk.Lock()
message := mq.nextMessage message := mq.nextMessage
mq.nextMessage = nil mq.nextMessage = nil
for _, processedNotifier := range mq.processedNotifiers {
select {
case processedNotifier <- struct{}{}:
default:
}
close(processedNotifier)
}
mq.processedNotifiers = nil
mq.nextMessageLk.Unlock() mq.nextMessageLk.Unlock()
return message return message
} }
......
...@@ -88,6 +88,52 @@ func TestStartupAndShutdown(t *testing.T) { ...@@ -88,6 +88,52 @@ func TestStartupAndShutdown(t *testing.T) {
} }
} }
func TestProcessingNotification(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
peer := testutil.GeneratePeers(1)[0]
messagesSent := make(chan gsmsg.GraphSyncMessage)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
var waitGroup sync.WaitGroup
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
messageQueue := New(ctx, peer, messageNetwork)
waitGroup.Add(1)
blks := testutil.GenerateBlocksOfSize(3, 128)
blocksProcessing := messageQueue.AddBlocks(blks)
select {
case <-blocksProcessing:
t.Fatal("Blocks should not be processing but already received notification")
default:
}
// wait for send attempt
messageQueue.Startup()
waitGroup.Wait()
select {
case <-blocksProcessing:
case <-ctx.Done():
t.Fatal("blocks should have been processed but were not")
}
select {
case <-ctx.Done():
t.Fatal("no messages were sent")
case message := <-messagesSent:
receivedBlocks := message.Blocks()
for _, block := range receivedBlocks {
if !testutil.ContainsBlock(blks, block) {
t.Fatal("sent incorrect block")
}
}
}
}
func TestDedupingMessages(t *testing.T) { func TestDedupingMessages(t *testing.T) {
ctx := context.Background() ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second) ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment