responseemitter.go 4.46 KB
Newer Older
1 2 3 4 5 6 7
package http

import (
	"fmt"
	"io"
	"net/http"
	"strconv"
Jan Winkelmann's avatar
Jan Winkelmann committed
8
	"sync"
9

10 11 12
	cmds "github.com/ipfs/go-ipfs-cmds"
	"github.com/ipfs/go-ipfs-cmds/cmdsutil"

13 14 15 16 17 18 19 20
	"github.com/ipfs/go-ipfs/repo/config"
)

var (
	HeadRequest = fmt.Errorf("HEAD request")
)

// NewResponeEmitter returns a new ResponseEmitter.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
func NewResponseEmitter(w http.ResponseWriter, method string, req cmds.Request) ResponseEmitter {
	log.Debug("request options: ", req.Options())

	var (
		encType = cmds.EncodingType(cmds.Undefined)
		encStr  = string(cmds.Undefined)
		ok      = false
		opts    = req.Options()
	)

	// try EncShort
	encSource := "short"
	encIface := opts[cmdsutil.EncShort]

	// if that didn't work, try EncLong
	if encIface == nil {
		encSource = "long"
		encIface = opts[cmdsutil.EncLong]
	}

	// try casting
	if encIface != nil {
		encStr, ok = encIface.(string)
	}

	log.Debug("req encType:", encSource, encStr, ok)

	// if casting worked, convert to EncodingType
	if ok {
		encType = cmds.EncodingType(encStr)
	}

	// in case of error, use default
	if !ok || encType == cmds.Undefined {
		encSource = "default"
		encType = cmds.JSON
	}

	log.Debug("chose encoding ", encType, " from source ", encSource)

61
	re := &responseEmitter{
62 63
		w:       w,
		encType: encType,
Jan Winkelmann's avatar
Jan Winkelmann committed
64
		enc:     cmds.Encoders[encType](req)(w),
65
		method:  method,
Jan Winkelmann's avatar
Jan Winkelmann committed
66
		req:     req,
67 68 69 70
	}
	return re
}

71 72 73 74 75 76
type ResponseEmitter interface {
	cmds.ResponseEmitter
	http.Flusher
}

type responseEmitter struct {
Jan Winkelmann's avatar
Jan Winkelmann committed
77 78 79 80
	w http.ResponseWriter

	enc     cmds.Encoder
	encType cmds.EncodingType
Jan Winkelmann's avatar
Jan Winkelmann committed
81
	req     cmds.Request
Jan Winkelmann's avatar
Jan Winkelmann committed
82 83

	length uint64
84
	err    *cmdsutil.Error
Jan Winkelmann's avatar
Jan Winkelmann committed
85

Jan Winkelmann's avatar
Jan Winkelmann committed
86 87 88
	emitted     bool
	emittedLock sync.Mutex
	method      string
Jan Winkelmann's avatar
Jan Winkelmann committed
89 90

	tees []cmds.ResponseEmitter
91 92
}

93
func (re *responseEmitter) Emit(value interface{}) error {
94 95
	var err error

Jan Winkelmann's avatar
Jan Winkelmann committed
96 97 98 99 100 101 102
	re.emittedLock.Lock()
	if !re.emitted {
		re.emitted = true
		re.preamble(value)
	}
	re.emittedLock.Unlock()

Jan Winkelmann's avatar
Jan Winkelmann committed
103 104 105 106 107 108
	go func() {
		for _, re_ := range re.tees {
			re_.Emit(value)
		}
	}()

109 110 111 112 113
	// ignore those
	if value == nil {
		return nil
	}

114
	// return immediately if this is a head request
Jan Winkelmann's avatar
Jan Winkelmann committed
115
	if re.method == "HEAD" {
116 117 118 119 120 121
		return nil
	}

	switch v := value.(type) {
	case io.Reader:
		_, err = io.Copy(re.w, v)
Jan Winkelmann's avatar
Jan Winkelmann committed
122 123
	case cmdsutil.Error, *cmdsutil.Error:
		// nothing
124 125 126 127 128 129 130
	default:
		err = re.enc.Encode(value)
	}

	return err
}

131
func (re *responseEmitter) SetLength(l uint64) {
Jan Winkelmann's avatar
Jan Winkelmann committed
132 133 134
	h := re.w.Header()
	h.Set("X-Content-Length", strconv.FormatUint(l, 10))

135
	re.length = l
Jan Winkelmann's avatar
Jan Winkelmann committed
136 137 138 139

	for _, re_ := range re.tees {
		re_.SetLength(l)
	}
140 141
}

142
func (re *responseEmitter) Close() error {
143 144 145 146
	// can't close HTTP connections
	return nil
}

147
func (re *responseEmitter) SetError(err interface{}, code cmdsutil.ErrorType) {
Jan Winkelmann's avatar
Jan Winkelmann committed
148
	re.Emit(&cmdsutil.Error{Message: fmt.Sprint(err), Code: code})
Jan Winkelmann's avatar
Jan Winkelmann committed
149 150 151 152

	for _, re_ := range re.tees {
		re_.SetError(err, code)
	}
153 154
}

Jan Winkelmann's avatar
Jan Winkelmann committed
155
// Flush the http connection
156
func (re *responseEmitter) Flush() {
Jan Winkelmann's avatar
Jan Winkelmann committed
157 158
	if !re.emitted {
		re.emitted = true
159 160 161

		// setting this to nil means that it sends channel/chunked-encoding headers
		re.preamble(nil)
Jan Winkelmann's avatar
Jan Winkelmann committed
162 163
	}

164 165 166
	re.w.(http.Flusher).Flush()
}

167
func (re *responseEmitter) preamble(value interface{}) {
Jan Winkelmann's avatar
Jan Winkelmann committed
168 169
	log.Debugf("re.preamble, v=%#v", value)
	defer log.Debug("preemble done, headers: ", re.w.Header())
170

171 172 173 174 175
	h := re.w.Header()
	// Expose our agent to allow identification
	h.Set("Server", "go-ipfs/"+config.CurrentVersionNumber)

	status := http.StatusOK
Jan Winkelmann's avatar
Jan Winkelmann committed
176 177 178 179 180 181

	switch v := value.(type) {
	case *cmdsutil.Error:
		err := v

		if err.Code == cmdsutil.ErrClient {
182 183 184 185 186
			status = http.StatusBadRequest
		} else {
			status = http.StatusInternalServerError
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
187 188
		http.Error(re.w, err.Error(), status)
		re.w = nil
Jan Winkelmann's avatar
Jan Winkelmann committed
189

Jan Winkelmann's avatar
Jan Winkelmann committed
190
		log.Debug("sent error: ", err)
Jan Winkelmann's avatar
Jan Winkelmann committed
191 192

		return
Jan Winkelmann's avatar
Jan Winkelmann committed
193
	case io.Reader:
194 195 196
		// set streams output type to text to avoid issues with browsers rendering
		// html pages on priveleged api ports
		h.Set(streamHeader, "1")
Jan Winkelmann's avatar
Jan Winkelmann committed
197
	default:
198 199
		h.Set(channelHeader, "1")
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
200

Jan Winkelmann's avatar
Jan Winkelmann committed
201 202 203 204 205
	log.Debugf("preamble status=%v", status)

	// Set up our potential trailer
	h.Set("Trailer", StreamErrHeader)

Jan Winkelmann's avatar
Jan Winkelmann committed
206 207
	// lookup mime type from map
	mime := mimeTypes[re.encType]
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222

	// catch-all, set to text as default
	if mime == "" {
		mime = "text/plain"
	}

	h.Set(contentTypeHeader, mime)

	// set 'allowed' headers
	h.Set("Access-Control-Allow-Headers", AllowedExposedHeaders)
	// expose those headers
	h.Set("Access-Control-Expose-Headers", AllowedExposedHeaders)

	re.w.WriteHeader(status)
}
Jan Winkelmann's avatar
Jan Winkelmann committed
223

Jan Winkelmann's avatar
Jan Winkelmann committed
224 225 226 227
type responseWriterer interface {
	Lower() http.ResponseWriter
}

Jan Winkelmann's avatar
Jan Winkelmann committed
228 229 230 231 232 233 234 235 236 237 238
func (re *responseEmitter) Tee(re_ cmds.ResponseEmitter) {
	re.tees = append(re.tees, re_)

	if re.emitted {
		re_.SetLength(re.length)
	}

	if re.err != nil {
		re_.SetError(re.err.Message, re.err.Code)
	}
}