Unverified Commit e9653ded authored by Alex Cruikshank's avatar Alex Cruikshank Committed by GitHub

Permit multiple data subscriptions per original topic (#128)

* modify transformable event logic to simply add data

* permit multiple data topics so sent hook gets calle for every block

* rename test to match method rename.
Co-authored-by: default avatardirkmc <dirkmdev@gmail.com>
Co-authored-by: default avataracruikshank <acruikshank@example.com>
Co-authored-by: default avatardirkmc <dirkmdev@gmail.com>
parent 11d30c60
......@@ -928,6 +928,87 @@ func TestUnixFSFetch(t *testing.T) {
require.Equal(t, origBytes, finalBytes, "should have gotten same bytes written as read but didn't")
}
func TestGraphsyncBlockListeners(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
td := newGsTestData(ctx, t)
// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()
// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)
// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
// register hooks to count blocks in various stages
blocksSent := 0
blocksOutgoing := 0
blocksIncoming := 0
responder.RegisterBlockSentListener(func(p peer.ID, request graphsync.RequestData, block graphsync.BlockData) {
blocksSent++
})
requestor.RegisterIncomingBlockHook(func(p peer.ID, r graphsync.ResponseData, b graphsync.BlockData, h graphsync.IncomingBlockHookActions) {
blocksIncoming++
})
responder.RegisterOutgoingBlockHook(func(p peer.ID, r graphsync.RequestData, b graphsync.BlockData, h graphsync.OutgoingBlockHookActions) {
blocksOutgoing++
})
var receivedResponseData []byte
var receivedRequestData []byte
requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
data, has := responseData.Extension(td.extensionName)
if has {
receivedResponseData = data
}
})
responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
var has bool
receivedRequestData, has = requestData.Extension(td.extensionName)
if !has {
hookActions.TerminateWithError(errors.New("Missing extension"))
} else {
hookActions.SendExtensionData(td.extensionResponse)
}
})
finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)
blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")
// verify extension round trip
require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data")
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")
// verify listener
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)
// assert we get notified for all the blocks
require.Equal(t, blockChainLength, blocksOutgoing)
require.Equal(t, blockChainLength, blocksIncoming)
require.Equal(t, blockChainLength, blocksSent)
}
type gsTestData struct {
mn mocknet.Mocknet
ctx context.Context
......
......@@ -141,7 +141,7 @@ func (mq *MessageQueue) mutateNextMessage(mutator func(gsmsg.GraphSyncMessage),
}
mutator(mq.nextMessage)
for _, notifee := range notifees {
notifications.SubscribeOn(mq.eventPublisher, mq.nextMessageTopic, notifee)
notifications.SubscribeWithData(mq.eventPublisher, mq.nextMessageTopic, notifee)
}
return !mq.nextMessage.Empty()
}
......
package notifications
import "sync"
type TopicDataSubscriber struct {
idMapLk sync.RWMutex
data map[Topic][]TopicData
Subscriber
}
// NewTopicDataSubscriber produces a subscriber that will transform
// events and topics before passing them on to the given subscriber
func NewTopicDataSubscriber(sub Subscriber) *TopicDataSubscriber {
return &TopicDataSubscriber{
Subscriber: sub,
data: make(map[Topic][]TopicData),
}
}
func (m *TopicDataSubscriber) AddTopicData(id Topic, data TopicData) {
m.idMapLk.Lock()
m.data[id] = append(m.data[id], data)
m.idMapLk.Unlock()
}
func (m *TopicDataSubscriber) getData(id Topic) []TopicData {
m.idMapLk.RLock()
defer m.idMapLk.RUnlock()
data, ok := m.data[id]
if !ok {
return []TopicData{}
}
newData := make([]TopicData, len(data))
for i, d := range data {
newData[i] = d
}
return newData
}
func (m *TopicDataSubscriber) OnNext(topic Topic, ev Event) {
for _, data := range m.getData(topic) {
m.Subscriber.OnNext(data, ev)
}
}
func (m *TopicDataSubscriber) OnClose(topic Topic) {
for _, data := range m.getData(topic) {
m.Subscriber.OnClose(data)
}
}
package notifications
import "sync"
type mappableSubscriber struct {
idMapLk sync.RWMutex
idMap map[Topic]Topic
eventTransform EventTransform
Subscriber
}
// NewMappableSubscriber produces a subscriber that will transform
// events and topics before passing them on to the given subscriber
func NewMappableSubscriber(sub Subscriber, eventTransform EventTransform) MappableSubscriber {
return &mappableSubscriber{
Subscriber: sub,
eventTransform: eventTransform,
idMap: make(map[Topic]Topic),
}
}
func (m *mappableSubscriber) Map(id Topic, destinationID Topic) {
m.idMapLk.Lock()
m.idMap[id] = destinationID
m.idMapLk.Unlock()
}
func (m *mappableSubscriber) transform(id Topic) Topic {
m.idMapLk.RLock()
defer m.idMapLk.RUnlock()
destID, ok := m.idMap[id]
if !ok {
return id
}
return destID
}
func (m *mappableSubscriber) OnNext(topic Topic, ev Event) {
newTopic := m.transform(topic)
newEv := m.eventTransform(ev)
m.Subscriber.OnNext(newTopic, newEv)
}
func (m *mappableSubscriber) OnClose(topic Topic) {
newTopic := m.transform(topic)
m.Subscriber.OnClose(newTopic)
}
......@@ -9,13 +9,13 @@ import (
"github.com/ipfs/go-graphsync/testutil"
)
func TestSubscribeOn(t *testing.T) {
func TestSubscribeWithData(t *testing.T) {
ctx := context.Background()
testCases := map[string]func(ctx context.Context, t *testing.T, ps notifications.Publisher){
"SubscribeOn": func(ctx context.Context, t *testing.T, ps notifications.Publisher) {
"SubscribeWithData": func(ctx context.Context, t *testing.T, ps notifications.Publisher) {
destTopic := "t2"
notifee, verifier := testutil.NewTestNotifee(destTopic, 1)
notifications.SubscribeOn(ps, "t1", notifee)
notifications.SubscribeWithData(ps, "t1", notifee)
ps.Publish("t1", "hi")
ps.Shutdown()
verifier.ExpectEvents(ctx, t, []notifications.Event{"hi"})
......
......@@ -6,19 +6,15 @@ type Topic interface{}
// Event is a publishable event
type Event interface{}
// TopicData is data added to every message broadcast on a topic
type TopicData interface{}
// Subscriber is a subscriber that can receive events
type Subscriber interface {
OnNext(Topic, Event)
OnClose(Topic)
}
// MappableSubscriber is a subscriber that remaps events received to other topics
// and events
type MappableSubscriber interface {
Subscriber
Map(sourceID Topic, destinationID Topic)
}
// Subscribable is a stream that can be subscribed to
type Subscribable interface {
Subscribe(topic Topic, sub Subscriber) bool
......@@ -37,20 +33,16 @@ type Publisher interface {
// EventTransform if a fucntion transforms one kind of event to another
type EventTransform func(Event) Event
// Notifee is a mappable suscriber where you want events to appear
// on this specified topic (used to call SubscribeOn to setup a remapping)
// Notifee is a topic data subscriber plus a set of data you want to add to any topics subscribed to
// (used to call SubscribeWithData to inject data when events for a given topic emit)
type Notifee struct {
Topic Topic
Subscriber MappableSubscriber
Data TopicData
Subscriber *TopicDataSubscriber
}
// SubscribeOn subscribes to the given subscribe on the given topic, but
// maps to a differnt topic specified in a notifee which has a mappable
// subscriber
func SubscribeOn(p Subscribable, topic Topic, notifee Notifee) {
notifee.Subscriber.Map(topic, notifee.Topic)
// SubscribeWithData subscribes to the given subscriber on the given topic, and adds the notifies
// custom data into the list of data injected into callbacks when events occur on that topic
func SubscribeWithData(p Subscribable, topic Topic, notifee Notifee) {
notifee.Subscriber.AddTopicData(topic, notifee.Data)
p.Subscribe(topic, notifee.Subscriber)
}
// IdentityTransform sets up an event transform that makes no changes
func IdentityTransform(ev Event) Event { return ev }
......@@ -75,8 +75,8 @@ type peerResponseSender struct {
responseBuilders []*responsebuilder.ResponseBuilder
nextBuilderTopic responsebuilder.Topic
queuedMessages chan responsebuilder.Topic
subscriber notifications.MappableSubscriber
allocatorSubscriber notifications.MappableSubscriber
subscriber *notifications.TopicDataSubscriber
allocatorSubscriber *notifications.TopicDataSubscriber
publisher notifications.Publisher
}
......@@ -133,8 +133,8 @@ func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHa
publisher: notifications.NewPublisher(),
allocator: allocator,
}
prs.subscriber = notifications.NewMappableSubscriber(&subscriber{prs}, notifications.IdentityTransform)
prs.allocatorSubscriber = notifications.NewMappableSubscriber(&allocatorSubscriber{prs}, notifications.IdentityTransform)
prs.subscriber = notifications.NewTopicDataSubscriber(&subscriber{prs})
prs.allocatorSubscriber = notifications.NewTopicDataSubscriber(&allocatorSubscriber{prs})
return prs
}
......@@ -418,7 +418,7 @@ func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn fun
responseBuilder := prs.responseBuilders[len(prs.responseBuilders)-1]
buildResponseFn(responseBuilder)
for _, notifee := range notifees {
notifications.SubscribeOn(prs.publisher, responseBuilder.Topic(), notifee)
notifications.SubscribeWithData(prs.publisher, responseBuilder.Topic(), notifee)
}
return !responseBuilder.Empty()
}
......@@ -466,8 +466,8 @@ func (prs *peerResponseSender) sendResponseMessages() {
if builder.Empty() {
continue
}
notifications.SubscribeOn(prs.publisher, builder.Topic(), notifications.Notifee{
Topic: builder.BlockSize(),
notifications.SubscribeWithData(prs.publisher, builder.Topic(), notifications.Notifee{
Data: builder.BlockSize(),
Subscriber: prs.allocatorSubscriber,
})
responses, blks, err := builder.Build()
......@@ -476,7 +476,7 @@ func (prs *peerResponseSender) sendResponseMessages() {
}
prs.peerHandler.SendResponse(prs.p, responses, blks, notifications.Notifee{
Topic: builder.Topic(),
Data: builder.Topic(),
Subscriber: prs.subscriber,
})
......
......@@ -112,12 +112,12 @@ func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData)
func (qe *queryExecutor) prepareQuery(ctx context.Context,
p peer.ID,
request gsmsg.GraphSyncRequest, signals signals, sub notifications.MappableSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) {
request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) {
result := qe.requestHooks.ProcessRequestHooks(p, request)
peerResponseSender := qe.peerManager.SenderForPeer(p)
var transactionError error
var isPaused bool
failNotifee := notifications.Notifee{Topic: graphsync.RequestFailedUnknown, Subscriber: sub}
failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub}
err := peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
for _, extension := range result.Extensions {
transaction.SendExtensionData(extension)
......@@ -199,7 +199,7 @@ func (qe *queryExecutor) executeQuery(
loader ipld.Loader,
traverser ipldutil.Traverser,
signals signals,
sub notifications.MappableSubscriber) (graphsync.ResponseStatusCode, error) {
sub *notifications.TopicDataSubscriber) (graphsync.ResponseStatusCode, error) {
updateChan := make(chan []gsmsg.GraphSyncRequest)
peerResponseSender := qe.peerManager.SenderForPeer(p)
err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error {
......@@ -210,7 +210,7 @@ func (qe *queryExecutor) executeQuery(
return nil
}
blockData := transaction.SendResponse(link, data)
transaction.AddNotifee(notifications.Notifee{Topic: blockData, Subscriber: sub})
transaction.AddNotifee(notifications.Notifee{Data: blockData, Subscriber: sub})
if blockData.BlockSize() > 0 {
result := qe.blockHooks.ProcessBlockHooks(p, request, blockData)
for _, extension := range result.Extensions {
......@@ -253,7 +253,7 @@ func (qe *queryExecutor) executeQuery(
} else {
code = peerResponseSender.FinishRequest()
}
peerResponseSender.AddNotifee(notifications.Notifee{Topic: code, Subscriber: sub})
peerResponseSender.AddNotifee(notifications.Notifee{Data: code, Subscriber: sub})
return nil
})
return code, err
......
......@@ -34,7 +34,7 @@ type inProgressResponseStatus struct {
signals signals
updates []gsmsg.GraphSyncRequest
isPaused bool
subscriber notifications.MappableSubscriber
subscriber *notifications.TopicDataSubscriber
}
type responseKey struct {
......@@ -50,7 +50,7 @@ type signals struct {
type responseTaskData struct {
empty bool
subscriber notifications.MappableSubscriber
subscriber *notifications.TopicDataSubscriber
ctx context.Context
request gsmsg.GraphSyncRequest
loader ipld.Loader
......@@ -332,7 +332,7 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync
}
if result.Err != nil {
transaction.FinishWithError(graphsync.RequestFailedUnknown)
transaction.AddNotifee(notifications.Notifee{Topic: graphsync.RequestFailedUnknown, Subscriber: response.subscriber})
transaction.AddNotifee(notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: response.subscriber})
}
return nil
})
......@@ -395,7 +395,7 @@ func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID
rm.cancelledListeners.NotifyCancelledListeners(p, response.request)
peerResponseSender.FinishWithCancel(requestID)
} else if err != errNetworkError {
peerResponseSender.FinishWithError(requestID, graphsync.RequestCancelled, notifications.Notifee{Topic: graphsync.RequestCancelled, Subscriber: response.subscriber})
peerResponseSender.FinishWithError(requestID, graphsync.RequestCancelled, notifications.Notifee{Data: graphsync.RequestCancelled, Subscriber: response.subscriber})
}
delete(rm.inProgressResponses, key)
response.cancelFn()
......@@ -420,7 +420,7 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) {
continue
}
ctx, cancelFn := context.WithCancel(rm.ctx)
sub := notifications.NewMappableSubscriber(&subscriber{
sub := notifications.NewTopicDataSubscriber(&subscriber{
p: key.p,
request: request,
ctx: rm.ctx,
......@@ -428,7 +428,7 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) {
blockSentListeners: rm.blockSentListeners,
completedListeners: rm.completedListeners,
networkErrorListeners: rm.networkErrorListeners,
}, notifications.IdentityTransform)
})
rm.inProgressResponses[key] =
&inProgressResponseStatus{
ctx: ctx,
......
......@@ -1103,29 +1103,29 @@ func (td *testData) assertIgnoredCids(set *cid.Set) {
}
func (td *testData) notifyStatusMessagesSent() {
td.notifeePublisher.PublishMatchingEvents(func(topic notifications.Topic) bool {
_, isSn := topic.(graphsync.ResponseStatusCode)
td.notifeePublisher.PublishMatchingEvents(func(data notifications.TopicData) bool {
_, isSn := data.(graphsync.ResponseStatusCode)
return isSn
}, []notifications.Event{peerresponsemanager.Event{Name: peerresponsemanager.Sent}})
}
func (td *testData) notifyBlockSendsSent() {
td.notifeePublisher.PublishMatchingEvents(func(topic notifications.Topic) bool {
_, isBsn := topic.(graphsync.BlockData)
td.notifeePublisher.PublishMatchingEvents(func(data notifications.TopicData) bool {
_, isBsn := data.(graphsync.BlockData)
return isBsn
}, []notifications.Event{peerresponsemanager.Event{Name: peerresponsemanager.Sent}})
}
func (td *testData) notifyStatusMessagesNetworkError(err error) {
td.notifeePublisher.PublishMatchingEvents(func(topic notifications.Topic) bool {
_, isSn := topic.(graphsync.ResponseStatusCode)
td.notifeePublisher.PublishMatchingEvents(func(data notifications.TopicData) bool {
_, isSn := data.(graphsync.ResponseStatusCode)
return isSn
}, []notifications.Event{peerresponsemanager.Event{Name: peerresponsemanager.Error, Err: err}})
}
func (td *testData) notifyBlockSendsNetworkError(err error) {
td.notifeePublisher.PublishMatchingEvents(func(topic notifications.Topic) bool {
_, isBsn := topic.(graphsync.BlockData)
td.notifeePublisher.PublishMatchingEvents(func(data notifications.TopicData) bool {
_, isBsn := data.(graphsync.BlockData)
return isBsn
}, []notifications.Event{peerresponsemanager.Event{Name: peerresponsemanager.Error, Err: err}})
}
......
......@@ -85,13 +85,13 @@ func (nv *NotifeeVerifier) ExpectClose(ctx context.Context, t *testing.T) {
nv.subscriber.ExpectCloses(ctx, t, []notifications.Topic{nv.expectedTopic})
}
func NewTestNotifee(topic notifications.Topic, bufferSize int) (notifications.Notifee, *NotifeeVerifier) {
func NewTestNotifee(data notifications.TopicData, bufferSize int) (notifications.Notifee, *NotifeeVerifier) {
subscriber := NewTestSubscriber(bufferSize)
return notifications.Notifee{
Topic: topic,
Subscriber: notifications.NewMappableSubscriber(subscriber, notifications.IdentityTransform),
Data: data,
Subscriber: notifications.NewTopicDataSubscriber(subscriber),
}, &NotifeeVerifier{
expectedTopic: topic,
expectedTopic: data,
subscriber: subscriber,
}
}
......@@ -107,15 +107,15 @@ func (mp *MockPublisher) AddNotifees(notifees []notifications.Notifee) {
mp.notifeesLk.Unlock()
}
func (mp *MockPublisher) PublishMatchingEvents(shouldPublish func(notifications.Topic) bool, events []notifications.Event) {
func (mp *MockPublisher) PublishMatchingEvents(shouldPublish func(notifications.TopicData) bool, events []notifications.Event) {
mp.notifeesLk.Lock()
var newNotifees []notifications.Notifee
for _, notifee := range mp.notifees {
if shouldPublish(notifee.Topic) {
if shouldPublish(notifee.Data) {
for _, ev := range events {
notifee.Subscriber.OnNext(notifee.Topic, ev)
notifee.Subscriber.Subscriber.OnNext(notifee.Data, ev)
}
notifee.Subscriber.OnClose(notifee.Topic)
notifee.Subscriber.Subscriber.OnClose(notifee.Data)
} else {
newNotifees = append(newNotifees, notifee)
}
......@@ -125,13 +125,13 @@ func (mp *MockPublisher) PublishMatchingEvents(shouldPublish func(notifications.
}
func (mp *MockPublisher) PublishEvents(events []notifications.Event) {
mp.PublishMatchingEvents(func(notifications.Topic) bool { return true }, events)
mp.PublishMatchingEvents(func(notifications.TopicData) bool { return true }, events)
}
func (mp *MockPublisher) PublishEventsOnTopics(topics []notifications.Topic, events []notifications.Event) {
shouldPublish := func(topic notifications.Topic) bool {
for _, testTopic := range topics {
if topic == testTopic {
func (mp *MockPublisher) PublishEventsOnTopicData(data []notifications.TopicData, events []notifications.Event) {
shouldPublish := func(topic notifications.TopicData) bool {
for _, testTopicData := range data {
if topic == testTopicData {
return true
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment