responsecollector.go 2.07 KB
Newer Older
1 2
package requestmanager

3 4 5
import (
	"context"
)
6 7 8 9 10 11 12 13 14 15 16

type responseCollector struct {
	ctx context.Context
}

func newResponseCollector(ctx context.Context) *responseCollector {
	return &responseCollector{ctx}
}

func (rc *responseCollector) collectResponses(
	requestCtx context.Context,
17
	incomingResponses <-chan ResponseProgress,
18 19
	incomingErrors <-chan error,
	cancelRequest func()) (<-chan ResponseProgress, <-chan error) {
20 21

	returnedResponses := make(chan ResponseProgress)
22
	returnedErrors := make(chan error)
23 24

	go func() {
25
		var receivedResponses []ResponseProgress
26 27 28 29 30 31 32 33 34
		defer close(returnedResponses)
		outgoingResponses := func() chan<- ResponseProgress {
			if len(receivedResponses) == 0 {
				return nil
			}
			return returnedResponses
		}
		nextResponse := func() ResponseProgress {
			if len(receivedResponses) == 0 {
35
				return ResponseProgress{}
36 37 38
			}
			return receivedResponses[0]
		}
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
		for len(receivedResponses) > 0 || incomingResponses != nil {
			select {
			case <-rc.ctx.Done():
				return
			case <-requestCtx.Done():
				if incomingResponses != nil {
					cancelRequest()
				}
				return
			case response, ok := <-incomingResponses:
				if !ok {
					incomingResponses = nil
				} else {
					receivedResponses = append(receivedResponses, response)
				}
			case outgoingResponses() <- nextResponse():
				receivedResponses = receivedResponses[1:]
			}
		}
	}()
	go func() {
		var receivedErrors []error
		defer close(returnedErrors)

63
		outgoingErrors := func() chan<- error {
64 65 66 67 68
			if len(receivedErrors) == 0 {
				return nil
			}
			return returnedErrors
		}
69
		nextError := func() error {
70
			if len(receivedErrors) == 0 {
71
				return nil
72 73 74 75
			}
			return receivedErrors[0]
		}

76
		for len(receivedErrors) > 0 || incomingErrors != nil {
77 78 79 80 81
			select {
			case <-rc.ctx.Done():
				return
			case <-requestCtx.Done():
				return
hannahhoward's avatar
hannahhoward committed
82
			case err, ok := <-incomingErrors:
83 84 85
				if !ok {
					incomingErrors = nil
				} else {
hannahhoward's avatar
hannahhoward committed
86
					receivedErrors = append(receivedErrors, err)
87 88 89
				}
			case outgoingErrors() <- nextError():
				receivedErrors = receivedErrors[1:]
90 91 92
			}
		}
	}()
93
	return returnedResponses, returnedErrors
94
}