responsecollector.go 2.63 KB
Newer Older
1 2
package requestmanager

3 4
import (
	"context"
5

6
	"github.com/ipfs/go-graphsync"
7
)
8 9 10 11 12 13 14 15 16 17 18

type responseCollector struct {
	ctx context.Context
}

func newResponseCollector(ctx context.Context) *responseCollector {
	return &responseCollector{ctx}
}

func (rc *responseCollector) collectResponses(
	requestCtx context.Context,
19
	incomingResponses <-chan graphsync.ResponseProgress,
20
	incomingErrors <-chan error,
21
	cancelRequest func()) (<-chan graphsync.ResponseProgress, <-chan error) {
22

23
	returnedResponses := make(chan graphsync.ResponseProgress)
24
	returnedErrors := make(chan error)
25 26

	go func() {
27
		var receivedResponses []graphsync.ResponseProgress
28
		defer close(returnedResponses)
29
		outgoingResponses := func() chan<- graphsync.ResponseProgress {
30 31 32 33 34
			if len(receivedResponses) == 0 {
				return nil
			}
			return returnedResponses
		}
35
		nextResponse := func() graphsync.ResponseProgress {
36
			if len(receivedResponses) == 0 {
37
				return graphsync.ResponseProgress{}
38 39 40
			}
			return receivedResponses[0]
		}
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
		for len(receivedResponses) > 0 || incomingResponses != nil {
			select {
			case <-rc.ctx.Done():
				return
			case <-requestCtx.Done():
				if incomingResponses != nil {
					cancelRequest()
				}
				return
			case response, ok := <-incomingResponses:
				if !ok {
					incomingResponses = nil
				} else {
					receivedResponses = append(receivedResponses, response)
				}
			case outgoingResponses() <- nextResponse():
				receivedResponses = receivedResponses[1:]
			}
		}
	}()
	go func() {
		var receivedErrors []error
		defer close(returnedErrors)

65
		outgoingErrors := func() chan<- error {
66 67 68 69 70
			if len(receivedErrors) == 0 {
				return nil
			}
			return returnedErrors
		}
71
		nextError := func() error {
72
			if len(receivedErrors) == 0 {
73
				return nil
74 75 76 77
			}
			return receivedErrors[0]
		}

78
		for len(receivedErrors) > 0 || incomingErrors != nil {
79 80 81 82
			select {
			case <-rc.ctx.Done():
				return
			case <-requestCtx.Done():
83 84 85 86
				select {
				case <-rc.ctx.Done():
				case returnedErrors <- graphsync.RequestContextCancelledErr{}:
				}
87
				return
hannahhoward's avatar
hannahhoward committed
88
			case err, ok := <-incomingErrors:
89 90
				if !ok {
					incomingErrors = nil
91 92 93 94 95 96 97 98 99 100
					// even if the `incomingErrors` channel is closed without any error,
					// the context could still have timed out in which case we need to inform the caller of the same.
					select {
					case <-requestCtx.Done():
						select {
						case <-rc.ctx.Done():
						case returnedErrors <- graphsync.RequestContextCancelledErr{}:
						}
					default:
					}
101
				} else {
hannahhoward's avatar
hannahhoward committed
102
					receivedErrors = append(receivedErrors, err)
103 104 105
				}
			case outgoingErrors() <- nextError():
				receivedErrors = receivedErrors[1:]
106 107 108
			}
		}
	}()
109
	return returnedResponses, returnedErrors
110
}