Unverified Commit efc9d266 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #38 from ipfs/fix/data-races

Fix race conditions
parents 0ce2a5d2 235b0f74
...@@ -195,6 +195,14 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ...@@ -195,6 +195,14 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
} }
d := re.(Doner)
select {
case <-d.Done():
case <-req.Context().Done():
log.Error("waiting for http.Responseemitter to close but then %s", req.Context().Err())
// too late to send an error, just return
}
return return
} }
......
...@@ -17,6 +17,10 @@ var ( ...@@ -17,6 +17,10 @@ var (
HeadRequest = fmt.Errorf("HEAD request") HeadRequest = fmt.Errorf("HEAD request")
) )
type Doner interface {
Done() <-chan struct{}
}
// NewResponeEmitter returns a new ResponseEmitter. // NewResponeEmitter returns a new ResponseEmitter.
func NewResponseEmitter(w http.ResponseWriter, method string, req cmds.Request) ResponseEmitter { func NewResponseEmitter(w http.ResponseWriter, method string, req cmds.Request) ResponseEmitter {
encType := cmds.GetEncoding(req) encType := cmds.GetEncoding(req)
...@@ -28,11 +32,12 @@ func NewResponseEmitter(w http.ResponseWriter, method string, req cmds.Request) ...@@ -28,11 +32,12 @@ func NewResponseEmitter(w http.ResponseWriter, method string, req cmds.Request)
} }
re := &responseEmitter{ re := &responseEmitter{
w: w, w: w,
encType: encType, encType: encType,
enc: enc, enc: enc,
method: method, method: method,
req: req, req: req,
closeWait: make(chan struct{}),
} }
return re return re
} }
...@@ -55,6 +60,9 @@ type responseEmitter struct { ...@@ -55,6 +60,9 @@ type responseEmitter struct {
streaming bool streaming bool
once sync.Once once sync.Once
method string method string
closeOnce sync.Once
closeWait chan struct{}
} }
func (re *responseEmitter) Emit(value interface{}) error { func (re *responseEmitter) Emit(value interface{}) error {
...@@ -103,7 +111,6 @@ func (re *responseEmitter) Emit(value interface{}) error { ...@@ -103,7 +111,6 @@ func (re *responseEmitter) Emit(value interface{}) error {
switch v := value.(type) { switch v := value.(type) {
case io.Reader: case io.Reader:
re.streaming = true
err = flushCopy(re.w, v) err = flushCopy(re.w, v)
case *cmdkit.Error: case *cmdkit.Error:
if re.streaming || v.Code == cmdkit.ErrFatal { if re.streaming || v.Code == cmdkit.ErrFatal {
...@@ -127,6 +134,10 @@ func (re *responseEmitter) Emit(value interface{}) error { ...@@ -127,6 +134,10 @@ func (re *responseEmitter) Emit(value interface{}) error {
return err return err
} }
func (re *responseEmitter) Done() <-chan struct{} {
return re.closeWait
}
func (re *responseEmitter) SetLength(l uint64) { func (re *responseEmitter) SetLength(l uint64) {
h := re.w.Header() h := re.w.Header()
h.Set("X-Content-Length", strconv.FormatUint(l, 10)) h.Set("X-Content-Length", strconv.FormatUint(l, 10))
...@@ -136,7 +147,8 @@ func (re *responseEmitter) SetLength(l uint64) { ...@@ -136,7 +147,8 @@ func (re *responseEmitter) SetLength(l uint64) {
func (re *responseEmitter) Close() error { func (re *responseEmitter) Close() error {
re.once.Do(func() { re.preamble(nil) }) re.once.Do(func() { re.preamble(nil) })
// can't close HTTP connections re.closeOnce.Do(func() { close(re.closeWait) })
return nil return nil
} }
...@@ -152,6 +164,13 @@ func (re *responseEmitter) SetError(v interface{}, errType cmdkit.ErrorType) { ...@@ -152,6 +164,13 @@ func (re *responseEmitter) SetError(v interface{}, errType cmdkit.ErrorType) {
func (re *responseEmitter) Flush() { func (re *responseEmitter) Flush() {
re.once.Do(func() { re.preamble(nil) }) re.once.Do(func() { re.preamble(nil) })
select {
case <-re.closeWait:
log.Error("flush after close")
return
default:
}
re.w.(http.Flusher).Flush() re.w.(http.Flusher).Flush()
} }
...@@ -195,6 +214,7 @@ func (re *responseEmitter) preamble(value interface{}) { ...@@ -195,6 +214,7 @@ func (re *responseEmitter) preamble(value interface{}) {
// set streams output type to text to avoid issues with browsers rendering // set streams output type to text to avoid issues with browsers rendering
// html pages on priveleged api ports // html pages on priveleged api ports
h.Set(streamHeader, "1") h.Set(streamHeader, "1")
re.streaming = true
mime = "text/plain" mime = "text/plain"
case cmds.Single: case cmds.Single:
......
...@@ -402,27 +402,27 @@ func NewCommand(oldcmd *oldcmds.Command) *Command { ...@@ -402,27 +402,27 @@ func NewCommand(oldcmd *oldcmds.Command) *Command {
errCh := make(chan error) errCh := make(chan error)
go res.Send(errCh) go res.Send(errCh)
oldcmd.Run(oldReq, res) oldcmd.Run(oldReq, res)
select {
case err := <-errCh:
if err != nil {
select {
case <-req.Context().Done():
err = cmdkit.Error{Message: req.Context().Err().Error(), Code: cmdkit.ErrNormal}
default:
}
if e, ok := err.(*cmdkit.Error); ok { // No need to select with ctx.Done here because when the connection aborts, Emit will error.
err = *e // Actually we *can't* select with ctx.Done here, because otherwise we might return from the http handler
} // while there still is a goroutine writing to the ResponseWriter, which panics.
err := <-errCh
if err != nil {
select {
case <-req.Context().Done():
err = cmdkit.Error{Message: req.Context().Err().Error(), Code: cmdkit.ErrNormal}
default:
}
if e, ok := err.(cmdkit.Error); ok { if e, ok := err.(*cmdkit.Error); ok {
re.SetError(e.Message, e.Code) err = *e
} else { }
re.SetError(err.Error(), cmdkit.ErrNormal)
} if e, ok := err.(cmdkit.Error); ok {
re.SetError(e.Message, e.Code)
} else {
re.SetError(err.Error(), cmdkit.ErrNormal)
} }
case <-req.Context().Done():
re.SetError(req.Context().Err(), cmdkit.ErrNormal)
} }
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment