responseemitter.go 7.47 KB
Newer Older
1 2 3 4 5 6 7
package http

import (
	"fmt"
	"io"
	"net/http"
	"strconv"
8
	"strings"
Jan Winkelmann's avatar
Jan Winkelmann committed
9
	"sync"
10

Hector Sanjuan's avatar
Hector Sanjuan committed
11
	cmds "github.com/ipfs/go-ipfs-cmds"
12 13 14
)

var (
Hector Sanjuan's avatar
Hector Sanjuan committed
15
	// AllowedExposedHeadersArr defines the default Access-Control-Expose-Headers.
16
	AllowedExposedHeadersArr = []string{streamHeader, channelHeader, extraContentLengthHeader}
Hector Sanjuan's avatar
Hector Sanjuan committed
17 18
	// AllowedExposedHeaders is the list of defaults Access-Control-Expose-Headers separated by comma.
	AllowedExposedHeaders = strings.Join(AllowedExposedHeadersArr, ", ")
19 20 21 22 23 24 25

	mimeTypes = map[cmds.EncodingType]string{
		cmds.Protobuf: "application/protobuf",
		cmds.JSON:     "application/json",
		cmds.XML:      "application/xml",
		cmds.Text:     "text/plain",
	}
26 27
)

Hector Sanjuan's avatar
Hector Sanjuan committed
28
// NewResponseEmitter returns a new ResponseEmitter.
29
func NewResponseEmitter(w http.ResponseWriter, method string, req *cmds.Request, opts ...ResponseEmitterOption) (ResponseEmitter, error) {
Steven Allen's avatar
Steven Allen committed
30
	encType, enc, err := cmds.GetEncoder(req, w, cmds.JSON)
31 32
	if err != nil {
		return nil, err
33
	}
34
	re := &responseEmitter{
35 36
		w:       w,
		encType: encType,
Steven Allen's avatar
Steven Allen committed
37
		enc:     enc,
38 39
		method:  method,
		req:     req,
40
	}
41

42
	// apply functional options
43 44 45 46
	for _, opt := range opts {
		opt(re)
	}

47
	return re, nil
48 49
}

50
// ResponseEmitterOption is the type describing options to the NewResponseEmitter function.
51 52
type ResponseEmitterOption func(*responseEmitter)

53
// withRequestBodyEOFChan return a ResponseEmitterOption needed to gracefully handle
54 55
// the case where the handler wants to send data and the request data has not been read
// completely yet.
56
func withRequestBodyEOFChan(ch <-chan struct{}) ResponseEmitterOption {
57 58
	return func(re *responseEmitter) {
		if ch != nil {
59
			re.bodyEOFChan = ch
60 61 62 63
		}
	}
}

Hector Sanjuan's avatar
Hector Sanjuan committed
64 65
// ResponseEmitter interface defines the components that can care of sending
// the response to HTTP Requests.
66 67 68 69 70 71
type ResponseEmitter interface {
	cmds.ResponseEmitter
	http.Flusher
}

type responseEmitter struct {
Jan Winkelmann's avatar
Jan Winkelmann committed
72 73
	w http.ResponseWriter

Steven Allen's avatar
Steven Allen committed
74
	enc     cmds.Encoder
Jan Winkelmann's avatar
Jan Winkelmann committed
75
	encType cmds.EncodingType
76
	req     *cmds.Request
Jan Winkelmann's avatar
Jan Winkelmann committed
77

keks's avatar
keks committed
78
	l      sync.Mutex
Jan Winkelmann's avatar
Jan Winkelmann committed
79 80
	length uint64

81
	bodyEOFChan <-chan struct{}
82

83
	streaming bool
keks's avatar
keks committed
84
	closed    bool
85 86
	once      sync.Once
	method    string
87 88
}

89
func (re *responseEmitter) Emit(value interface{}) error {
keks's avatar
keks committed
90 91 92 93 94 95
	// if we got a channel, instead emit values received on there.
	if ch, ok := value.(chan interface{}); ok {
		value = (<-chan interface{})(ch)
	}
	if ch, isChan := value.(<-chan interface{}); isChan {
		return cmds.EmitChan(re, ch)
96 97
	}

98
	re.once.Do(func() { re.preamble(value) })
Jan Winkelmann's avatar
Jan Winkelmann committed
99

keks's avatar
keks committed
100 101
	re.l.Lock()
	defer re.l.Unlock()
102

keks's avatar
keks committed
103 104 105 106
	if re.closed {
		return cmds.ErrClosedEmitter
	}

keks's avatar
keks committed
107 108
	var err error

109
	// return immediately if this is a head request
110
	if re.method == http.MethodHead {
111 112 113
		return nil
	}

114 115 116 117 118
	// ignore those
	if value == nil {
		return nil
	}

keks's avatar
keks committed
119 120 121 122 123 124
	var isSingle bool
	if single, ok := value.(cmds.Single); ok {
		value = single.Value
		isSingle = true
	}

125 126 127 128
	if f, ok := re.w.(http.Flusher); ok {
		defer f.Flush()
	}

129
	switch v := value.(type) {
130 131
	case error:
		return re.closeWithError(v)
132
	case io.Reader:
133
		err = flushCopy(re.w, v)
134
	default:
Steven Allen's avatar
Steven Allen committed
135
		err = re.enc.Encode(value)
136 137
	}

keks's avatar
keks committed
138 139
	if isSingle && err == nil {
		// only close when there were no encoding errors
140
		err = re.closeWithError(nil)
keks's avatar
keks committed
141 142
	}

143 144 145
	return err
}

146
func (re *responseEmitter) SetLength(l uint64) {
keks's avatar
keks committed
147 148 149
	re.l.Lock()
	defer re.l.Unlock()

Jan Winkelmann's avatar
Jan Winkelmann committed
150 151 152
	h := re.w.Header()
	h.Set("X-Content-Length", strconv.FormatUint(l, 10))

153 154 155
	re.length = l
}

156
func (re *responseEmitter) Close() error {
157
	return re.CloseWithError(nil)
158 159
}

keks's avatar
keks committed
160 161 162 163
func (re *responseEmitter) CloseWithError(err error) error {
	re.l.Lock()
	defer re.l.Unlock()

164 165
	return re.closeWithError(err)
}
keks's avatar
keks committed
166

167
func (re *responseEmitter) closeWithError(err error) error {
168 169 170
	if re.closed {
		return cmds.ErrClosingClosedEmitter
	}
171

Steven Allen's avatar
Steven Allen committed
172 173 174 175 176
	switch err {
	case nil:
		// no error
	case io.EOF:
		// not a real error
keks's avatar
keks committed
177
		err = nil
Steven Allen's avatar
Steven Allen committed
178
	default:
Steven Allen's avatar
Steven Allen committed
179
		// make sure this is *always* of type *cmds.Error
Steven Allen's avatar
Steven Allen committed
180
		switch e := err.(type) {
Steven Allen's avatar
Steven Allen committed
181
		case cmds.Error:
Steven Allen's avatar
Steven Allen committed
182
			err = &e
Steven Allen's avatar
Steven Allen committed
183
		case *cmds.Error:
Steven Allen's avatar
Steven Allen committed
184 185
		case nil:
		default:
Steven Allen's avatar
Steven Allen committed
186
			err = &cmds.Error{Message: err.Error(), Code: cmds.ErrNormal}
Steven Allen's avatar
Steven Allen committed
187
		}
keks's avatar
keks committed
188 189
	}

190 191
	setErrTrailer := true

keks's avatar
keks committed
192
	// use preamble directly, we're already in critical section
193
	// preamble needs to be before branch below, because the headers need to be written before writing the response
194 195
	re.once.Do(func() {
		re.doPreamble(err)
keks's avatar
keks committed
196

197 198 199
		// do not set error trailer if we send the error as value in preamble
		setErrTrailer = false
	})
keks's avatar
keks committed
200

201 202
	if setErrTrailer && err != nil {
		re.w.Header().Set(StreamErrHeader, err.Error())
203 204
	}

keks's avatar
keks committed
205 206
	re.closed = true

207
	return nil
208 209
}

Jan Winkelmann's avatar
Jan Winkelmann committed
210
// Flush the http connection
211
func (re *responseEmitter) Flush() {
212
	re.once.Do(func() { re.preamble(nil) })
keks's avatar
keks committed
213

214 215
	if flusher, ok := re.w.(http.Flusher); ok {
		flusher.Flush()
keks's avatar
keks committed
216
	}
217 218
}

219
func (re *responseEmitter) preamble(value interface{}) {
keks's avatar
keks committed
220 221 222
	re.l.Lock()
	defer re.l.Unlock()

223 224 225
	re.doPreamble(value)
}

Steven Allen's avatar
Steven Allen committed
226
func (re *responseEmitter) sendErr(err *cmds.Error) {
Steven Allen's avatar
Steven Allen committed
227 228 229 230 231 232 233 234 235 236 237 238 239 240
	// Handle error encoding. *Try* to obey the requested encoding, fallback
	// on json.
	encType := re.encType
	enc, ok := cmds.Encoders[encType]
	if !ok {
		encType = cmds.JSON
		enc = cmds.Encoders[encType]
	}

	// Set the appropriate MIME Type
	re.w.Header().Set(contentTypeHeader, mimeTypes[encType])

	// Set the status from the error code.
	status := http.StatusInternalServerError
Steven Allen's avatar
Steven Allen committed
241
	if err.Code == cmds.ErrClient {
Steven Allen's avatar
Steven Allen committed
242 243 244 245 246 247 248 249 250 251 252 253
		status = http.StatusBadRequest
	}
	re.w.WriteHeader(status)

	// Finally, send the errr
	if err := enc(re.req)(re.w).Encode(err); err != nil {
		log.Error("error sending error value after non-200 response", err)
	}

	re.closed = true
}

254
func (re *responseEmitter) doPreamble(value interface{}) {
255
	var (
Steven Allen's avatar
Steven Allen committed
256 257
		h    = re.w.Header()
		mime string
258
	)
259

Steven Allen's avatar
Steven Allen committed
260 261 262 263 264 265 266 267 268 269
	// Common Headers

	// set 'allowed' headers
	h.Set("Access-Control-Allow-Headers", AllowedExposedHeaders)
	// expose those headers
	h.Set("Access-Control-Expose-Headers", AllowedExposedHeaders)

	// Set up our potential trailer
	h.Set("Trailer", StreamErrHeader)

270 271 272 273
	// If we have a request body, make sure we close the body
	// if we want to write before completing reading.
	// FIXME: https://github.com/ipfs/go-ipfs/issues/5168
	// FIXME: https://github.com/golang/go/issues/15527
274
	if re.bodyEOFChan != nil {
275
		select {
276 277
		case <-re.bodyEOFChan:
			// all good, we received an EOF, so the body is read completely.
278 279 280 281 282 283 284 285
			// we handle the error where it occurs, here we just want to know that we're done
		default:
			// set connection close header, because we want to write
			// even though the body is not yet read completely.
			h.Set("Connection", "close")
		}
	}

286
	switch v := value.(type) {
Steven Allen's avatar
Steven Allen committed
287
	case *cmds.Error:
Steven Allen's avatar
Steven Allen committed
288 289
		re.sendErr(v)
		return
Jan Winkelmann's avatar
Jan Winkelmann committed
290
	case io.Reader:
291 292 293
		// set streams output type to text to avoid issues with browsers rendering
		// html pages on priveleged api ports
		h.Set(streamHeader, "1")
keks's avatar
keks committed
294
		re.streaming = true
295 296 297

		mime = "text/plain"
	case cmds.Single:
298
		// don't set stream/channel header
Jan Winkelmann's avatar
Jan Winkelmann committed
299
	default:
300 301
		h.Set(channelHeader, "1")
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
302

303
	if mime == "" {
304 305 306 307 308 309 310 311
		var ok bool

		// lookup mime type from map
		mime, ok = mimeTypes[re.encType]
		if !ok {
			// catch-all, set to text as default
			mime = "text/plain"
		}
312 313 314 315
	}

	h.Set(contentTypeHeader, mime)

Steven Allen's avatar
Steven Allen committed
316
	re.w.WriteHeader(http.StatusOK)
317
}
Jan Winkelmann's avatar
Jan Winkelmann committed
318

319 320 321 322 323 324
func flushCopy(w io.Writer, r io.Reader) error {
	buf := make([]byte, 4096)
	f, ok := w.(http.Flusher)
	if !ok {
		_, err := io.Copy(w, r)
		return err
Jan Winkelmann's avatar
Jan Winkelmann committed
325
	}
326 327 328 329 330 331 332 333 334 335 336 337 338 339
	for {
		n, err := r.Read(buf)
		switch err {
		case io.EOF:
			if n <= 0 {
				return nil
			}
			// if data was returned alongside the EOF, pretend we didnt
			// get an EOF. The next read call should also EOF.
		case nil:
			// continue
		default:
			return err
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
340

341 342 343 344
		nw, err := w.Write(buf[:n])
		if err != nil {
			return err
		}
345

346 347 348 349 350 351
		if nw != n {
			return fmt.Errorf("http write failed to write full amount: %d != %d", nw, n)
		}

		f.Flush()
	}
352
}