writer.go 3.76 KB
Newer Older
Jan Winkelmann's avatar
Jan Winkelmann committed
1 2 3
package cmds

import (
4
	"encoding/json"
keks's avatar
keks committed
5
	"errors"
Jan Winkelmann's avatar
Jan Winkelmann committed
6 7
	"io"
	"reflect"
8
	"sync"
Jan Winkelmann's avatar
Jan Winkelmann committed
9 10
)

Hector Sanjuan's avatar
Hector Sanjuan committed
11 12
// NewWriterResponseEmitter creates a response emitter that sends responses to
// the given WriterCloser.
13
func NewWriterResponseEmitter(w io.WriteCloser, req *Request) (ResponseEmitter, error) {
Steven Allen's avatar
Steven Allen committed
14
	_, valEnc, err := GetEncoder(req, w, Undefined)
15 16 17 18
	if err != nil {
		return nil, err
	}

keks's avatar
keks committed
19
	re := &writerResponseEmitter{
20 21
		w:   w,
		c:   w,
Jan Winkelmann's avatar
Jan Winkelmann committed
22
		req: req,
23
		enc: valEnc,
Jan Winkelmann's avatar
Jan Winkelmann committed
24
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
25

26
	return re, nil
Jan Winkelmann's avatar
Jan Winkelmann committed
27 28
}

Hector Sanjuan's avatar
Hector Sanjuan committed
29
// NewReaderResponse creates a Response from the given reader.
30
func NewReaderResponse(r io.Reader, req *Request) (Response, error) {
Steven Allen's avatar
Steven Allen committed
31 32 33
	encType := GetEncoding(req, Undefined)
	dec, ok := Decoders[encType]
	if !ok {
Steven Allen's avatar
Steven Allen committed
34
		return nil, Errorf(ErrClient, "unknown encoding: %s", encType)
35
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
36 37 38 39
	return &readerResponse{
		req:     req,
		r:       r,
		encType: encType,
Steven Allen's avatar
Steven Allen committed
40
		dec:     dec(r),
keks's avatar
keks committed
41
		emitted: make(chan struct{}),
42
	}, nil
Jan Winkelmann's avatar
Jan Winkelmann committed
43 44 45 46 47 48 49
}

type readerResponse struct {
	r       io.Reader
	encType EncodingType
	dec     Decoder

50
	req *Request
Jan Winkelmann's avatar
Jan Winkelmann committed
51 52

	length uint64
keks's avatar
keks committed
53
	err    error
Jan Winkelmann's avatar
Jan Winkelmann committed
54

55 56
	emitted chan struct{}
	once    sync.Once
Jan Winkelmann's avatar
Jan Winkelmann committed
57 58
}

59
func (r *readerResponse) Request() *Request {
Jan Winkelmann's avatar
Jan Winkelmann committed
60
	return r.req
Jan Winkelmann's avatar
Jan Winkelmann committed
61 62
}

Steven Allen's avatar
Steven Allen committed
63
func (r *readerResponse) Error() *Error {
64 65
	<-r.emitted

Steven Allen's avatar
Steven Allen committed
66
	if err, ok := r.err.(*Error); ok {
keks's avatar
keks committed
67 68 69
		return err
	}

Steven Allen's avatar
Steven Allen committed
70
	return &Error{Message: r.err.Error()}
Jan Winkelmann's avatar
Jan Winkelmann committed
71 72 73
}

func (r *readerResponse) Length() uint64 {
74 75
	<-r.emitted

Jan Winkelmann's avatar
Jan Winkelmann committed
76 77 78
	return r.length
}

79
func (r *readerResponse) Next() (interface{}, error) {
80
	m := &MaybeError{Value: r.req.Command.Type}
keks's avatar
keks committed
81
	err := r.dec.Decode(m)
82 83 84 85 86 87
	if err != nil {
		return nil, err
	}

	r.once.Do(func() { close(r.emitted) })

keks's avatar
keks committed
88 89
	v, err := m.Get()

Jeromy's avatar
Jeromy committed
90 91 92 93
	// because working with pointers to arrays is annoying
	if t := reflect.TypeOf(v); t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Slice {
		v = reflect.ValueOf(v).Elem().Interface()
	}
keks's avatar
keks committed
94
	return v, err
95 96
}

keks's avatar
keks committed
97
type writerResponseEmitter struct {
Jan Winkelmann's avatar
Jan Winkelmann committed
98
	// TODO maybe make those public?
99 100 101
	w   io.Writer
	c   io.Closer
	enc Encoder
102
	req *Request
Jan Winkelmann's avatar
Jan Winkelmann committed
103 104 105 106

	length *uint64

	emitted bool
107
	closed  bool
Jan Winkelmann's avatar
Jan Winkelmann committed
108 109
}

keks's avatar
keks committed
110
func (re *writerResponseEmitter) CloseWithError(err error) error {
111 112
	if re.closed {
		return ErrClosingClosedEmitter
keks's avatar
keks committed
113 114
	}

keks's avatar
keks committed
115 116
	if err == nil || err == io.EOF {
		return re.Close()
keks's avatar
keks committed
117 118
	}

keks's avatar
keks committed
119 120 121
	cwe, ok := re.c.(interface {
		CloseWithError(error) error
	})
122 123 124 125 126
	if ok {
		re.closed = true
		return cwe.CloseWithError(err)
	}

keks's avatar
keks committed
127
	return errors.New("provided closer does not support CloseWithError")
keks's avatar
keks committed
128 129
}

keks's avatar
keks committed
130
func (re *writerResponseEmitter) SetLength(length uint64) {
Jan Winkelmann's avatar
Jan Winkelmann committed
131 132 133 134 135 136 137
	if re.emitted {
		return
	}

	*re.length = length
}

keks's avatar
keks committed
138
func (re *writerResponseEmitter) Close() error {
139 140 141 142 143
	if re.closed {
		return ErrClosingClosedEmitter
	}

	re.closed = true
144 145 146
	return re.c.Close()
}

keks's avatar
keks committed
147
func (re *writerResponseEmitter) Emit(v interface{}) error {
keks's avatar
keks committed
148 149 150 151 152 153
	// channel emission iteration
	if ch, ok := v.(chan interface{}); ok {
		v = (<-chan interface{})(ch)
	}
	if ch, isChan := v.(<-chan interface{}); isChan {
		return EmitChan(re, ch)
154 155
	}

156 157 158 159
	if re.closed {
		return ErrClosedEmitter
	}

160 161
	re.emitted = true

162 163 164 165 166 167
	var isSingle bool
	if s, ok := v.(Single); ok {
		v = s.Value
		isSingle = true
	}

168 169 170 171 172
	err := re.enc.Encode(v)
	if err != nil {
		return err
	}

173
	if isSingle {
174
		return re.Close()
175 176
	}

177
	return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
178 179
}

keks's avatar
keks committed
180 181
type MaybeError struct {
	Value interface{} // needs to be a pointer
Steven Allen's avatar
Steven Allen committed
182
	Error *Error
183

keks's avatar
keks committed
184
	isError bool
185 186
}

keks's avatar
keks committed
187
func (m *MaybeError) Get() (interface{}, error) {
keks's avatar
keks committed
188
	if m.isError {
keks's avatar
keks committed
189
		return nil, m.Error
190
	}
keks's avatar
keks committed
191
	return m.Value, nil
192 193
}

keks's avatar
keks committed
194
func (m *MaybeError) UnmarshalJSON(data []byte) error {
Steven Allen's avatar
Steven Allen committed
195
	var e Error
keks's avatar
keks committed
196
	err := json.Unmarshal(data, &e)
keks's avatar
keks committed
197 198
	if err == nil {
		m.isError = true
keks's avatar
keks committed
199
		m.Error = &e
keks's avatar
keks committed
200
		return nil
201 202
	}

203 204 205 206 207 208 209 210 211 212 213
	if m.Value != nil {
		// make sure we are working with a pointer here
		v := reflect.ValueOf(m.Value)
		if v.Kind() != reflect.Ptr {
			m.Value = reflect.New(v.Type()).Interface()
		}

		err = json.Unmarshal(data, m.Value)
	} else {
		// let the json decoder decode into whatever it finds appropriate
		err = json.Unmarshal(data, &m.Value)
keks's avatar
keks committed
214
	}
215

216
	return err
217
}