Commit 6fdbbdd1 authored by keks's avatar keks

introduce Single to notify that only one value will be emitted

parent e28bc8d0
0.4.5: QmQVvuDwXUGbtYmbmTcbLtGRYXnEbymaR2zEj38GVysqWe 0.4.7: QmUsuV7rMitqBCk2UPmX1f3Vtp4tJNi6xvXpkQgKujjW5R
...@@ -70,7 +70,7 @@ func (r *chanResponse) Length() uint64 { ...@@ -70,7 +70,7 @@ func (r *chanResponse) Length() uint64 {
return r.length return r.length
} }
func (r *chanResponse) RawNext() (interface{}, error) { func (r *chanResponse) Next() (interface{}, error) {
if r == nil { if r == nil {
return nil, io.EOF return nil, io.EOF
} }
...@@ -84,11 +84,19 @@ func (r *chanResponse) RawNext() (interface{}, error) { ...@@ -84,11 +84,19 @@ func (r *chanResponse) RawNext() (interface{}, error) {
select { select {
case v, ok := <-r.ch: case v, ok := <-r.ch:
if ok { if !ok {
return v, nil return nil, io.EOF
} }
return nil, io.EOF switch val := v.(type) {
case *cmdkit.Error:
r.err = val
return nil, ErrRcvdError
case Single:
return val.Value, nil
default:
return v, nil
}
case <-ctx.Done(): case <-ctx.Done():
close(r.done) close(r.done)
return nil, r.req.Context().Err() return nil, r.req.Context().Err()
...@@ -96,7 +104,7 @@ func (r *chanResponse) RawNext() (interface{}, error) { ...@@ -96,7 +104,7 @@ func (r *chanResponse) RawNext() (interface{}, error) {
} }
func (r *chanResponse) Next() (interface{}, error) { func (r *chanResponse) RawNext() (interface{}, error) {
if r == nil { if r == nil {
return nil, io.EOF return nil, io.EOF
} }
...@@ -110,19 +118,11 @@ func (r *chanResponse) Next() (interface{}, error) { ...@@ -110,19 +118,11 @@ func (r *chanResponse) Next() (interface{}, error) {
select { select {
case v, ok := <-r.ch: case v, ok := <-r.ch:
if !ok { if ok {
return nil, io.EOF
}
switch val := v.(type) {
case *cmdkit.Error:
r.err = val
return nil, ErrRcvdError
case Single:
return val.Value, nil
default:
return v, nil return v, nil
} }
return nil, io.EOF
case <-ctx.Done(): case <-ctx.Done():
close(r.done) close(r.done)
return nil, r.req.Context().Err() return nil, r.req.Context().Err()
...@@ -141,42 +141,6 @@ type chanResponseEmitter struct { ...@@ -141,42 +141,6 @@ type chanResponseEmitter struct {
emitted bool emitted bool
} }
func (re *chanResponseEmitter) SetError(v interface{}, errType cmdkit.ErrorType) {
err := re.Emit(&cmdkit.Error{Message: fmt.Sprint(v), Code: errType})
if err != nil {
panic(err)
}
}
func (re *chanResponseEmitter) SetLength(l uint64) {
// don't change value after emitting
if re.emitted {
return
}
*re.length = l
}
func (re *chanResponseEmitter) Head() Head {
<-re.wait
return Head{
Len: *re.length,
Err: *re.err,
}
}
func (re *chanResponseEmitter) Close() error {
if re.ch == nil {
return nil
}
close(re.ch)
re.ch = nil
return nil
}
func (re *chanResponseEmitter) Emit(v interface{}) error { func (re *chanResponseEmitter) Emit(v interface{}) error {
if ch, ok := v.(chan interface{}); ok { if ch, ok := v.(chan interface{}); ok {
v = (<-chan interface{})(ch) v = (<-chan interface{})(ch)
...@@ -215,3 +179,39 @@ func (re *chanResponseEmitter) Emit(v interface{}) error { ...@@ -215,3 +179,39 @@ func (re *chanResponseEmitter) Emit(v interface{}) error {
return nil return nil
} }
func (re *chanResponseEmitter) SetError(v interface{}, errType cmdkit.ErrorType) {
err := re.Emit(&cmdkit.Error{Message: fmt.Sprint(v), Code: errType})
if err != nil {
panic(err)
}
}
func (re *chanResponseEmitter) SetLength(l uint64) {
// don't change value after emitting
if re.emitted {
return
}
*re.length = l
}
func (re *chanResponseEmitter) Head() Head {
<-re.wait
return Head{
Len: *re.length,
Err: *re.err,
}
}
func (re *chanResponseEmitter) Close() error {
if re.ch == nil {
return nil
}
close(re.ch)
re.ch = nil
return nil
}
...@@ -132,12 +132,15 @@ func (re *responseEmitter) Head() cmds.Head { ...@@ -132,12 +132,15 @@ func (re *responseEmitter) Head() cmds.Head {
} }
func (re *responseEmitter) Emit(v interface{}) error { func (re *responseEmitter) Emit(v interface{}) error {
// unwrap
if val, ok := v.(cmds.Single); ok {
v = val.Value
}
if ch, ok := v.(chan interface{}); ok { if ch, ok := v.(chan interface{}); ok {
v = (<-chan interface{})(ch) v = (<-chan interface{})(ch)
} }
log.Debugf("%T:%v, enc:%p", v, v, re.enc)
// TODO find a better solution for this. // TODO find a better solution for this.
// Idea: use the actual cmd.Type and not *cmd.Type // Idea: use the actual cmd.Type and not *cmd.Type
// would need to fix all commands though // would need to fix all commands though
......
...@@ -102,16 +102,18 @@ func (c *Command) Call(req Request, re ResponseEmitter) (err error) { ...@@ -102,16 +102,18 @@ func (c *Command) Call(req Request, re ResponseEmitter) (err error) {
} }
defer func() { defer func() {
// catch panics (esp. from re.SetError) /*
if v := recover(); v != nil { // catch panics (esp. from re.SetError)
// if they are errors if v := recover(); v != nil {
if e, ok := v.(error); ok { // if they are errors
// use them as return error if e, ok := v.(error); ok {
err = e // use them as return error
err = e
}
// otherwise keep panicking.
panic(v)
} }
// otherwise keep panicking. */
panic(v)
}
}() }()
cmd.Run(req, re) cmd.Run(req, re)
......
...@@ -8,8 +8,8 @@ import ( ...@@ -8,8 +8,8 @@ import (
"time" "time"
"github.com/ipfs/go-ipfs-cmdkit" "github.com/ipfs/go-ipfs-cmdkit"
"gx/ipfs/QmQVvuDwXUGbtYmbmTcbLtGRYXnEbymaR2zEj38GVysqWe/go-ipfs-cmds" "gx/ipfs/QmdgLyCJYMFwNv5Qx3vMiTrrtAdWGTaL5G7xYwkB6CCgja/go-ipfs-cmds"
"gx/ipfs/QmQVvuDwXUGbtYmbmTcbLtGRYXnEbymaR2zEj38GVysqWe/go-ipfs-cmds/cli" "gx/ipfs/QmdgLyCJYMFwNv5Qx3vMiTrrtAdWGTaL5G7xYwkB6CCgja/go-ipfs-cmds/cli"
) )
// AddStatus describes the progress of the add operation // AddStatus describes the progress of the add operation
......
...@@ -5,8 +5,8 @@ import ( ...@@ -5,8 +5,8 @@ import (
"github.com/ipfs/go-ipfs-cmds/examples/adder" "github.com/ipfs/go-ipfs-cmds/examples/adder"
"gx/ipfs/QmQVvuDwXUGbtYmbmTcbLtGRYXnEbymaR2zEj38GVysqWe/go-ipfs-cmds" "gx/ipfs/QmdgLyCJYMFwNv5Qx3vMiTrrtAdWGTaL5G7xYwkB6CCgja/go-ipfs-cmds"
"gx/ipfs/QmQVvuDwXUGbtYmbmTcbLtGRYXnEbymaR2zEj38GVysqWe/go-ipfs-cmds/cli" "gx/ipfs/QmdgLyCJYMFwNv5Qx3vMiTrrtAdWGTaL5G7xYwkB6CCgja/go-ipfs-cmds/cli"
) )
func main() { func main() {
......
...@@ -6,9 +6,9 @@ import ( ...@@ -6,9 +6,9 @@ import (
"github.com/ipfs/go-ipfs-cmds/examples/adder" "github.com/ipfs/go-ipfs-cmds/examples/adder"
cmdkit "github.com/ipfs/go-ipfs-cmdkit" cmdkit "github.com/ipfs/go-ipfs-cmdkit"
cmds "gx/ipfs/QmQVvuDwXUGbtYmbmTcbLtGRYXnEbymaR2zEj38GVysqWe/go-ipfs-cmds" cmds "gx/ipfs/QmdgLyCJYMFwNv5Qx3vMiTrrtAdWGTaL5G7xYwkB6CCgja/go-ipfs-cmds"
cli "gx/ipfs/QmQVvuDwXUGbtYmbmTcbLtGRYXnEbymaR2zEj38GVysqWe/go-ipfs-cmds/cli" cli "gx/ipfs/QmdgLyCJYMFwNv5Qx3vMiTrrtAdWGTaL5G7xYwkB6CCgja/go-ipfs-cmds/cli"
http "gx/ipfs/QmQVvuDwXUGbtYmbmTcbLtGRYXnEbymaR2zEj38GVysqWe/go-ipfs-cmds/http" http "gx/ipfs/QmdgLyCJYMFwNv5Qx3vMiTrrtAdWGTaL5G7xYwkB6CCgja/go-ipfs-cmds/http"
) )
func main() { func main() {
......
...@@ -7,8 +7,8 @@ import ( ...@@ -7,8 +7,8 @@ import (
"github.com/ipfs/go-ipfs-cmds/examples/adder" "github.com/ipfs/go-ipfs-cmds/examples/adder"
cmds "gx/ipfs/QmQVvuDwXUGbtYmbmTcbLtGRYXnEbymaR2zEj38GVysqWe/go-ipfs-cmds" cmds "gx/ipfs/QmdgLyCJYMFwNv5Qx3vMiTrrtAdWGTaL5G7xYwkB6CCgja/go-ipfs-cmds"
http "gx/ipfs/QmQVvuDwXUGbtYmbmTcbLtGRYXnEbymaR2zEj38GVysqWe/go-ipfs-cmds/http" http "gx/ipfs/QmdgLyCJYMFwNv5Qx3vMiTrrtAdWGTaL5G7xYwkB6CCgja/go-ipfs-cmds/http"
) )
func main() { func main() {
......
...@@ -12,6 +12,14 @@ import ( ...@@ -12,6 +12,14 @@ import (
"github.com/ipfs/go-ipfs-cmds" "github.com/ipfs/go-ipfs-cmds"
) )
var (
MIMEEncodings = map[string]cmds.EncodingType{
"application/json": cmds.JSON,
"application/xml": cmds.XML,
"text/plain": cmds.Text,
}
)
type Response struct { type Response struct {
length uint64 length uint64
err *cmdkit.Error err *cmdkit.Error
...@@ -21,6 +29,8 @@ type Response struct { ...@@ -21,6 +29,8 @@ type Response struct {
rr *responseReader rr *responseReader
dec cmds.Decoder dec cmds.Decoder
initErr *cmdkit.Error
} }
func (res *Response) Request() cmds.Request { func (res *Response) Request() cmds.Request {
...@@ -38,6 +48,13 @@ func (res *Response) Length() uint64 { ...@@ -38,6 +48,13 @@ func (res *Response) Length() uint64 {
} }
func (res *Response) RawNext() (interface{}, error) { func (res *Response) RawNext() (interface{}, error) {
if res.initErr != nil {
err := res.initErr
res.initErr = nil
return err, nil
}
// nil decoder means stream not chunks // nil decoder means stream not chunks
// but only do that once // but only do that once
if res.dec == nil { if res.dec == nil {
...@@ -65,33 +82,11 @@ func (res *Response) RawNext() (interface{}, error) { ...@@ -65,33 +82,11 @@ func (res *Response) RawNext() (interface{}, error) {
} }
func (res *Response) Next() (interface{}, error) { func (res *Response) Next() (interface{}, error) {
// nil decoder means stream not chunks v, err := res.RawNext()
// but only do that once
if res.dec == nil {
if res.rr == nil {
return nil, io.EOF
} else {
rr := res.rr
res.rr = nil
return rr, nil
}
}
a := &cmds.Any{}
a.Add(&cmdkit.Error{})
a.Add(res.req.Command().Type)
err := res.dec.Decode(a)
// last error was sent as value, now we get the same error from the headers. ignore and EOF!
if err != nil && res.err != nil && err.Error() == res.err.Error() {
err = io.EOF
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
v := a.Interface()
if err, ok := v.(cmdkit.Error); ok { if err, ok := v.(cmdkit.Error); ok {
v = &err v = &err
} }
...@@ -128,16 +123,11 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error ...@@ -128,16 +123,11 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
contentType := httpRes.Header.Get(contentTypeHeader) contentType := httpRes.Header.Get(contentTypeHeader)
contentType = strings.Split(contentType, ";")[0] contentType = strings.Split(contentType, ";")[0]
if len(httpRes.Header.Get(channelHeader)) > 0 { encType, found := MIMEEncodings[contentType]
encTypeStr, found, err := req.Option(cmdkit.EncShort).String() if found {
if err != nil { makeDec, ok := cmds.Decoders[encType]
return nil, err if ok {
} res.dec = makeDec(res.rr)
encType := cmds.EncodingType(encTypeStr)
if found {
res.dec = cmds.Decoders[encType](res.rr)
} }
} }
...@@ -150,7 +140,6 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error ...@@ -150,7 +140,6 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
// handle 404s // handle 404s
e.Message = "Command not found." e.Message = "Command not found."
e.Code = cmdkit.ErrClient e.Code = cmdkit.ErrClient
case contentType == plainText: case contentType == plainText:
// handle non-marshalled errors // handle non-marshalled errors
mes, err := ioutil.ReadAll(res.rr) mes, err := ioutil.ReadAll(res.rr)
...@@ -159,7 +148,6 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error ...@@ -159,7 +148,6 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
} }
e.Message = string(mes) e.Message = string(mes)
e.Code = cmdkit.ErrNormal e.Code = cmdkit.ErrNormal
default: default:
// handle marshalled errors // handle marshalled errors
err = res.dec.Decode(&e) err = res.dec.Decode(&e)
...@@ -168,9 +156,7 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error ...@@ -168,9 +156,7 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
} }
} }
res.err = e res.initErr = e
return res, nil
} }
return res, nil return res, nil
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"os"
"strconv" "strconv"
"sync" "sync"
...@@ -77,6 +78,11 @@ func (re *responseEmitter) Emit(value interface{}) error { ...@@ -77,6 +78,11 @@ func (re *responseEmitter) Emit(value interface{}) error {
re.once.Do(func() { re.preamble(value) }) re.once.Do(func() { re.preamble(value) })
if single, ok := value.(cmds.Single); ok {
value = single.Value
defer re.Close()
}
if re.w == nil { if re.w == nil {
return fmt.Errorf("connection already closed / custom - http.respem - TODO") return fmt.Errorf("connection already closed / custom - http.respem - TODO")
} }
...@@ -107,9 +113,6 @@ func (re *responseEmitter) Emit(value interface{}) error { ...@@ -107,9 +113,6 @@ func (re *responseEmitter) Emit(value interface{}) error {
} else { } else {
err = re.enc.Encode(value) err = re.enc.Encode(value)
} }
case cmds.Single:
defer re.Close()
err = re.enc.Encode(value)
default: default:
err = re.enc.Encode(value) err = re.enc.Encode(value)
} }
...@@ -154,25 +157,37 @@ func (re *responseEmitter) Flush() { ...@@ -154,25 +157,37 @@ func (re *responseEmitter) Flush() {
} }
func (re *responseEmitter) preamble(value interface{}) { func (re *responseEmitter) preamble(value interface{}) {
fmt.Fprintf(os.Stderr, "preamble(%#v)\n", value)
h := re.w.Header() h := re.w.Header()
// Expose our agent to allow identification // Expose our agent to allow identification
h.Set("Server", "go-ipfs/"+config.CurrentVersionNumber) h.Set("Server", "go-ipfs/"+config.CurrentVersionNumber)
status := http.StatusOK status := http.StatusOK
// unpack value if it needs special treatment in the type switch below
if s, isSingle := value.(cmds.Single); isSingle {
if err, isErr := s.Value.(cmdkit.Error); isErr {
value = &err
}
if err, isErr := s.Value.(*cmdkit.Error); isErr {
value = err
}
}
var mime string
switch v := value.(type) { switch v := value.(type) {
case *cmdkit.Error: case *cmdkit.Error:
h.Set(channelHeader, "1") err := v
if err.Code == cmdkit.ErrClient {
status = http.StatusBadRequest
} else {
status = http.StatusInternalServerError
}
// if this is not a head request, the error will be sent as a trailer or as a value // if this is not a head request, the error will be sent as a trailer or as a value
if re.method == "HEAD" { if re.method == "HEAD" {
err := v
if err.Code == cmdkit.ErrClient {
status = http.StatusBadRequest
} else {
status = http.StatusInternalServerError
}
http.Error(re.w, err.Error(), status) http.Error(re.w, err.Error(), status)
re.w = nil re.w = nil
...@@ -182,6 +197,11 @@ func (re *responseEmitter) preamble(value interface{}) { ...@@ -182,6 +197,11 @@ func (re *responseEmitter) preamble(value interface{}) {
// set streams output type to text to avoid issues with browsers rendering // set streams output type to text to avoid issues with browsers rendering
// html pages on priveleged api ports // html pages on priveleged api ports
h.Set(streamHeader, "1") h.Set(streamHeader, "1")
mime = "text/plain"
case cmds.Single:
case nil:
h.Set(channelHeader, "1")
default: default:
h.Set(channelHeader, "1") h.Set(channelHeader, "1")
} }
...@@ -189,12 +209,15 @@ func (re *responseEmitter) preamble(value interface{}) { ...@@ -189,12 +209,15 @@ func (re *responseEmitter) preamble(value interface{}) {
// Set up our potential trailer // Set up our potential trailer
h.Set("Trailer", StreamErrHeader) h.Set("Trailer", StreamErrHeader)
// lookup mime type from map
mime := mimeTypes[re.encType]
// catch-all, set to text as default
if mime == "" { if mime == "" {
mime = "text/plain" var ok bool
// lookup mime type from map
mime, ok = mimeTypes[re.encType]
if !ok {
// catch-all, set to text as default
mime = "text/plain"
}
} }
h.Set(contentTypeHeader, mime) h.Set(contentTypeHeader, mime)
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"reflect"
"sync" "sync"
"github.com/ipfs/go-ipfs-cmdkit" "github.com/ipfs/go-ipfs-cmdkit"
...@@ -197,6 +198,13 @@ func (r *fakeResponse) Error() *cmdkit.Error { ...@@ -197,6 +198,13 @@ func (r *fakeResponse) Error() *cmdkit.Error {
// SetOutput sets the output variable to the passed value // SetOutput sets the output variable to the passed value
func (r *fakeResponse) SetOutput(v interface{}) { func (r *fakeResponse) SetOutput(v interface{}) {
t := reflect.TypeOf(v)
_, isReader := v.(io.Reader)
if t != nil && t.Kind() != reflect.Chan && !isReader {
v = Single{v}
}
r.out = v r.out = v
r.once.Do(func() { close(r.wait) }) r.once.Do(func() { close(r.wait) })
} }
......
...@@ -34,6 +34,6 @@ ...@@ -34,6 +34,6 @@
"language": "go", "language": "go",
"license": "MIT", "license": "MIT",
"name": "go-ipfs-cmds", "name": "go-ipfs-cmds",
"version": "0.4.5" "version": "0.4.7"
} }
package cmds package cmds
import ( import (
"fmt"
"io" "io"
"github.com/ipfs/go-ipfs-cmdkit" "github.com/ipfs/go-ipfs-cmdkit"
...@@ -12,6 +13,14 @@ type Single struct { ...@@ -12,6 +13,14 @@ type Single struct {
Value interface{} Value interface{}
} }
func (s Single) String() string {
return fmt.Sprintf("Single{%v}", s.Value)
}
func (s Single) GoString() string {
return fmt.Sprintf("Single{%#v}", s.Value)
}
// EmitOnce is a helper that emits a value wrapped in Single, to signal that this will be the only value sent. // EmitOnce is a helper that emits a value wrapped in Single, to signal that this will be the only value sent.
func EmitOnce(re ResponseEmitter, v interface{}) error { func EmitOnce(re ResponseEmitter, v interface{}) error {
return re.Emit(Single{v}) return re.Emit(Single{v})
......
...@@ -12,23 +12,23 @@ func TestSingle_1(t *testing.T) { ...@@ -12,23 +12,23 @@ func TestSingle_1(t *testing.T) {
} }
re, res := NewChanResponsePair(req) re, res := NewChanResponsePair(req)
go func() { go func() {
if err := EmitOnce(re, "test"); err != nil { if err := EmitOnce(re, "test"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
}() }()
v, err := res.Next() v, err := res.Next()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if str, ok := v.(string); !ok || str != "test" { if str, ok := v.(string); !ok || str != "test" {
t.Fatalf("expected %#v, got %#v", "foo", str) t.Fatalf("expected %#v, got %#v", "foo", str)
} }
if _, err = res.Next(); err != io.EOF { if _, err = res.Next(); err != io.EOF {
t.Fatalf("expected %#v, got %#v", io.EOF, err) t.Fatalf("expected %#v, got %#v", io.EOF, err)
} }
} }
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment