client.go 7.12 KB
Newer Older
1 2 3
package http

import (
4
	"bytes"
5
	"encoding/json"
6
	"errors"
7
	"fmt"
8
	"io"
9
	"net/http"
10
	"net/url"
11
	"reflect"
12
	"strconv"
13 14
	"strings"

15 16
	cmds "github.com/ipfs/go-ipfs/commands"
	config "github.com/ipfs/go-ipfs/repo/config"
17 18

	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
19 20
)

21
const (
22
	ApiUrlFormat = "http://%s%s/%s?%s"
23 24
	ApiPath      = "/api/v0" // TODO: make configurable
)
25

26 27 28 29 30 31 32 33
// Client is the commands HTTP client interface.
type Client interface {
	Send(req cmds.Request) (cmds.Response, error)
}

type client struct {
	serverAddress string
}
34

35 36 37
func NewClient(address string) Client {
	return &client{address}
}
38

39
func (c *client) Send(req cmds.Request) (cmds.Response, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
40

41 42 43 44 45 46 47 48
	if req.Context() == nil {
		log.Warningf("no context set in request")
		err := req.SetRootContext(context.TODO())
		if err != nil {
			return nil, err
		}
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50 51 52 53 54 55
	// save user-provided encoding
	previousUserProvidedEncoding, found, err := req.Option(cmds.EncShort).String()
	if err != nil {
		return nil, err
	}

	// override with json to send to server
56
	req.SetOption(cmds.EncShort, cmds.JSON)
57

58 59 60
	// stream channel output
	req.SetOption(cmds.ChanOpt, "true")

61
	query, err := getQuery(req)
62 63 64 65
	if err != nil {
		return nil, err
	}

66
	var fileReader *MultiFileReader
67 68
	var reader io.Reader

69
	if req.Files() != nil {
70
		fileReader = NewMultiFileReader(req.Files(), true)
71 72 73 74 75
		reader = fileReader
	} else {
		// if we have no file data, use an empty Reader
		// (http.NewRequest panics when a nil Reader is used)
		reader = strings.NewReader("")
76 77
	}

78 79 80
	path := strings.Join(req.Path(), "/")
	url := fmt.Sprintf(ApiUrlFormat, c.serverAddress, ApiPath, path, query)

81
	httpReq, err := http.NewRequest("POST", url, reader)
82 83 84 85 86 87 88 89 90 91 92 93 94 95
	if err != nil {
		return nil, err
	}

	// TODO extract string consts?
	if fileReader != nil {
		httpReq.Header.Set("Content-Type", "multipart/form-data; boundary="+fileReader.Boundary())
		httpReq.Header.Set("Content-Disposition", "form-data: name=\"files\"")
	} else {
		httpReq.Header.Set("Content-Type", "application/octet-stream")
	}
	version := config.CurrentVersionNumber
	httpReq.Header.Set("User-Agent", fmt.Sprintf("/go-ipfs/%s/", version))

96 97
	ec := make(chan error, 1)
	rc := make(chan cmds.Response, 1)
Jeromy's avatar
Jeromy committed
98
	dc := req.Context().Done()
99 100 101 102 103 104 105

	go func() {
		httpRes, err := http.DefaultClient.Do(httpReq)
		if err != nil {
			ec <- err
			return
		}
106

107 108 109 110 111 112
		// using the overridden JSON encoding in request
		res, err := getResponse(httpRes, req)
		if err != nil {
			ec <- err
			return
		}
113

114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
		rc <- res
	}()

	for {
		select {
		case <-dc:
			log.Debug("Context cancelled, cancelling HTTP request...")
			tr := http.DefaultTransport.(*http.Transport)
			tr.CancelRequest(httpReq)
			dc = nil // Wait for ec or rc
		case err := <-ec:
			return nil, err
		case res := <-rc:
			if found && len(previousUserProvidedEncoding) > 0 {
				// reset to user provided encoding after sending request
				// NB: if user has provided an encoding but it is the empty string,
				// still leave it as JSON.
				req.SetOption(cmds.EncShort, previousUserProvidedEncoding)
			}
			return res, nil
		}
135 136 137
	}
}

138
func getQuery(req cmds.Request) (string, error) {
139
	query := url.Values{}
140
	for k, v := range req.Options() {
141
		str := fmt.Sprintf("%v", v)
142
		query.Set(k, str)
143
	}
144 145

	args := req.Arguments()
146 147
	argDefs := req.Command().Arguments

148 149 150 151 152 153 154 155
	argDefIndex := 0

	for _, arg := range args {
		argDef := argDefs[argDefIndex]
		// skip ArgFiles
		for argDef.Type == cmds.ArgFile {
			argDefIndex++
			argDef = argDefs[argDefIndex]
156 157
		}

158 159 160 161
		query.Add("arg", arg)

		if len(argDefs) > argDefIndex+1 {
			argDefIndex++
162
		}
163
	}
164

165
	return query.Encode(), nil
166
}
167

168 169 170
// getResponse decodes a http.Response to create a cmds.Response
func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error) {
	var err error
171 172
	res := cmds.NewResponse(req)

173
	contentType := httpRes.Header.Get(contentTypeHeader)
174 175
	contentType = strings.Split(contentType, ";")[0]

176 177 178 179 180 181 182 183 184
	lengthHeader := httpRes.Header.Get(contentLengthHeader)
	if len(lengthHeader) > 0 {
		length, err := strconv.ParseUint(lengthHeader, 10, 64)
		if err != nil {
			return nil, err
		}
		res.SetLength(length)
	}

185 186
	res.SetCloser(httpRes.Body)

Jeromy's avatar
Jeromy committed
187 188
	if contentType != "application/json" {
		// for all non json output types, just stream back the output
189
		res.SetOutput(&httpResponseReader{httpRes})
190
		return res, nil
191 192 193 194

	} else if len(httpRes.Header.Get(channelHeader)) > 0 {
		// if output is coming from a channel, decode each chunk
		outChan := make(chan interface{})
Jeromy's avatar
Jeromy committed
195 196

		go readStreamedJson(req, httpRes, outChan)
197

198
		res.SetOutput((<-chan interface{})(outChan))
199
		return res, nil
200 201
	}

202
	dec := json.NewDecoder(&httpResponseReader{httpRes})
203

Jeromy's avatar
Jeromy committed
204
	// If we ran into an error
205 206
	if httpRes.StatusCode >= http.StatusBadRequest {
		e := cmds.Error{}
207

Jeromy's avatar
Jeromy committed
208 209
		switch {
		case httpRes.StatusCode == http.StatusNotFound:
210 211 212 213
			// handle 404s
			e.Message = "Command not found."
			e.Code = cmds.ErrClient

Jeromy's avatar
Jeromy committed
214
		case contentType == "text/plain":
215 216 217 218 219 220
			// handle non-marshalled errors
			buf := bytes.NewBuffer(nil)
			io.Copy(buf, httpRes.Body)
			e.Message = string(buf.Bytes())
			e.Code = cmds.ErrNormal

Jeromy's avatar
Jeromy committed
221
		default:
222 223 224 225 226
			// handle marshalled errors
			err = dec.Decode(&e)
			if err != nil {
				return nil, err
			}
227 228 229 230
		}

		res.SetError(e, e.Code)

Jeromy's avatar
Jeromy committed
231 232 233 234 235 236 237 238 239 240 241 242 243 244
		return res, nil
	}

	outputType := reflect.TypeOf(req.Command().Type)
	v, err := decodeTypedVal(outputType, dec)
	if err != nil && err != io.EOF {
		return nil, err
	}

	res.SetOutput(v)

	return res, nil
}

Jeromy's avatar
Jeromy committed
245 246
// read json objects off of the given stream, and write the objects out to
// the 'out' channel
Jeromy's avatar
Jeromy committed
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
func readStreamedJson(req cmds.Request, httpRes *http.Response, out chan<- interface{}) {
	defer close(out)
	dec := json.NewDecoder(&httpResponseReader{httpRes})
	outputType := reflect.TypeOf(req.Command().Type)

	ctx := req.Context()

	for {
		v, err := decodeTypedVal(outputType, dec)
		if err != nil {
			// since we are just looping reading on the response, the only way to
			// know we are 'done' is for the consumer to close the response body.
			// doing so doesnt throw an io.EOF, but we want to treat it like one.
			if !(strings.Contains(err.Error(), "read on closed response body") || err == io.EOF) {
				log.Error(err)
			}
			return
264
		}
Jeromy's avatar
Jeromy committed
265 266 267 268 269

		select {
		case <-ctx.Done():
			return
		case out <- v:
270
		}
Jeromy's avatar
Jeromy committed
271

272
	}
Jeromy's avatar
Jeromy committed
273
}
274

Jeromy's avatar
Jeromy committed
275 276
// decode a value of the given type, if the type is nil, attempt to decode into
// an interface{} anyways
Jeromy's avatar
Jeromy committed
277 278 279 280 281 282 283 284 285 286 287
func decodeTypedVal(t reflect.Type, dec *json.Decoder) (interface{}, error) {
	var v interface{}
	var err error
	if t != nil {
		v = reflect.New(t).Interface()
		err = dec.Decode(v)
	} else {
		err = dec.Decode(&v)
	}

	return v, err
288
}
289

Jeromy's avatar
Jeromy committed
290 291 292
// httpResponseReader reads from the response body, and checks for an error
// in the http trailer upon EOF, this error if present is returned instead
// of the EOF.
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
type httpResponseReader struct {
	resp *http.Response
}

func (r *httpResponseReader) Read(b []byte) (int, error) {
	n, err := r.resp.Body.Read(b)
	if err == io.EOF {
		_ = r.resp.Body.Close()
		trailerErr := r.checkError()
		if trailerErr != nil {
			return n, trailerErr
		}
	}
	return n, err
}

func (r *httpResponseReader) checkError() error {
	if e := r.resp.Trailer.Get(StreamErrHeader); e != "" {
		return errors.New(e)
	}
	return nil
}

func (r *httpResponseReader) Close() error {
	return r.resp.Body.Close()
}