responseemitter.go 5.65 KB
Newer Older
1 2 3 4 5 6 7
package http

import (
	"fmt"
	"io"
	"net/http"
	"strconv"
8
	"strings"
Jan Winkelmann's avatar
Jan Winkelmann committed
9
	"sync"
10

keks's avatar
keks committed
11
	"github.com/ipfs/go-ipfs-cmdkit"
keks's avatar
keks committed
12
	cmds "github.com/ipfs/go-ipfs-cmds"
13 14 15 16
)

var (
	HeadRequest = fmt.Errorf("HEAD request")
17 18 19 20 21 22 23 24 25 26

	AllowedExposedHeadersArr = []string{streamHeader, channelHeader, extraContentLengthHeader}
	AllowedExposedHeaders    = strings.Join(AllowedExposedHeadersArr, ", ")

	mimeTypes = map[cmds.EncodingType]string{
		cmds.Protobuf: "application/protobuf",
		cmds.JSON:     "application/json",
		cmds.XML:      "application/xml",
		cmds.Text:     "text/plain",
	}
27 28
)

keks's avatar
keks committed
29 30 31 32
type Doner interface {
	Done() <-chan struct{}
}

33
// NewResponeEmitter returns a new ResponseEmitter.
34
func NewResponseEmitter(w http.ResponseWriter, method string, req *cmds.Request) ResponseEmitter {
35
	encType := cmds.GetEncoding(req)
36

37
	var enc cmds.Encoder
38

39 40
	if _, ok := cmds.Encoders[encType]; ok {
		enc = cmds.Encoders[encType](req)(w)
41 42
	}

43
	re := &responseEmitter{
keks's avatar
keks committed
44 45 46 47 48 49
		w:         w,
		encType:   encType,
		enc:       enc,
		method:    method,
		req:       req,
		closeWait: make(chan struct{}),
50 51 52 53
	}
	return re
}

54 55 56 57 58 59
type ResponseEmitter interface {
	cmds.ResponseEmitter
	http.Flusher
}

type responseEmitter struct {
Jan Winkelmann's avatar
Jan Winkelmann committed
60 61 62 63
	w http.ResponseWriter

	enc     cmds.Encoder
	encType cmds.EncodingType
64
	req     *cmds.Request
Jan Winkelmann's avatar
Jan Winkelmann committed
65 66

	length uint64
67
	err    *cmdkit.Error
Jan Winkelmann's avatar
Jan Winkelmann committed
68

69 70 71
	streaming bool
	once      sync.Once
	method    string
72 73

	closeOnce sync.Once
keks's avatar
keks committed
74
	closeWait chan struct{}
75 76
}

77
func (re *responseEmitter) Emit(value interface{}) error {
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
	ch, isChan := value.(<-chan interface{})
	if !isChan {
		ch, isChan = value.(chan interface{})
	}

	if isChan {
		for value = range ch {
			err := re.Emit(value)
			if err != nil {
				return err
			}
		}
		return nil
	}

93 94
	var err error

95
	re.once.Do(func() { re.preamble(value) })
Jan Winkelmann's avatar
Jan Winkelmann committed
96

97 98 99 100 101
	// return immediately if this is a head request
	if re.method == "HEAD" {
		return nil
	}

102 103 104 105 106
	if single, ok := value.(cmds.Single); ok {
		value = single.Value
		defer re.Close()
	}

107 108 109
	if re.w == nil {
		return fmt.Errorf("connection already closed / custom - http.respem - TODO")
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
110

111 112 113 114 115
	// ignore those
	if value == nil {
		return nil
	}

116
	if _, ok := value.(cmdkit.Error); ok {
117 118 119 120
		log.Warning("fixme: got Error not *Error: ", value)
		value = &value
	}

121 122
	switch v := value.(type) {
	case io.Reader:
123
		err = flushCopy(re.w, v)
124 125
	case *cmdkit.Error:
		if re.streaming || v.Code == cmdkit.ErrFatal {
126 127 128 129 130
			// abort by sending an error trailer
			re.w.Header().Add(StreamErrHeader, v.Error())
		} else {
			err = re.enc.Encode(value)
		}
131 132
	default:
		err = re.enc.Encode(value)
133 134 135 136
	}

	if f, ok := re.w.(http.Flusher); ok {
		f.Flush()
137 138
	}

Jan Winkelmann's avatar
Jan Winkelmann committed
139 140 141 142
	if err != nil {
		log.Error(err)
	}

143 144 145
	return err
}

keks's avatar
keks committed
146 147 148 149
func (re *responseEmitter) Done() <-chan struct{} {
	return re.closeWait
}

150
func (re *responseEmitter) SetLength(l uint64) {
Jan Winkelmann's avatar
Jan Winkelmann committed
151 152 153
	h := re.w.Header()
	h.Set("X-Content-Length", strconv.FormatUint(l, 10))

154 155 156
	re.length = l
}

157
func (re *responseEmitter) Close() error {
keks's avatar
keks committed
158
	re.once.Do(func() { re.preamble(nil) })
159
	re.closeOnce.Do(func() { close(re.closeWait) })
keks's avatar
keks committed
160

161 162 163
	return nil
}

164 165
func (re *responseEmitter) SetError(v interface{}, errType cmdkit.ErrorType) {
	err := re.Emit(&cmdkit.Error{Message: fmt.Sprint(v), Code: errType})
166
	if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
167
		log.Debug("http.SetError err=", err)
168
		panic(err)
169
	}
170 171
}

Jan Winkelmann's avatar
Jan Winkelmann committed
172
// Flush the http connection
173
func (re *responseEmitter) Flush() {
174
	re.once.Do(func() { re.preamble(nil) })
keks's avatar
keks committed
175 176 177 178

	select {
	case <-re.closeWait:
		log.Error("flush after close")
keks's avatar
keks committed
179
		return
keks's avatar
keks committed
180 181
	default:
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
182

183 184 185
	re.w.(http.Flusher).Flush()
}

186
func (re *responseEmitter) preamble(value interface{}) {
187
	status := http.StatusOK
188
	h := re.w.Header()
Jan Winkelmann's avatar
Jan Winkelmann committed
189

190 191 192 193 194 195 196 197 198 199 200 201 202
	// unpack value if it needs special treatment in the type switch below
	if s, isSingle := value.(cmds.Single); isSingle {
		if err, isErr := s.Value.(cmdkit.Error); isErr {
			value = &err
		}

		if err, isErr := s.Value.(*cmdkit.Error); isErr {
			value = err
		}
	}

	var mime string

Jan Winkelmann's avatar
Jan Winkelmann committed
203
	switch v := value.(type) {
204
	case *cmdkit.Error:
205 206 207 208 209 210 211
		err := v
		if err.Code == cmdkit.ErrClient {
			status = http.StatusBadRequest
		} else {
			status = http.StatusInternalServerError
		}

212 213 214 215
		// if this is not a head request, the error will be sent as a trailer or as a value
		if re.method == "HEAD" {
			http.Error(re.w, err.Error(), status)
			re.w = nil
Jan Winkelmann's avatar
Jan Winkelmann committed
216

217 218
			return
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
219
	case io.Reader:
220 221 222
		// set streams output type to text to avoid issues with browsers rendering
		// html pages on priveleged api ports
		h.Set(streamHeader, "1")
keks's avatar
keks committed
223
		re.streaming = true
224 225 226 227 228

		mime = "text/plain"
	case cmds.Single:
	case nil:
		h.Set(channelHeader, "1")
Jan Winkelmann's avatar
Jan Winkelmann committed
229
	default:
230 231
		h.Set(channelHeader, "1")
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
232

Jan Winkelmann's avatar
Jan Winkelmann committed
233 234 235
	// Set up our potential trailer
	h.Set("Trailer", StreamErrHeader)

236
	if mime == "" {
237 238 239 240 241 242 243 244
		var ok bool

		// lookup mime type from map
		mime, ok = mimeTypes[re.encType]
		if !ok {
			// catch-all, set to text as default
			mime = "text/plain"
		}
245 246 247 248 249 250 251 252 253 254 255
	}

	h.Set(contentTypeHeader, mime)

	// set 'allowed' headers
	h.Set("Access-Control-Allow-Headers", AllowedExposedHeaders)
	// expose those headers
	h.Set("Access-Control-Expose-Headers", AllowedExposedHeaders)

	re.w.WriteHeader(status)
}
Jan Winkelmann's avatar
Jan Winkelmann committed
256

Jan Winkelmann's avatar
Jan Winkelmann committed
257 258 259 260
type responseWriterer interface {
	Lower() http.ResponseWriter
}

261 262 263
func (re *responseEmitter) SetEncoder(enc func(io.Writer) cmds.Encoder) {
	re.enc = enc(re.w)
}
Jan Winkelmann's avatar
Jan Winkelmann committed
264

265 266 267 268 269 270
func flushCopy(w io.Writer, r io.Reader) error {
	buf := make([]byte, 4096)
	f, ok := w.(http.Flusher)
	if !ok {
		_, err := io.Copy(w, r)
		return err
Jan Winkelmann's avatar
Jan Winkelmann committed
271
	}
272 273 274 275 276 277 278 279 280 281 282 283 284 285
	for {
		n, err := r.Read(buf)
		switch err {
		case io.EOF:
			if n <= 0 {
				return nil
			}
			// if data was returned alongside the EOF, pretend we didnt
			// get an EOF. The next read call should also EOF.
		case nil:
			// continue
		default:
			return err
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
286

287 288 289 290
		nw, err := w.Write(buf[:n])
		if err != nil {
			return err
		}
291

292 293 294 295 296 297
		if nw != n {
			return fmt.Errorf("http write failed to write full amount: %d != %d", nw, n)
		}

		f.Flush()
	}
298
}