writer.go 3.2 KB
Newer Older
Jan Winkelmann's avatar
Jan Winkelmann committed
1 2 3
package cmds

import (
4
	"encoding/json"
Jan Winkelmann's avatar
Jan Winkelmann committed
5 6 7
	"fmt"
	"io"
	"reflect"
8
	"sync"
9 10

	"github.com/ipfs/go-ipfs-cmds/cmdsutil"
Jan Winkelmann's avatar
Jan Winkelmann committed
11 12
)

13
var EmittedErr = fmt.Errorf("received an error")
Jan Winkelmann's avatar
Jan Winkelmann committed
14

Jan Winkelmann's avatar
Jan Winkelmann committed
15
func NewWriterResponseEmitter(w io.WriteCloser, res Response, enc func(io.Writer) Encoder) *WriterResponseEmitter {
16 17 18 19
	return &WriterResponseEmitter{
		w:   w,
		c:   w,
		enc: enc(w),
Jan Winkelmann's avatar
Jan Winkelmann committed
20
		req: res.Request(),
Jan Winkelmann's avatar
Jan Winkelmann committed
21 22 23 24
	}
}

func NewReaderResponse(r io.Reader, encType EncodingType, req Request) Response {
25 26
	emitted := make(chan struct{})

Jan Winkelmann's avatar
Jan Winkelmann committed
27 28 29 30 31
	return &readerResponse{
		req:     req,
		r:       r,
		encType: encType,
		dec:     Decoders[encType](r),
32
		emitted: emitted,
Jan Winkelmann's avatar
Jan Winkelmann committed
33 34 35 36 37 38 39 40 41 42 43 44
	}
}

type readerResponse struct {
	r       io.Reader
	encType EncodingType
	dec     Decoder

	req Request
	t   reflect.Type

	length uint64
45
	err    *cmdsutil.Error
Jan Winkelmann's avatar
Jan Winkelmann committed
46

47 48
	emitted chan struct{}
	once    sync.Once
Jan Winkelmann's avatar
Jan Winkelmann committed
49 50
}

Jan Winkelmann's avatar
Jan Winkelmann committed
51 52
func (r *readerResponse) Request() Request {
	return r.req
Jan Winkelmann's avatar
Jan Winkelmann committed
53 54
}

55
func (r *readerResponse) Error() *cmdsutil.Error {
56 57
	<-r.emitted

Jan Winkelmann's avatar
Jan Winkelmann committed
58 59 60 61
	return r.err
}

func (r *readerResponse) Length() uint64 {
62 63
	<-r.emitted

Jan Winkelmann's avatar
Jan Winkelmann committed
64 65 66 67
	return r.length
}

func (r *readerResponse) Next() (interface{}, error) {
68 69 70
	a := &Any{}
	a.Add(cmdsutil.Error{})
	a.Add(r.req.Command().Type)
Jan Winkelmann's avatar
Jan Winkelmann committed
71

72 73 74 75 76
	err := r.dec.Decode(a)
	if err != nil {
		return nil, err
	}

77 78
	r.once.Do(func() { close(r.emitted) })

79 80 81 82 83 84
	v := a.Interface()
	if err, ok := v.(error); ok {
		return err, EmittedErr
	}

	return v, nil
Jan Winkelmann's avatar
Jan Winkelmann committed
85 86
}

87
type WriterResponseEmitter struct {
Jan Winkelmann's avatar
Jan Winkelmann committed
88
	// TODO maybe make those public?
89 90 91
	w   io.Writer
	c   io.Closer
	enc Encoder
Jan Winkelmann's avatar
Jan Winkelmann committed
92
	req Request
Jan Winkelmann's avatar
Jan Winkelmann committed
93 94

	length *uint64
95
	err    *cmdsutil.Error
Jan Winkelmann's avatar
Jan Winkelmann committed
96 97

	emitted bool
Jan Winkelmann's avatar
Jan Winkelmann committed
98
	tees    []ResponseEmitter
Jan Winkelmann's avatar
Jan Winkelmann committed
99 100
}

101
func (re *WriterResponseEmitter) SetError(err interface{}, code cmdsutil.ErrorType) {
102
	*re.err = cmdsutil.Error{Message: fmt.Sprint(err), Code: code}
Jan Winkelmann's avatar
Jan Winkelmann committed
103 104 105 106

	for _, re_ := range re.tees {
		re_.SetError(err, code)
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
107 108
}

109
func (re *WriterResponseEmitter) SetLength(length uint64) {
Jan Winkelmann's avatar
Jan Winkelmann committed
110 111 112 113 114
	if re.emitted {
		return
	}

	*re.length = length
Jan Winkelmann's avatar
Jan Winkelmann committed
115 116 117 118

	for _, re_ := range re.tees {
		re_.SetLength(length)
	}
Jan Winkelmann's avatar
Jan Winkelmann committed
119 120
}

121 122 123 124 125 126 127 128 129 130 131 132
func (re *WriterResponseEmitter) Close() error {
	return re.c.Close()
}

func (re *WriterResponseEmitter) Head() Head {
	return Head{
		Len: *re.length,
		Err: re.err,
	}
}

func (re *WriterResponseEmitter) Emit(v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
133 134
	re.emitted = true

Jan Winkelmann's avatar
Jan Winkelmann committed
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
	err := re.enc.Encode(v)
	if err != nil {
		return err
	}

	for _, re_ := range re.tees {
		err = re_.Emit(v)
		if err != nil {
			return err
		}
	}

	return nil
}

func (re *WriterResponseEmitter) Tee(re_ ResponseEmitter) {
	re.tees = append(re.tees, re_)

	// TODO first check whether length and error have been set
	re_.SetLength(*re.length)
	re_.SetError(re.err.Message, re.err.Code)
Jan Winkelmann's avatar
Jan Winkelmann committed
156
}
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197

type Any struct {
	types []reflect.Type

	v interface{}
}

func (a *Any) UnmarshalJSON(data []byte) error {
	var (
		iv  interface{}
		err error
	)

	for _, t := range a.types {
		v := reflect.New(t)

		err = json.Unmarshal(data, v.Interface())
		if err == nil && v.Elem().Interface() != reflect.Zero(t).Interface() {
			a.v = v.Elem().Interface()
			return nil
		}
	}

	err = json.Unmarshal(data, &iv)
	a.v = iv

	return err
}

func (a *Any) Add(v interface{}) {
	t := reflect.TypeOf(v)
	if t.Kind() == reflect.Ptr || t.Kind() == reflect.Interface {
		t = t.Elem()
	}

	a.types = append(a.types, t)
}

func (a *Any) Interface() interface{} {
	return a.v
}