writer.go 3.56 KB
Newer Older
Jan Winkelmann's avatar
Jan Winkelmann committed
1 2 3
package cmds

import (
4
	"encoding/json"
Jan Winkelmann's avatar
Jan Winkelmann committed
5 6 7
	"fmt"
	"io"
	"reflect"
8
	"sync"
9

keks's avatar
keks committed
10
	"github.com/ipfs/go-ipfs-cmdkit"
Jan Winkelmann's avatar
Jan Winkelmann committed
11 12
)

13
func NewWriterResponseEmitter(w io.WriteCloser, req *Request, enc func(*Request) func(io.Writer) Encoder) *WriterResponseEmitter {
Jan Winkelmann's avatar
Jan Winkelmann committed
14
	re := &WriterResponseEmitter{
15 16
		w:   w,
		c:   w,
Jan Winkelmann's avatar
Jan Winkelmann committed
17
		req: req,
Jan Winkelmann's avatar
Jan Winkelmann committed
18
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
19 20 21 22 23 24

	if enc != nil {
		re.enc = enc(req)(w)
	}

	return re
Jan Winkelmann's avatar
Jan Winkelmann committed
25 26
}

27
func NewReaderResponse(r io.Reader, encType EncodingType, req *Request) Response {
28 29
	emitted := make(chan struct{})

Jan Winkelmann's avatar
Jan Winkelmann committed
30 31 32 33 34
	return &readerResponse{
		req:     req,
		r:       r,
		encType: encType,
		dec:     Decoders[encType](r),
35
		emitted: emitted,
Jan Winkelmann's avatar
Jan Winkelmann committed
36 37 38 39 40 41 42 43
	}
}

type readerResponse struct {
	r       io.Reader
	encType EncodingType
	dec     Decoder

44
	req *Request
Jan Winkelmann's avatar
Jan Winkelmann committed
45 46

	length uint64
47
	err    *cmdkit.Error
Jan Winkelmann's avatar
Jan Winkelmann committed
48

49 50
	emitted chan struct{}
	once    sync.Once
Jan Winkelmann's avatar
Jan Winkelmann committed
51 52
}

53
func (r *readerResponse) Request() *Request {
Jan Winkelmann's avatar
Jan Winkelmann committed
54
	return r.req
Jan Winkelmann's avatar
Jan Winkelmann committed
55 56
}

57
func (r *readerResponse) Error() *cmdkit.Error {
58 59
	<-r.emitted

Jan Winkelmann's avatar
Jan Winkelmann committed
60 61 62 63
	return r.err
}

func (r *readerResponse) Length() uint64 {
64 65
	<-r.emitted

Jan Winkelmann's avatar
Jan Winkelmann committed
66 67 68
	return r.length
}

69
func (r *readerResponse) RawNext() (interface{}, error) {
70
	m := &MaybeError{Value: r.req.Command.Type}
keks's avatar
keks committed
71
	err := r.dec.Decode(m)
72 73 74 75 76 77
	if err != nil {
		return nil, err
	}

	r.once.Do(func() { close(r.emitted) })

keks's avatar
keks committed
78
	v := m.Get()
Jeromy's avatar
Jeromy committed
79 80 81 82
	// because working with pointers to arrays is annoying
	if t := reflect.TypeOf(v); t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Slice {
		v = reflect.ValueOf(v).Elem().Interface()
	}
83 84 85
	return v, nil
}

Jan Winkelmann's avatar
Jan Winkelmann committed
86
func (r *readerResponse) Next() (interface{}, error) {
keks's avatar
keks committed
87
	v, err := r.RawNext()
88 89 90 91
	if err != nil {
		return nil, err
	}

92
	if err, ok := v.(cmdkit.Error); ok {
93
		v = &err
Jan Winkelmann's avatar
Jan Winkelmann committed
94
	}
95 96 97 98

	switch val := v.(type) {
	case *cmdkit.Error:
		r.err = val
Jan Winkelmann's avatar
Jan Winkelmann committed
99
		return nil, ErrRcvdError
100 101 102 103
	case Single:
		return val.Value, nil
	default:
		return v, nil
104
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
105 106
}

107
type WriterResponseEmitter struct {
Jan Winkelmann's avatar
Jan Winkelmann committed
108
	// TODO maybe make those public?
109 110 111
	w   io.Writer
	c   io.Closer
	enc Encoder
112
	req *Request
Jan Winkelmann's avatar
Jan Winkelmann committed
113 114

	length *uint64
115
	err    *cmdkit.Error
Jan Winkelmann's avatar
Jan Winkelmann committed
116 117 118 119

	emitted bool
}

Jan Winkelmann's avatar
Jan Winkelmann committed
120 121 122 123
// SetEncoder replaces the emitter's encoder with the one mkEnc builds for the
// emitter's writer.
func (re *WriterResponseEmitter) SetEncoder(mkEnc func(io.Writer) Encoder) {
	enc := mkEnc(re.w)
	re.enc = enc
}

124 125
func (re *WriterResponseEmitter) SetError(v interface{}, errType cmdkit.ErrorType) {
	err := re.Emit(&cmdkit.Error{Message: fmt.Sprint(v), Code: errType})
126
	if err != nil {
127
		panic(err)
128
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
129 130
}

131
func (re *WriterResponseEmitter) SetLength(length uint64) {
Jan Winkelmann's avatar
Jan Winkelmann committed
132 133 134 135 136 137 138
	if re.emitted {
		return
	}

	*re.length = length
}

139 140 141 142 143 144 145 146 147 148 149 150
// Close closes the emitter's underlying closer and returns its error.
func (re *WriterResponseEmitter) Close() error {
	err := re.c.Close()
	return err
}

// Head returns the emitter's current head: the length recorded by SetLength
// and the emitter's error field.
//
// If no length storage has been allocated, Len is reported as zero rather
// than dereferencing a nil pointer.
func (re *WriterResponseEmitter) Head() Head {
	h := Head{Err: re.err}
	if re.length != nil {
		h.Len = *re.length
	}
	return h
}

func (re *WriterResponseEmitter) Emit(v interface{}) error {
151 152 153
	if ch, ok := v.(chan interface{}); ok {
		v = (<-chan interface{})(ch)
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
154

155 156 157 158 159 160 161 162
	if ch, isChan := v.(<-chan interface{}); isChan {
		for v = range ch {
			err := re.Emit(v)
			if err != nil {
				return err
			}
		}
		return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
163 164
	}

165 166
	re.emitted = true

167 168 169 170
	if _, ok := v.(Single); ok {
		defer re.Close()
	}

171
	return re.enc.Encode(v)
Jan Winkelmann's avatar
Jan Winkelmann committed
172 173
}

keks's avatar
keks committed
174 175 176
type MaybeError struct {
	Value interface{} // needs to be a pointer
	Error cmdkit.Error
177

keks's avatar
keks committed
178
	isError bool
179 180
}

keks's avatar
keks committed
181 182 183
func (m *MaybeError) Get() interface{} {
	if m.isError {
		return m.Error
184
	}
keks's avatar
keks committed
185
	return m.Value
186 187
}

keks's avatar
keks committed
188 189 190 191 192
func (m *MaybeError) UnmarshalJSON(data []byte) error {
	err := json.Unmarshal(data, &m.Error)
	if err == nil {
		m.isError = true
		return nil
193 194
	}

195 196 197 198 199 200 201 202 203 204 205
	if m.Value != nil {
		// make sure we are working with a pointer here
		v := reflect.ValueOf(m.Value)
		if v.Kind() != reflect.Ptr {
			m.Value = reflect.New(v.Type()).Interface()
		}

		err = json.Unmarshal(data, m.Value)
	} else {
		// let the json decoder decode into whatever it finds appropriate
		err = json.Unmarshal(data, &m.Value)
keks's avatar
keks committed
206
	}
207

208
	return err
209
}