Commit 0c3fea5f authored by Jan Winkelmann's avatar Jan Winkelmann

many many tiny changes...

parent 47a6222a
......@@ -97,13 +97,7 @@ type chanResponseEmitter struct {
func (re *chanResponseEmitter) SetError(err interface{}, t cmdsutil.ErrorType) {
// don't change value after emitting
/*
if re.emitted {
return
}
*/
*re.err = &cmdsutil.Error{Message: fmt.Sprint(err), Code: t}
re.Emit(&cmdsutil.Error{Message: fmt.Sprint(err), Code: t})
for _, re_ := range re.tees {
re_.SetError(err, t)
......@@ -133,6 +127,11 @@ func (re *chanResponseEmitter) Head() Head {
}
func (re *chanResponseEmitter) Close() error {
if re.ch == nil {
return nil
}
log.Debug("closing chanRE ", re)
close(re.ch)
re.ch = nil
......@@ -141,6 +140,10 @@ func (re *chanResponseEmitter) Close() error {
func (re *chanResponseEmitter) Emit(v interface{}) error {
re.emitted = true
if re.wait != nil {
close(re.wait)
re.wait = nil
}
if re.ch == nil {
return fmt.Errorf("emitter closed")
......
......@@ -59,12 +59,7 @@ func Parse(input []string, stdin *os.File, root *cmds.Command) (cmds.Request, *c
req.SetFiles(file)
}
err = cmd.CheckArguments(req)
if err != nil {
return req, cmd, path, err
}
return req, cmd, path, nil
return req, cmd, path, cmd.CheckArguments(req)
}
func ParseArgs(req cmds.Request, inputs []string, stdin *os.File, argDefs []cmdsutil.Argument, root *cmds.Command) ([]string, []files.File, error) {
......
......@@ -13,16 +13,18 @@ type ErrSet struct {
error
}
func NewResponseEmitter(w io.WriteCloser, enc func(cmds.Response) func(io.Writer) cmds.Encoder, res cmds.Response) cmds.ResponseEmitter {
func NewResponseEmitter(w io.WriteCloser, enc func(cmds.Request) func(io.Writer) cmds.Encoder, req cmds.Request) (cmds.ResponseEmitter, <-chan *cmdsutil.Error) {
ch := make(chan *cmdsutil.Error)
if enc == nil {
enc = func(cmds.Response) func(io.Writer) cmds.Encoder {
enc = func(cmds.Request) func(io.Writer) cmds.Encoder {
return func(io.Writer) cmds.Encoder {
return nil
}
}
}
return &responseEmitter{w: w, enc: enc(res)(w)}
return &responseEmitter{w: w, enc: enc(req)(w), ch: ch}, ch
}
type responseEmitter struct {
......@@ -35,6 +37,7 @@ type responseEmitter struct {
tees []cmds.ResponseEmitter
emitted bool
ch chan<- *cmdsutil.Error
}
func (re *responseEmitter) SetLength(l uint64) {
......@@ -53,8 +56,7 @@ func (re *responseEmitter) SetError(v interface{}, errType cmdsutil.ErrorType) {
log.Debugf("re.SetError(%v, %v)", v, errType)
err := &cmdsutil.Error{Message: fmt.Sprint(v), Code: errType}
//re.Emit(err)
re.err = err
re.Emit(err)
for _, re_ := range re.tees {
re_.SetError(v, errType)
......@@ -62,7 +64,17 @@ func (re *responseEmitter) SetError(v interface{}, errType cmdsutil.ErrorType) {
}
func (re *responseEmitter) Close() error {
return re.w.Close()
if re.w == nil {
log.Warning("more than one call to RespEm.Close!")
return nil
}
log.Debug("closing RE")
log.Debugf("%s", debug.Stack())
close(re.ch)
re.w = nil
return nil
}
// Head returns the current head.
......@@ -81,8 +93,22 @@ func (re *responseEmitter) Emit(v interface{}) error {
}
log.Debugf("re.Emit(%T)", v)
if re.err != nil {
return ErrSet{re.err}
if re.w == nil {
return io.ErrClosedPipe
}
if err, ok := v.(cmdsutil.Error); ok {
log.Warningf("fixerr %s", debug.Stack())
v = &err
}
if err, ok := v.(*cmdsutil.Error); ok {
log.Warning("sending err to ch")
log.Debugf("%s", debug.Stack())
re.ch <- err
log.Debug("sent err to ch")
re.Close()
return nil
}
log.Debug("copying to tees")
......
......@@ -18,6 +18,22 @@ const (
String = reflect.String
)
// Flag names
const (
EncShort = "enc"
EncLong = "encoding"
RecShort = "r"
RecLong = "recursive"
ChanOpt = "stream-channels"
TimeoutOpt = "timeout"
)
// options that are used by this package
var OptionEncodingType = StringOption(EncLong, EncShort, "The encoding type the output should be encoded with (json, xml, or text)")
var OptionRecursivePath = BoolOption(RecLong, RecShort, "Add directory paths recursively").Default(false)
var OptionStreamChannels = BoolOption(ChanOpt, "Stream channel output")
var OptionTimeout = StringOption(TimeoutOpt, "set a global timeout on the command")
type OptMap map[string]interface{}
// Option is used to specify a field that will be provided by a consumer
......@@ -181,19 +197,3 @@ func (ov OptionValue) String() (value string, found bool, err error) {
}
return val, ov.ValueFound, err
}
// Flag names
const (
EncShort = "enc"
EncLong = "encoding"
RecShort = "r"
RecLong = "recursive"
ChanOpt = "stream-channels"
TimeoutOpt = "timeout"
)
// options that are used by this package
var OptionEncodingType = StringOption(EncLong, EncShort, "The encoding type the output should be encoded with (json, xml, or text)")
var OptionRecursivePath = BoolOption(RecLong, RecShort, "Add directory paths recursively").Default(false)
var OptionStreamChannels = BoolOption(ChanOpt, "Stream channel output")
var OptionTimeout = StringOption(TimeoutOpt, "set a global timeout on the command")
......@@ -12,6 +12,7 @@ import (
"errors"
"fmt"
"io"
"os"
"reflect"
"github.com/ipfs/go-ipfs-cmds/cmdsutil"
......@@ -21,6 +22,8 @@ import (
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
)
const DefaultOutputEncoding = JSON
var log = logging.Logger("cmds")
// Function is the type of function that Commands use.
......@@ -39,8 +42,8 @@ type Command struct {
// after writing when using multipart requests. The request body will not be
// available for reading after the HTTP connection has been written to.
Run Function
PostRun map[EncodingType]func(Request, Response) Response
Encoders map[EncodingType]func(Response) func(io.Writer) Encoder
PostRun map[EncodingType]func(Request, ResponseEmitter) ResponseEmitter
Encoders map[EncodingType]func(Request) func(io.Writer) Encoder
Helptext cmdsutil.HelpText
// External denotes that a command is actually an external binary.
......@@ -66,6 +69,8 @@ var ErrIncorrectType = errors.New("The command returned a value with a different
// Call invokes the command for the given Request
func (c *Command) Call(req Request, re ResponseEmitter) error {
defer re.Close()
cmd, err := c.Get(req.Path())
if err != nil {
return err
......@@ -85,31 +90,39 @@ func (c *Command) Call(req Request, re ResponseEmitter) error {
return err
}
reChan, resChan := NewChanResponsePair(req)
re.Tee(reChan)
fmt.Fprintln(os.Stderr, "Call: requested encoding ", req.Option(cmdsutil.EncShort))
// TODO keks: wat
if re_, ok := re.(EncodingEmitter); ok {
if encType, found, err := req.Option(cmdsutil.EncShort).String(); found && err == nil {
if enc, ok := cmd.Encoders[EncodingType(encType)]; ok {
re_.SetEncoder(enc(resChan))
log.Debug("updated encoder to", enc, "(from Command struct)")
encTypeStr, found, err := req.Option(cmdsutil.EncShort).String()
encTypeSrc := "request"
encType := EncodingType(encTypeStr)
if !found || err != nil {
encTypeSrc = "default"
encType = DefaultOutputEncoding
}
log.Debugf("finding encoder for %s from %s", encType, encTypeSrc)
if enc, ok := cmd.Encoders[EncodingType(encType)]; ok {
re_.SetEncoder(enc(req))
log.Debug("updated encoder to", enc, "(from Command struct)")
} else {
if enc, ok := Encoders[EncodingType(encType)]; ok {
re_.SetEncoder(enc(req))
log.Debugf("updated encoder to %v (global Encoder)", enc)
} else {
if enc, ok := Encoders[EncodingType(encType)]; ok {
re_.SetEncoder(enc(resChan))
log.Debug("updated encoder to", enc, "(global Encoder)")
} else {
log.Debugf("no encoder found for encoding %#v", encType)
}
log.Debug("no encoder found for encoding")
}
} else {
log.Debug("no encoding found in request. err:", err)
}
} else {
log.Debugf("responseemitter is not an EncodingEmitter, but a %T", re)
}
log.Debugf("Call: calling cmd.Run")
log.Debugf("Call: calling cmd.Run %v", cmd.Run)
cmd.Run(req, re)
return nil
......
......@@ -18,7 +18,7 @@ func (c dummyCloser) Close() error {
func newBufferResponseEmitter() ResponseEmitter {
buf := bytes.NewBuffer(nil)
wc := writecloser{Writer: buf}
return NewWriterResponseEmitter(wc, Encoders[Text])
return NewWriterResponseEmitter(wc, nil, Encoders[Text])
}
func noop(req Request, re ResponseEmitter) {
......@@ -245,7 +245,7 @@ func TestPostRun(t *testing.T) {
finalLength: 4,
next: []interface{}{14},
postRun: func(req Request, res Response) Response {
re, res_ := NewChanResponsePair(&req)
re, res_ := NewChanResponsePair(req)
re.SetLength(res.Length() + 1)
go func() {
......@@ -301,7 +301,7 @@ func TestPostRun(t *testing.T) {
req, _ := NewRequest(nil, nil, nil, nil, nil, cmdOpts)
req.SetOption(cmdsutil.EncShort, CLI)
re, res := NewChanResponsePair(&req)
re, res := NewChanResponsePair(req)
err := cmd.Call(req, re)
if err != nil {
......
......@@ -22,11 +22,14 @@ type EncodingType string
// Supported EncodingType constants.
const (
Undefined = ""
JSON = "json"
XML = "xml"
Protobuf = "protobuf"
Text = "text"
CLI = "cli"
// TODO: support more encoding types
)
......@@ -39,20 +42,20 @@ var Decoders = map[EncodingType]func(w io.Reader) Decoder{
},
}
var Encoders = map[EncodingType]func(res Response) func(w io.Writer) Encoder{
XML: func(res Response) func(io.Writer) Encoder {
var Encoders = map[EncodingType]func(req Request) func(w io.Writer) Encoder{
XML: func(req Request) func(io.Writer) Encoder {
return func(w io.Writer) Encoder { return xml.NewEncoder(w) }
},
JSON: func(res Response) func(io.Writer) Encoder {
JSON: func(req Request) func(io.Writer) Encoder {
return func(w io.Writer) Encoder { return json.NewEncoder(w) }
},
Text: func(res Response) func(io.Writer) Encoder {
Text: func(req Request) func(io.Writer) Encoder {
return func(w io.Writer) Encoder { return TextEncoder{w} }
},
}
func MakeEncoder(f func(io.Writer, interface{}) error) func(Response) func(io.Writer) Encoder {
return func(res Response) func(io.Writer) Encoder {
func MakeEncoder(f func(io.Writer, interface{}) error) func(Request) func(io.Writer) Encoder {
return func(req Request) func(io.Writer) Encoder {
return func(w io.Writer) Encoder { return &genericEncoder{f: f, w: w} }
}
}
......@@ -71,6 +74,7 @@ type TextEncoder struct {
}
func (e TextEncoder) Encode(v interface{}) error {
_, err := fmt.Fprint(e.w, v)
log.Debug("TextEncoder.Encode(%v) to %v", v, fmt.Sprintf("%s\n", v))
_, err := fmt.Fprintf(e.w, "%s", v)
return err
}
......@@ -5,7 +5,6 @@ import (
"fmt"
"net/http"
"net/url"
//"runtime/debug"
"strings"
"sync"
......@@ -126,8 +125,6 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r := recover(); r != nil {
log.Error("a panic has occurred in the commands handler!")
log.Error(r)
//debug.PrintStack()
}
}()
......@@ -183,9 +180,6 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
// XXX changed order of the next two blocks.
// XXX might change behaviour!
// set user's headers first.
for k, v := range i.cfg.Headers {
if !skipAPIHeader(k) {
......@@ -193,15 +187,47 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
chRe, chRes := cmds.NewChanResponsePair(req)
log.Debug("request options: ", req.Options())
var (
enc = cmds.EncodingType(cmds.Undefined)
encStr = string(cmds.Undefined)
ok = false
opts = req.Options()
)
// try EncShort
encSource := "short"
encIface := opts[cmdsutil.EncShort]
// if that didn't work, try EncLong
if encIface == nil {
encSource = "long"
encIface = opts[cmdsutil.EncLong]
}
// try casting
if encIface != nil {
encStr, ok = encIface.(string)
}
log.Debug("req enc:", encSource, encStr, ok)
enc := req.Options()[cmdsutil.EncShort]
if enc == nil {
// if casting worked, convert to EncodingType
if ok {
enc = cmds.EncodingType(encStr)
}
// in case of error, use default
if !ok || enc == cmds.Undefined {
encSource = "default"
enc = cmds.JSON
}
re := NewResponseEmitter(w, cmds.EncodingType(enc.(string)), r.Method, chRes)
re.Tee(chRe)
log.Debug("req enc:", enc)
log.Debug("chose encoding ", enc, " from source ", encSource)
re := NewResponseEmitter(w, enc, r.Method, req)
log.Debug("root.Call()")
// call the command
......
......@@ -93,6 +93,8 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
contentType := httpRes.Header.Get(contentTypeHeader)
contentType = strings.Split(contentType, ";")[0]
log.Debug("header", httpRes.Header)
// If we ran into an error
if httpRes.StatusCode >= http.StatusBadRequest {
e := &cmdsutil.Error{}
......
......@@ -5,6 +5,7 @@ import (
"io"
"net/http"
"strconv"
"sync"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/go-ipfs-cmds/cmdsutil"
......@@ -17,13 +18,13 @@ var (
)
// NewResponeEmitter returns a new ResponseEmitter.
func NewResponseEmitter(w http.ResponseWriter, encType cmds.EncodingType, method string, res cmds.Response) ResponseEmitter {
func NewResponseEmitter(w http.ResponseWriter, encType cmds.EncodingType, method string, req cmds.Request) ResponseEmitter {
re := &responseEmitter{
w: w,
encType: encType,
enc: cmds.Encoders[encType](res)(w),
enc: cmds.Encoders[encType](req)(w),
method: method,
req: res.Request(),
req: req,
}
return re
}
......@@ -43,8 +44,9 @@ type responseEmitter struct {
length uint64
err *cmdsutil.Error
emitted bool
method string
emitted bool
emittedLock sync.Mutex
method string
tees []cmds.ResponseEmitter
}
......@@ -52,18 +54,19 @@ type responseEmitter struct {
func (re *responseEmitter) Emit(value interface{}) error {
var err error
re.emittedLock.Lock()
if !re.emitted {
re.emitted = true
re.preamble(value)
}
re.emittedLock.Unlock()
go func() {
for _, re_ := range re.tees {
re_.Emit(value)
}
}()
if !re.emitted {
re.emitted = true
log.Debug("calling preamle")
re.preamble(value)
}
// ignore those
if value == nil {
return nil
......@@ -74,15 +77,11 @@ func (re *responseEmitter) Emit(value interface{}) error {
return nil
}
// Special case: if text encoding and an error, just print it out.
// TODO review question: its like that in response.go, should we keep that?
if re.encType == cmds.Text && re.err != nil {
value = re.err
}
switch v := value.(type) {
case io.Reader:
_, err = io.Copy(re.w, v)
case cmdsutil.Error, *cmdsutil.Error:
// nothing
default:
err = re.enc.Encode(value)
}
......@@ -91,6 +90,9 @@ func (re *responseEmitter) Emit(value interface{}) error {
}
func (re *responseEmitter) SetLength(l uint64) {
h := re.w.Header()
h.Set("X-Content-Length", strconv.FormatUint(l, 10))
re.length = l
for _, re_ := range re.tees {
......@@ -104,11 +106,7 @@ func (re *responseEmitter) Close() error {
}
func (re *responseEmitter) SetError(err interface{}, code cmdsutil.ErrorType) {
re.err = &cmdsutil.Error{Message: fmt.Sprint(err), Code: code}
// force send of preamble
// TODO is this the right thing to do?
re.Emit(nil)
re.Emit(&cmdsutil.Error{Message: fmt.Sprint(err), Code: code})
for _, re_ := range re.tees {
re_.SetError(err, code)
......@@ -128,50 +126,44 @@ func (re *responseEmitter) Flush() {
}
func (re *responseEmitter) preamble(value interface{}) {
log.Debug("re.preamble")
log.Debugf("re.preamble, v=%#v", value)
defer log.Debug("preemble done, headers: ", re.w.Header())
h := re.w.Header()
// Expose our agent to allow identification
h.Set("Server", "go-ipfs/"+config.CurrentVersionNumber)
status := http.StatusOK
// if response contains an error, write an HTTP error status code
if e := re.err; e != nil {
if e.Code == cmdsutil.ErrClient {
switch v := value.(type) {
case *cmdsutil.Error:
err := v
if err.Code == cmdsutil.ErrClient {
status = http.StatusBadRequest
} else {
status = http.StatusInternalServerError
}
// NOTE: The error will actually be written out below
}
log.Debugf("preamble status=%v", status)
log.Debugf("preamble re.err=%#v", re.err)
// write error to connection
if re.err != nil {
http.Error(re.w, re.err.Error(), http.StatusInternalServerError)
http.Error(re.w, err.Error(), status)
re.w = nil
log.Debug("sent error")
log.Debug("sent error: ", err)
return
}
// Set up our potential trailer
h.Set("Trailer", StreamErrHeader)
if re.length > 0 {
h.Set("X-Content-Length", strconv.FormatUint(re.length, 10))
}
if _, ok := value.(io.Reader); ok {
case io.Reader:
// set streams output type to text to avoid issues with browsers rendering
// html pages on priveleged api ports
h.Set(streamHeader, "1")
} else {
default:
h.Set(channelHeader, "1")
}
log.Debugf("preamble status=%v", status)
// Set up our potential trailer
h.Set("Trailer", StreamErrHeader)
// lookup mime type from map
mime := mimeTypes[re.encType]
......@@ -190,6 +182,10 @@ func (re *responseEmitter) preamble(value interface{}) {
re.w.WriteHeader(status)
}
type responseWriterer interface {
Lower() http.ResponseWriter
}
func (re *responseEmitter) Tee(re_ cmds.ResponseEmitter) {
re.tees = append(re.tees, re_)
......@@ -203,5 +199,6 @@ func (re *responseEmitter) Tee(re_ cmds.ResponseEmitter) {
}
func (re *responseEmitter) SetEncoder(enc func(io.Writer) cmds.Encoder) {
log.Debugf("SetEncoder called :( '%s'", re.encType)
re.enc = enc(re.w)
}
......@@ -122,7 +122,11 @@ type fakeResponse struct {
func (r *fakeResponse) Send() error {
log.Debugf("fakeResponse: sending %v to RE of type %T", r.out, r.re)
defer log.Debugf("fakeResponse: done")
defer func() {
log.Debugf("fakeResponse: closing RE")
r.re.Close()
log.Debugf("fakeResponse: done")
}()
if r.out == nil {
return nil
......@@ -141,7 +145,7 @@ func (r *fakeResponse) Send() error {
}
}
default:
log.Debug("fakeResponse: calling Emit(%v) once", out)
log.Debugf("fakeResponse: calling Emit(%v) once", out)
return r.re.Emit(out)
}
......@@ -218,17 +222,34 @@ func (mer *marshalerEncoderResponse) Error() *cmdsutil.Error {
// make an Encoder from a Marshaler
type MarshalerEncoder struct {
m oldcmds.Marshaler
res oldcmds.Response
w io.Writer
req Request
}
func NewMarshalerEncoder(req Request, m oldcmds.Marshaler, w io.Writer) *MarshalerEncoder {
me := &MarshalerEncoder{
m: m,
w: w,
req: req,
}
return me
}
func (me *MarshalerEncoder) Encode(v interface{}) error {
log.Debugf("ME.Encode: me: %#v", me)
r, err := me.m(me.res)
log.Debugf("ME.Encode: r:%v, err:%v", r, err)
if err != nil || r == nil {
re, res := NewChanResponsePair(me.req)
log.Debugf("ME.Encode: me: %#v, v: %#v", me, v)
go re.Emit(v)
r, err := me.m(&responseWrapper{Response: res})
if err != nil {
return err
}
if r == nil {
// behave like empty reader
return nil
}
_, err = io.Copy(me.w, r)
return err
......@@ -276,12 +297,6 @@ func OldCommand(cmd *Command) *oldcmds.Command {
External: cmd.External,
Type: cmd.Type,
Run: func(oldReq oldcmds.Request, res oldcmds.Response) {
req := &oldRequestWrapper{oldReq}
re := &wrappedResponseEmitter{res}
cmd.Run(req, re)
},
Subcommands: func() map[string]*oldcmds.Command {
cs := make(map[string]*oldcmds.Command)
......@@ -297,6 +312,15 @@ func OldCommand(cmd *Command) *oldcmds.Command {
}(),
}
if cmd.Run != nil {
oldcmd.Run = func(oldReq oldcmds.Request, res oldcmds.Response) {
req := &oldRequestWrapper{oldReq}
re := &wrappedResponseEmitter{res}
cmd.Run(req, re)
}
log.Debugf("faker: added Run %v", oldcmd.Run)
}
if cmd.PreRun != nil {
oldcmd.PreRun = func(oldReq oldcmds.Request) error {
req := &oldRequestWrapper{oldReq}
......@@ -321,16 +345,20 @@ func NewCommand(oldcmd *oldcmds.Command) *Command {
External: oldcmd.External,
Type: oldcmd.Type,
Run: func(req Request, re ResponseEmitter) {
OldSubcommands: oldcmd.Subcommands,
}
if oldcmd.Run != nil {
cmd.Run = func(req Request, re ResponseEmitter) {
oldReq := &requestWrapper{req}
res := &fakeResponse{req: oldReq, re: re}
log.Debugf("fakecmd.Run: calling real cmd.Run")
oldcmd.Run(oldReq, res)
log.Debugf("fakecmd.Run: calling res.Send")
res.Send()
},
OldSubcommands: oldcmd.Subcommands,
log.Debugf("fakecmd.Run: done")
}
}
if oldcmd.PreRun != nil {
......@@ -340,20 +368,16 @@ func NewCommand(oldcmd *oldcmds.Command) *Command {
}
}
cmd.Encoders = make(map[EncodingType]func(Response) func(io.Writer) Encoder)
cmd.Encoders = make(map[EncodingType]func(Request) func(io.Writer) Encoder)
for encType, m := range oldcmd.Marshalers {
log.Debugf("adding marshaler %v for type encType %v", m, encType)
cmd.Encoders[EncodingType(encType)] = func(res Response) func(io.Writer) Encoder {
cmd.Encoders[EncodingType(encType)] = func(req Request) func(io.Writer) Encoder {
return func(w io.Writer) Encoder {
log.Debugf("adding marshalerencoder for %v: %v; res: %v", encType, m, res)
log.Debugf("adding marshalerencoder for %v: %v; res: %v", encType, m, req)
return &MarshalerEncoder{
res: &responseWrapper{Response: res},
m: m,
w: w,
}
return NewMarshalerEncoder(req, m, w)
}
}
}
......
package cmds
import (
"fmt"
"github.com/ipfs/go-ipfs-cmds/cmdsutil"
)
var (
ErrRcvdError = fmt.Errorf("received command error")
)
// Response is the result of a command request. Response is returned to the client.
type Response interface {
Request() Request
......@@ -11,6 +17,10 @@ type Response interface {
Error() *cmdsutil.Error
Length() uint64
// Next returns the next emitted value.
// The returned error can be a network or decoding error.
// The error can also be ErrRcvdError if an error has been emitted.
// In this case the emitted error can be accessed using the Error() method.
Next() (interface{}, error)
}
......
......@@ -46,6 +46,8 @@ type Header interface {
}
func Copy(re ResponseEmitter, res Response) error {
defer log.Debug("Copy returned.")
log.Debugf("copy from %T to %T", res, re)
re.SetLength(res.Length())
......@@ -58,7 +60,7 @@ func Copy(re ResponseEmitter, res Response) error {
}
v, err := res.Next()
log.Debug("copy", v, err)
log.Debug("copy ", v, err)
if err == io.EOF {
re.Close()
return nil
......
......@@ -10,14 +10,12 @@ import (
"github.com/ipfs/go-ipfs-cmds/cmdsutil"
)
var EmittedErr = fmt.Errorf("received an error")
func NewWriterResponseEmitter(w io.WriteCloser, res Response, enc func(io.Writer) Encoder) *WriterResponseEmitter {
func NewWriterResponseEmitter(w io.WriteCloser, req Request, enc func(Request) func(io.Writer) Encoder) *WriterResponseEmitter {
return &WriterResponseEmitter{
w: w,
c: w,
enc: enc(w),
req: res.Request(),
enc: enc(req)(w),
req: req,
}
}
......@@ -77,8 +75,13 @@ func (r *readerResponse) Next() (interface{}, error) {
r.once.Do(func() { close(r.emitted) })
v := a.Interface()
if err, ok := v.(error); ok {
return err, EmittedErr
if err, ok := v.(cmdsutil.Error); ok {
r.err = &err
return nil, ErrRcvdError
}
if err, ok := v.(*cmdsutil.Error); ok {
r.err = err
return nil, ErrRcvdError
}
return v, nil
......@@ -99,7 +102,7 @@ type WriterResponseEmitter struct {
}
func (re *WriterResponseEmitter) SetError(err interface{}, code cmdsutil.ErrorType) {
*re.err = cmdsutil.Error{Message: fmt.Sprint(err), Code: code}
re.Emit(&cmdsutil.Error{Message: fmt.Sprint(err), Code: code})
for _, re_ := range re.tees {
re_.SetError(err, code)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment