writer.go 1.74 KB
Newer Older
Jan Winkelmann's avatar
Jan Winkelmann committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
package cmds

import (
	"fmt"
	"io"
	"reflect"
)

func NewPipeResponsePair(encType EncodingType, req Request) (ResponseEmitter, Response) {
	r, w := io.Pipe()

	res := NewReaderResponse(r, encType, req)
	re := NewWriterResponseEmitter(w, encType)

	return re, res
}

func NewWriterResponseEmitter(w io.WriteCloser, encType EncodingType) ResponseEmitter {
	return &writerResponseEmitter{
		w:       w,
		encType: encType,
		enc:     Encoders[encType](w),
	}
}

func NewReaderResponse(r io.Reader, encType EncodingType, req Request) Response {
	return &readerResponse{
		req:     req,
		r:       r,
		encType: encType,
		dec:     Decoders[encType](r),
		t:       reflect.TypeOf(req.Command().Type),
	}
}

type readerResponse struct {
	r       io.Reader
	encType EncodingType
	dec     Decoder

	req Request
	t   reflect.Type

	length uint64
	err    *Error
}

func (r *readerResponse) Request() *Request {
	return &r.req
}

func (r *readerResponse) Error() *Error {
	return r.err
}

func (r *readerResponse) Length() uint64 {
	return r.length
}

func (r *readerResponse) Next() (interface{}, error) {
	v := reflect.New(r.t).Interface()
	err := r.dec.Decode(&v)

	return v, err
}

type writerResponseEmitter struct {
	w       io.WriteCloser
	encType EncodingType
	enc     Encoder

	length *uint64
	err    *Error

	emitted bool
}

func (re *writerResponseEmitter) SetError(err interface{}, code ErrorType) {
	if re.emitted {
		return
	}

	*re.err = Error{Message: fmt.Sprint(err), Code: code}
}

func (re *writerResponseEmitter) SetLength(length uint64) {
	if re.emitted {
		return
	}

	*re.length = length
}

func (re *writerResponseEmitter) Close() error {
	return re.w.Close()
}

func (re *writerResponseEmitter) Emit(v interface{}) error {
	re.emitted = true

	return re.enc.Encode(v)
}