package requestmanager

import (
	"context"
)

// responseCollector relays a request's responses and errors to the caller
// through intermediate buffers (see collectResponses).
type responseCollector struct {
	// ctx is the collector-wide context; collectResponses stops relaying
	// as soon as it is done.
	ctx context.Context
}

// newResponseCollector returns a responseCollector whose relaying goroutines
// are bound to ctx.
func newResponseCollector(ctx context.Context) *responseCollector {
	collector := new(responseCollector)
	collector.ctx = ctx
	return collector
}

func (rc *responseCollector) collectResponses(
	requestCtx context.Context,
17
	incomingResponses <-chan ResponseProgress,
18 19
	incomingErrors <-chan error,
	cancelRequest func()) (<-chan ResponseProgress, <-chan error) {
20 21

	returnedResponses := make(chan ResponseProgress)
22
	returnedErrors := make(chan error)
23 24

	go func() {
25
		var receivedResponses []ResponseProgress
26
		var receivedErrors []error
27
		defer close(returnedResponses)
28
		defer close(returnedErrors)
29 30 31 32 33 34 35 36
		outgoingResponses := func() chan<- ResponseProgress {
			if len(receivedResponses) == 0 {
				return nil
			}
			return returnedResponses
		}
		nextResponse := func() ResponseProgress {
			if len(receivedResponses) == 0 {
37
				return ResponseProgress{}
38 39 40
			}
			return receivedResponses[0]
		}
41
		outgoingErrors := func() chan<- error {
42 43 44 45 46
			if len(receivedErrors) == 0 {
				return nil
			}
			return returnedErrors
		}
47
		nextError := func() error {
48
			if len(receivedErrors) == 0 {
49
				return nil
50 51 52 53 54
			}
			return receivedErrors[0]
		}

		for len(receivedResponses) > 0 || len(receivedErrors) > 0 || incomingResponses != nil || incomingErrors != nil {
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
			select {
			case <-rc.ctx.Done():
				return
			case <-requestCtx.Done():
				if incomingResponses != nil {
					cancelRequest()
				}
				return
			case response, ok := <-incomingResponses:
				if !ok {
					incomingResponses = nil
				} else {
					receivedResponses = append(receivedResponses, response)
				}
			case outgoingResponses() <- nextResponse():
				receivedResponses = receivedResponses[1:]
hannahhoward's avatar
hannahhoward committed
71
			case err, ok := <-incomingErrors:
72 73 74
				if !ok {
					incomingErrors = nil
				} else {
hannahhoward's avatar
hannahhoward committed
75
					receivedErrors = append(receivedErrors, err)
76 77 78
				}
			case outgoingErrors() <- nextError():
				receivedErrors = receivedErrors[1:]
79 80 81
			}
		}
	}()
82
	return returnedResponses, returnedErrors
83
}