responseemitter.go 4.24 KB
Newer Older
Jan Winkelmann's avatar
Jan Winkelmann committed
1 2 3
package cli

import (
4
	"errors"
Jan Winkelmann's avatar
Jan Winkelmann committed
5 6
	"fmt"
	"io"
Jan Winkelmann's avatar
Jan Winkelmann committed
7
	"os"
Jan Winkelmann's avatar
Jan Winkelmann committed
8
	"runtime/debug"
9
	"sync"
Jan Winkelmann's avatar
Jan Winkelmann committed
10

keks's avatar
keks committed
11
	"github.com/ipfs/go-ipfs-cmdkit"
keks's avatar
keks committed
12
	"github.com/ipfs/go-ipfs-cmds"
Jan Winkelmann's avatar
Jan Winkelmann committed
13 14
)

15 16
// Compile-time check that *responseEmitter implements ResponseEmitter.
var _ ResponseEmitter = (*responseEmitter)(nil)

Jan Winkelmann's avatar
Jan Winkelmann committed
17 18 19 20
// ErrSet wraps an error value by embedding it, so an ErrSet exposes the
// embedded error's Error method.
// NOTE(review): nothing in the visible part of this file uses ErrSet; confirm
// its intended purpose against callers.
type ErrSet struct {
	error
}

21
func NewResponseEmitter(stdout, stderr io.Writer, enc func(*cmds.Request) func(io.Writer) cmds.Encoder, req *cmds.Request) (cmds.ResponseEmitter, <-chan int) {
22
	ch := make(chan int)
23
	encType := cmds.GetEncoding(req)
Jan Winkelmann's avatar
Jan Winkelmann committed
24

Jan Winkelmann's avatar
Jan Winkelmann committed
25
	if enc == nil {
26
		enc = func(*cmds.Request) func(io.Writer) cmds.Encoder {
Jan Winkelmann's avatar
Jan Winkelmann committed
27 28 29 30
			return func(io.Writer) cmds.Encoder {
				return nil
			}
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
31 32
	}

33
	return &responseEmitter{stdout: stdout, stderr: stderr, encType: encType, enc: enc(req)(stdout), ch: ch}, ch
Jan Winkelmann's avatar
Jan Winkelmann committed
34 35
}

36 37 38 39
// ResponseEmitter extends cmds.ResponseEmitter to give better control over the command line
type ResponseEmitter interface {
	cmds.ResponseEmitter

40 41
	Stdout() io.Writer
	Stderr() io.Writer
42 43 44
	Exit(int)
}

Jan Winkelmann's avatar
Jan Winkelmann committed
45
type responseEmitter struct {
46 47 48
	wLock  sync.Mutex
	stdout io.Writer
	stderr io.Writer
Jan Winkelmann's avatar
Jan Winkelmann committed
49

50
	length  uint64
51
	err     *cmdkit.Error
52 53
	enc     cmds.Encoder
	encType cmds.EncodingType
54 55
	exit    int
	closed  bool
Jan Winkelmann's avatar
Jan Winkelmann committed
56

57 58 59
	errOccurred bool

	ch chan<- int
Jan Winkelmann's avatar
Jan Winkelmann committed
60 61
}

keks's avatar
keks committed
62 63 64 65
// Type reports cmds.CLI as this emitter's cmds.PostRunType.
func (re *responseEmitter) Type() cmds.PostRunType {
	return cmds.CLI
}

66 67 68 69 70
// SetLength records l as the length that Head reports in its Len field.
func (re *responseEmitter) SetLength(l uint64) {
	re.length = l
}

func (re *responseEmitter) SetEncoder(enc func(io.Writer) cmds.Encoder) {
71
	re.enc = enc(re.stdout)
72 73
}

74
func (re *responseEmitter) SetError(v interface{}, errType cmdkit.ErrorType) {
75 76
	re.errOccurred = true

77
	err := re.Emit(&cmdkit.Error{Message: fmt.Sprint(v), Code: errType})
78 79 80 81 82
	if err != nil {
		panic(err)
	}
}

83
func (re *responseEmitter) isClosed() bool {
84 85 86
	re.wLock.Lock()
	defer re.wLock.Unlock()

87 88 89 90 91
	return re.closed
}

func (re *responseEmitter) Close() error {
	if re.isClosed() {
92
		return errors.New("closing closed responseemitter")
93 94
	}

95 96
	log.Debugf("err=%v exit=%v\nStack:\n%s", re.err, re.exit, debug.Stack())

97 98 99 100
	re.wLock.Lock()
	defer re.wLock.Unlock()

	re.ch <- re.exit
101
	close(re.ch)
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119

	defer func() {
		re.stdout = nil
		re.stderr = nil
		re.closed = true
	}()

	if re.stdout == nil || re.stderr == nil {
		log.Warning("more than one call to RespEm.Close!")
	}

	if f, ok := re.stderr.(*os.File); ok {
		err := f.Sync()
		if err != nil {
			return err
		}
	}
	if f, ok := re.stdout.(*os.File); ok {
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
		err := f.Sync()
		if err != nil {
			return err
		}
	}

	return nil
}

// Head builds a cmds.Head from the emitter's current length and error.
// TODO: maybe it makes sense to make these pointers to shared memory?
//   might not be so clever though...concurrency and stuff
func (re *responseEmitter) Head() cmds.Head {
	var head cmds.Head
	head.Len = re.length
	head.Err = re.err
	return head
}

Jan Winkelmann's avatar
Jan Winkelmann committed
139
func (re *responseEmitter) Emit(v interface{}) error {
140 141 142 143 144
	// unwrap
	if val, ok := v.(cmds.Single); ok {
		v = val.Value
	}

145 146 147 148
	if ch, ok := v.(chan interface{}); ok {
		v = (<-chan interface{})(ch)
	}

149 150 151 152 153 154 155 156 157 158
	// TODO find a better solution for this.
	// Idea: use the actual cmd.Type and not *cmd.Type
	// would need to fix all commands though
	switch c := v.(type) {
	case *string:
		v = *c
	case *int:
		v = *c
	}

159
	if ch, isChan := v.(<-chan interface{}); isChan {
160
		log.Debug("iterating over chan...", ch)
161 162 163 164 165 166 167 168 169
		for v = range ch {
			err := re.Emit(v)
			if err != nil {
				return err
			}
		}
		return nil
	}

170
	if re.isClosed() {
Jan Winkelmann's avatar
Jan Winkelmann committed
171 172 173
		return io.ErrClosedPipe
	}

174
	if err, ok := v.(cmdkit.Error); ok {
Jan Winkelmann's avatar
Jan Winkelmann committed
175 176 177 178
		log.Warningf("fixerr %s", debug.Stack())
		v = &err
	}

Jan Winkelmann's avatar
Jan Winkelmann committed
179 180 181
	var err error

	switch t := v.(type) {
182
	case *cmdkit.Error:
183
		_, err = fmt.Fprintln(re.stderr, "Error:", t.Message)
184
		if t.Code == cmdkit.ErrFatal {
185 186 187 188 189 190
			defer re.Close()
		}
		re.wLock.Lock()
		defer re.wLock.Unlock()
		re.exit = 1

191 192
		return nil

Jan Winkelmann's avatar
Jan Winkelmann committed
193
	case io.Reader:
194
		_, err = io.Copy(re.stdout, t)
195
		if err != nil {
196
			re.SetError(err, cmdkit.ErrNormal)
197 198
			err = nil
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
199 200 201 202
	default:
		if re.enc != nil {
			err = re.enc.Encode(v)
		} else {
203
			_, err = fmt.Fprintln(re.stdout, t)
Jan Winkelmann's avatar
Jan Winkelmann committed
204 205 206 207 208
		}
	}

	return err
}
209

210 211 212
// Stderr returns the ResponseWriter's stderr
func (re *responseEmitter) Stderr() io.Writer {
	return re.stderr
213 214
}

215 216 217
// Stdout returns the ResponseWriter's stdout
func (re *responseEmitter) Stdout() io.Writer {
	return re.stdout
218 219
}

220
// Exit sends code to the channel that was returned by NewResponseEmitter, so main() can pass it to os.Exit()
221
func (re *responseEmitter) Exit(code int) {
222 223 224 225 226
	defer re.Close()

	re.wLock.Lock()
	defer re.wLock.Unlock()
	re.exit = code
227
}