Unverified Commit 1bdc5585 authored by Hannah Howard, committed by GitHub

fix(responsemanager): fix network error propagation (#133)

fix various issues that caused network errors not to propagate in many cases
parent 42f195e9
@@ -118,6 +118,14 @@ func (mq *MessageQueue) runQueue() {
case <-mq.outgoingWork:
mq.sendMessage()
case <-mq.done:
+		select {
+		case <-mq.outgoingWork:
+			message, topic := mq.extractOutgoingMessage()
+			if message != nil && !message.Empty() {
+				mq.eventPublisher.Publish(topic, Event{Name: Error, Err: fmt.Errorf("message queue shutdown")})
+			}
+		default:
+		}
if mq.sender != nil {
mq.sender.Close()
}
......
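For context on the runQueue change above: when `done` fires, the new branch does a non-blocking drain of `outgoingWork` and, if a message is still pending, publishes a shutdown error instead of dropping it silently, so subscribers waiting on that message learn it will never be sent. Below is a minimal, self-contained sketch of the pattern; the `queue` and `Event` types and the channel names are illustrative stand-ins, not go-graphsync's actual API.

```go
package main

import "fmt"

// Event is an illustrative stand-in for the message queue's event type.
type Event struct{ Err error }

type queue struct {
	outgoingWork chan struct{} // signaled when a message is waiting
	events       chan Event    // stand-in for the event publisher
}

// onShutdown mirrors the <-mq.done branch: drain a pending work signal
// without blocking, and report the undelivered message as an error
// rather than dropping it silently.
func (q *queue) onShutdown() {
	select {
	case <-q.outgoingWork:
		q.events <- Event{Err: fmt.Errorf("message queue shutdown")}
	default:
		// no pending work, nothing to report
	}
}

func main() {
	q := &queue{
		outgoingWork: make(chan struct{}, 1),
		events:       make(chan Event, 1),
	}
	q.outgoingWork <- struct{}{} // work arrives...
	q.onShutdown()               // ...but the queue shuts down first
	evt := <-q.events
	fmt.Println(evt.Err) // message queue shutdown
}
```

The `default` case is what keeps the drain non-blocking: if no work signal is pending, shutdown proceeds immediately.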
package notifications
import "sync"
import (
"sync"
)
type TopicDataSubscriber struct {
idMapLk sync.RWMutex
@@ -48,4 +50,7 @@ func (m *TopicDataSubscriber) OnClose(topic Topic) {
for _, data := range m.getData(topic) {
m.Subscriber.OnClose(data)
}
+	m.idMapLk.Lock()
+	delete(m.data, topic)
+	m.idMapLk.Unlock()
}
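The `OnClose` addition releases the topic's entry from the subscriber's `data` map under the write lock; without it, a long-lived `TopicDataSubscriber` would accumulate mappings for topics that have already closed. A rough sketch of the cleanup pattern, with illustrative names rather than the library's real types:

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is an illustrative stand-in for TopicDataSubscriber:
// it fans out per-topic data and must release that data on close.
type subscriber struct {
	mu   sync.RWMutex
	data map[string][]string
}

func (s *subscriber) onClose(topic string) {
	s.mu.RLock()
	entries := s.data[topic]
	s.mu.RUnlock()
	for _, d := range entries {
		fmt.Println("closing:", d)
	}
	// Release the topic's entry under the write lock so the map
	// does not grow without bound as topics come and go.
	s.mu.Lock()
	delete(s.data, topic)
	s.mu.Unlock()
}

func main() {
	s := &subscriber{data: map[string][]string{"topic-1": {"a", "b"}}}
	s.onClose("topic-1")
	fmt.Println("remaining topics:", len(s.data)) // 0: state released
}
```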
@@ -83,8 +83,18 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
// GetProcess returns the process for the given peer
func (pm *PeerManager) GetProcess(
p peer.ID) PeerProcess {
+	// Usually this is just a read
+	pm.peerProcessesLk.RLock()
+	pqi, ok := pm.peerProcesses[p]
+	if ok {
+		pm.peerProcessesLk.RUnlock()
+		return pqi.process
+	}
+	pm.peerProcessesLk.RUnlock()
+	// but sometimes it involves a create (we still need to use getOrCreate because it's
+	// possible another writer grabbed the Lock first and created the process)
	pm.peerProcessesLk.Lock()
-	pqi := pm.getOrCreate(p)
+	pqi = pm.getOrCreate(p)
pm.peerProcessesLk.Unlock()
return pqi.process
}
......
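`GetProcess` now uses the classic double-checked pattern over a `sync.RWMutex`: the common case is a lookup under the read lock, and only a miss escalates to the write lock, where the lookup must be repeated because another writer may have created the process between the `RUnlock` and the `Lock`. A generic sketch of the pattern, using an illustrative registry type rather than go-graphsync's:

```go
package main

import (
	"fmt"
	"sync"
)

// registry is an illustrative read-mostly map guarded by an RWMutex.
type registry struct {
	mu    sync.RWMutex
	items map[string]int
	next  int
}

// get returns the value for key, creating it on first use. Reads are
// the common case, so they take only the read lock.
func (r *registry) get(key string) int {
	r.mu.RLock()
	v, ok := r.items[key]
	r.mu.RUnlock()
	if ok {
		return v
	}
	// Slow path: take the write lock, then check again, since another
	// goroutine may have created the entry while no lock was held.
	r.mu.Lock()
	defer r.mu.Unlock()
	if v, ok := r.items[key]; ok {
		return v
	}
	r.next++
	r.items[key] = r.next
	return r.items[key]
}

func main() {
	r := &registry{items: map[string]int{}}
	fmt.Println(r.get("peer-a"), r.get("peer-a")) // 1 1: created once, then read
}
```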
@@ -78,6 +78,7 @@ type peerResponseSender struct {
subscriber *notifications.TopicDataSubscriber
allocatorSubscriber *notifications.TopicDataSubscriber
publisher notifications.Publisher
+	messagesSending sync.WaitGroup
}
// PeerResponseSender handles batching, deduping, and sending responses for
@@ -442,6 +443,7 @@ func (prs *peerResponseSender) signalWork() {
func (prs *peerResponseSender) run() {
defer func() {
+		prs.messagesSending.Wait()
prs.publisher.Shutdown()
prs.allocator.ReleasePeerMemory(prs.p)
}()
@@ -449,6 +451,17 @@
for {
select {
case <-prs.ctx.Done():
+			select {
+			case <-prs.outgoingWork:
+				prs.responseBuildersLk.Lock()
+				builders := prs.responseBuilders
+				prs.responseBuilders = nil
+				prs.responseBuildersLk.Unlock()
+				for _, builder := range builders {
+					prs.publisher.Publish(builder.Topic(), Event{Name: Error, Err: fmt.Errorf("queue shutdown")})
+				}
+			default:
+			}
return
case <-prs.outgoingWork:
prs.sendResponseMessages()
@@ -475,6 +488,7 @@ func (prs *peerResponseSender) sendResponseMessages() {
log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
}
+	prs.messagesSending.Add(1)
prs.peerHandler.SendResponse(prs.p, responses, blks, notifications.Notifee{
Data: builder.Topic(),
Subscriber: prs.subscriber,
@@ -514,8 +528,10 @@ func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event) {
switch msgEvent.Name {
case messagequeue.Sent:
s.prs.publisher.Publish(builderTopic, Event{Name: Sent})
+		s.prs.messagesSending.Done()
case messagequeue.Error:
s.prs.publisher.Publish(builderTopic, Event{Name: Error, Err: fmt.Errorf("error sending message: %w", msgEvent.Err)})
+		s.prs.messagesSending.Done()
case messagequeue.Queued:
select {
case s.prs.queuedMessages <- builderTopic:
......
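The `messagesSending` WaitGroup ties the peerresponsesender hunks together: `Add(1)` when a response is handed to the message queue, `Done()` when its terminal `Sent` or `Error` event arrives, and `Wait()` before `publisher.Shutdown()` so no notification is lost to a shutdown race. A reduced sketch of that lifecycle; the `sender` type and `results` channel are illustrative, not the library's API:

```go
package main

import (
	"fmt"
	"sync"
)

// sender is an illustrative stand-in for peerResponseSender: it hands
// messages off asynchronously and tracks each until a terminal result.
type sender struct {
	messagesSending sync.WaitGroup
	results         chan error
}

func (s *sender) send(msg string, fail bool) {
	s.messagesSending.Add(1) // mirrors Add(1) before SendResponse
	go func() {
		// Done fires on the terminal event, whether Sent or Error,
		// mirroring the two Done() calls in the subscriber above.
		defer s.messagesSending.Done()
		if fail {
			s.results <- fmt.Errorf("error sending %q", msg)
			return
		}
		s.results <- nil
	}()
}

func main() {
	s := &sender{results: make(chan error, 2)}
	s.send("a", false)
	s.send("b", true)
	// Wait for every in-flight send to report before shutting down,
	// mirroring the deferred Wait() before publisher.Shutdown().
	s.messagesSending.Wait()
	close(s.results)
	for err := range s.results {
		fmt.Println("result:", err)
	}
}
```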
@@ -114,11 +114,10 @@ func (qe *queryExecutor) prepareQuery(ctx context.Context,
p peer.ID,
request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) {
result := qe.requestHooks.ProcessRequestHooks(p, request)
-	peerResponseSender := qe.peerManager.SenderForPeer(p)
var transactionError error
var isPaused bool
failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub}
-	err := peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
+	err := qe.peerManager.SenderForPeer(p).Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
for _, extension := range result.Extensions {
transaction.SendExtensionData(extension)
}
@@ -138,10 +137,10 @@ func (qe *queryExecutor) prepareQuery(ctx context.Context,
if transactionError != nil {
return nil, nil, false, transactionError
}
-	if err := qe.processDedupByKey(request, peerResponseSender, failNotifee); err != nil {
+	if err := qe.processDedupByKey(request, qe.peerManager.SenderForPeer(p), failNotifee); err != nil {
return nil, nil, false, err
}
-	if err := qe.processDoNoSendCids(request, peerResponseSender, failNotifee); err != nil {
+	if err := qe.processDoNoSendCids(request, qe.peerManager.SenderForPeer(p), failNotifee); err != nil {
return nil, nil, false, err
}
rootLink := cidlink.Link{Cid: request.Root()}
@@ -201,10 +200,9 @@ func (qe *queryExecutor) executeQuery(
signals signals,
sub *notifications.TopicDataSubscriber) (graphsync.ResponseStatusCode, error) {
updateChan := make(chan []gsmsg.GraphSyncRequest)
-	peerResponseSender := qe.peerManager.SenderForPeer(p)
err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error {
var err error
-	_ = peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
+	_ = qe.peerManager.SenderForPeer(p).Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
err = qe.checkForUpdates(p, request, signals, updateChan, transaction)
if _, ok := err.(hooks.ErrPaused); !ok && err != nil {
return nil
@@ -228,7 +226,7 @@ func (qe *queryExecutor) executeQuery(
return err
})
var code graphsync.ResponseStatusCode
-	_ = peerResponseSender.Transaction(request.ID(), func(peerResponseSender peerresponsemanager.PeerResponseTransactionSender) error {
+	_ = qe.peerManager.SenderForPeer(p).Transaction(request.ID(), func(peerResponseSender peerresponsemanager.PeerResponseTransactionSender) error {
if err != nil {
_, isPaused := err.(hooks.ErrPaused)
if isPaused {
......
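The queryexecutor hunks replace a cached local `peerResponseSender` with a `qe.peerManager.SenderForPeer(p)` lookup at each use. Presumably this ensures every transaction talks to whatever sender the manager currently holds for the peer, rather than one captured earlier that may since have been replaced (for example after a disconnect). The sketch below illustrates how a cached reference can go stale while a fresh lookup stays current; the `manager` and `sender` types are illustrative, not go-graphsync's.

```go
package main

import "fmt"

// sender and manager are illustrative; the manager may replace the
// sender it holds for a peer over time.
type sender struct{ gen int }

type manager struct{ current *sender }

// senderForPeer models qe.peerManager.SenderForPeer(p): it always
// returns whatever sender the manager holds right now.
func (m *manager) senderForPeer() *sender { return m.current }

func main() {
	m := &manager{current: &sender{gen: 1}}

	cached := m.senderForPeer() // caching pins the first sender

	m.current = &sender{gen: 2} // the manager later replaces it

	fmt.Println("cached sender:", cached.gen)           // 1: stale
	fmt.Println("fresh lookup:", m.senderForPeer().gen) // 2: current
}
```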