Commit ebe10ea5 authored by hannahhoward's avatar hannahhoward

fix(responsecollector): don't block channels

make sure order channels are consumed in repsonse collector doesn't block sends on the other
parent d1ef5e81
...@@ -658,10 +658,10 @@ func TestRequestReturnsMissingBlocks(t *testing.T) { ...@@ -658,10 +658,10 @@ func TestRequestReturnsMissingBlocks(t *testing.T) {
for _, block := range blocks { for _, block := range blocks {
fal.responseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")}) fal.responseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
} }
verifyEmptyResponse(ctx, t, returnedResponseChan)
errs := collectErrors(ctx, t, returnedErrorChan) errs := collectErrors(ctx, t, returnedErrorChan)
if len(errs) != len(blocks) { if len(errs) != len(blocks) {
t.Fatal("did not send all errors") t.Fatal("did not send all errors")
} }
verifyEmptyResponse(ctx, t, returnedResponseChan)
} }
...@@ -23,9 +23,7 @@ func (rc *responseCollector) collectResponses( ...@@ -23,9 +23,7 @@ func (rc *responseCollector) collectResponses(
go func() { go func() {
var receivedResponses []ResponseProgress var receivedResponses []ResponseProgress
var receivedErrors []error
defer close(returnedResponses) defer close(returnedResponses)
defer close(returnedErrors)
outgoingResponses := func() chan<- ResponseProgress { outgoingResponses := func() chan<- ResponseProgress {
if len(receivedResponses) == 0 { if len(receivedResponses) == 0 {
return nil return nil
...@@ -38,6 +36,30 @@ func (rc *responseCollector) collectResponses( ...@@ -38,6 +36,30 @@ func (rc *responseCollector) collectResponses(
} }
return receivedResponses[0] return receivedResponses[0]
} }
for len(receivedResponses) > 0 || incomingResponses != nil {
select {
case <-rc.ctx.Done():
return
case <-requestCtx.Done():
if incomingResponses != nil {
cancelRequest()
}
return
case response, ok := <-incomingResponses:
if !ok {
incomingResponses = nil
} else {
receivedResponses = append(receivedResponses, response)
}
case outgoingResponses() <- nextResponse():
receivedResponses = receivedResponses[1:]
}
}
}()
go func() {
var receivedErrors []error
defer close(returnedErrors)
outgoingErrors := func() chan<- error { outgoingErrors := func() chan<- error {
if len(receivedErrors) == 0 { if len(receivedErrors) == 0 {
return nil return nil
...@@ -51,23 +73,12 @@ func (rc *responseCollector) collectResponses( ...@@ -51,23 +73,12 @@ func (rc *responseCollector) collectResponses(
return receivedErrors[0] return receivedErrors[0]
} }
for len(receivedResponses) > 0 || len(receivedErrors) > 0 || incomingResponses != nil || incomingErrors != nil { for len(receivedErrors) > 0 || incomingErrors != nil {
select { select {
case <-rc.ctx.Done(): case <-rc.ctx.Done():
return return
case <-requestCtx.Done(): case <-requestCtx.Done():
if incomingResponses != nil {
cancelRequest()
}
return return
case response, ok := <-incomingResponses:
if !ok {
incomingResponses = nil
} else {
receivedResponses = append(receivedResponses, response)
}
case outgoingResponses() <- nextResponse():
receivedResponses = receivedResponses[1:]
case err, ok := <-incomingErrors: case err, ok := <-incomingErrors:
if !ok { if !ok {
incomingErrors = nil incomingErrors = nil
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment