writer.go 3.37 KB
Newer Older
Jan Winkelmann's avatar
Jan Winkelmann committed
1 2 3
package cmds

import (
4
	"encoding/json"
Jan Winkelmann's avatar
Jan Winkelmann committed
5 6 7
	"fmt"
	"io"
	"reflect"
8
	"sync"
9

keks's avatar
keks committed
10
	"github.com/ipfs/go-ipfs-cmdkit"
Jan Winkelmann's avatar
Jan Winkelmann committed
11 12
)

Jan Winkelmann's avatar
Jan Winkelmann committed
13
func NewWriterResponseEmitter(w io.WriteCloser, req Request, enc func(Request) func(io.Writer) Encoder) *WriterResponseEmitter {
Jan Winkelmann's avatar
Jan Winkelmann committed
14
	re := &WriterResponseEmitter{
15 16
		w:   w,
		c:   w,
Jan Winkelmann's avatar
Jan Winkelmann committed
17
		req: req,
Jan Winkelmann's avatar
Jan Winkelmann committed
18
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
19 20 21 22 23 24

	if enc != nil {
		re.enc = enc(req)(w)
	}

	return re
Jan Winkelmann's avatar
Jan Winkelmann committed
25 26 27
}

func NewReaderResponse(r io.Reader, encType EncodingType, req Request) Response {
28 29
	emitted := make(chan struct{})

Jan Winkelmann's avatar
Jan Winkelmann committed
30 31 32 33 34
	return &readerResponse{
		req:     req,
		r:       r,
		encType: encType,
		dec:     Decoders[encType](r),
35
		emitted: emitted,
Jan Winkelmann's avatar
Jan Winkelmann committed
36 37 38 39 40 41 42 43 44 45 46
	}
}

type readerResponse struct {
	r       io.Reader
	encType EncodingType
	dec     Decoder

	req Request

	length uint64
47
	err    *cmdkit.Error
Jan Winkelmann's avatar
Jan Winkelmann committed
48

49 50
	emitted chan struct{}
	once    sync.Once
Jan Winkelmann's avatar
Jan Winkelmann committed
51 52
}

Jan Winkelmann's avatar
Jan Winkelmann committed
53 54
func (r *readerResponse) Request() Request {
	return r.req
Jan Winkelmann's avatar
Jan Winkelmann committed
55 56
}

57
func (r *readerResponse) Error() *cmdkit.Error {
58 59
	<-r.emitted

Jan Winkelmann's avatar
Jan Winkelmann committed
60 61 62 63
	return r.err
}

func (r *readerResponse) Length() uint64 {
64 65
	<-r.emitted

Jan Winkelmann's avatar
Jan Winkelmann committed
66 67 68
	return r.length
}

69
func (r *readerResponse) RawNext() (interface{}, error) {
keks's avatar
keks committed
70 71
	m := &MaybeError{Value: r.req.Command().Type}
	err := r.dec.Decode(m)
72 73 74 75 76 77
	if err != nil {
		return nil, err
	}

	r.once.Do(func() { close(r.emitted) })

keks's avatar
keks committed
78
	v := m.Get()
79 80 81
	return v, nil
}

Jan Winkelmann's avatar
Jan Winkelmann committed
82
func (r *readerResponse) Next() (interface{}, error) {
keks's avatar
keks committed
83
	v, err := r.RawNext()
84 85 86 87
	if err != nil {
		return nil, err
	}

88
	if err, ok := v.(cmdkit.Error); ok {
89
		v = &err
Jan Winkelmann's avatar
Jan Winkelmann committed
90
	}
91 92 93 94

	switch val := v.(type) {
	case *cmdkit.Error:
		r.err = val
Jan Winkelmann's avatar
Jan Winkelmann committed
95
		return nil, ErrRcvdError
96 97 98 99
	case Single:
		return val.Value, nil
	default:
		return v, nil
100
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
101 102
}

103
type WriterResponseEmitter struct {
Jan Winkelmann's avatar
Jan Winkelmann committed
104
	// TODO maybe make those public?
105 106 107
	w   io.Writer
	c   io.Closer
	enc Encoder
Jan Winkelmann's avatar
Jan Winkelmann committed
108
	req Request
Jan Winkelmann's avatar
Jan Winkelmann committed
109 110

	length *uint64
111
	err    *cmdkit.Error
Jan Winkelmann's avatar
Jan Winkelmann committed
112 113 114 115

	emitted bool
}

Jan Winkelmann's avatar
Jan Winkelmann committed
116 117 118 119
func (re *WriterResponseEmitter) SetEncoder(mkEnc func(io.Writer) Encoder) {
	re.enc = mkEnc(re.w)
}

120 121
func (re *WriterResponseEmitter) SetError(v interface{}, errType cmdkit.ErrorType) {
	err := re.Emit(&cmdkit.Error{Message: fmt.Sprint(v), Code: errType})
122
	if err != nil {
123
		panic(err)
124
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
125 126
}

127
func (re *WriterResponseEmitter) SetLength(length uint64) {
Jan Winkelmann's avatar
Jan Winkelmann committed
128 129 130 131 132 133 134
	if re.emitted {
		return
	}

	*re.length = length
}

135 136 137 138 139 140 141 142 143 144 145 146
func (re *WriterResponseEmitter) Close() error {
	return re.c.Close()
}

func (re *WriterResponseEmitter) Head() Head {
	return Head{
		Len: *re.length,
		Err: re.err,
	}
}

func (re *WriterResponseEmitter) Emit(v interface{}) error {
147 148 149
	if ch, ok := v.(chan interface{}); ok {
		v = (<-chan interface{})(ch)
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
150

151 152 153 154 155 156 157 158
	if ch, isChan := v.(<-chan interface{}); isChan {
		for v = range ch {
			err := re.Emit(v)
			if err != nil {
				return err
			}
		}
		return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
159 160
	}

161 162
	re.emitted = true

163 164 165 166
	if _, ok := v.(Single); ok {
		defer re.Close()
	}

167
	return re.enc.Encode(v)
Jan Winkelmann's avatar
Jan Winkelmann committed
168 169
}

keks's avatar
keks committed
170 171 172
type MaybeError struct {
	Value interface{} // needs to be a pointer
	Error cmdkit.Error
173

keks's avatar
keks committed
174
	isError bool
175 176
}

keks's avatar
keks committed
177 178 179
func (m *MaybeError) Get() interface{} {
	if m.isError {
		return m.Error
180
	}
keks's avatar
keks committed
181
	return m.Value
182 183
}

keks's avatar
keks committed
184 185 186 187 188
func (m *MaybeError) UnmarshalJSON(data []byte) error {
	err := json.Unmarshal(data, &m.Error)
	if err == nil {
		m.isError = true
		return nil
189 190
	}

191 192 193 194 195 196 197 198 199 200 201
	if m.Value != nil {
		// make sure we are working with a pointer here
		v := reflect.ValueOf(m.Value)
		if v.Kind() != reflect.Ptr {
			m.Value = reflect.New(v.Type()).Interface()
		}

		err = json.Unmarshal(data, m.Value)
	} else {
		// let the json decoder decode into whatever it finds appropriate
		err = json.Unmarshal(data, &m.Value)
keks's avatar
keks committed
202
	}
203

204
	return err
205
}