client.go 7.03 KB
Newer Older
1 2 3
package http

import (
4
	"bytes"
5
	"encoding/json"
6
	"errors"
7
	"fmt"
8
	"io"
9
	"net/http"
10
	"net/url"
11
	"reflect"
12
	"strconv"
13 14
	"strings"

15 16
	cmds "github.com/ipfs/go-ipfs/commands"
	config "github.com/ipfs/go-ipfs/repo/config"
17 18

	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
19 20
)

21
const (
	// ApiUrlFormat is the fmt template used to build a command URL. Its
	// verbs are filled, in order, with the server address, the API path
	// prefix, the slash-joined command path and the encoded query string.
	ApiUrlFormat = "http://%s%s/%s?%s"
	// ApiPath is the path prefix under which the commands API is served.
	ApiPath = "/api/v0" // TODO: make configurable
)
25

26 27 28 29 30 31 32
// Client is the commands HTTP client interface.
type Client interface {
	// Send executes req and returns the resulting response, or an error if
	// the request could not be carried out.
	Send(req cmds.Request) (cmds.Response, error)
}

// client is the Client implementation that talks to an API server over HTTP.
type client struct {
	// serverAddress is substituted as the host part of ApiUrlFormat.
	serverAddress string
	// httpClient performs the actual HTTP round trips.
	httpClient http.Client
}
35

36
func NewClient(address string) Client {
37 38 39 40 41 42 43 44
	return &client{
		serverAddress: address,
		httpClient: http.Client{
			Transport: &http.Transport{
				DisableKeepAlives: true,
			},
		},
	}
45
}
46

47
func (c *client) Send(req cmds.Request) (cmds.Response, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
48

49 50 51 52 53 54 55 56
	if req.Context() == nil {
		log.Warningf("no context set in request")
		err := req.SetRootContext(context.TODO())
		if err != nil {
			return nil, err
		}
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
57 58 59 60 61 62 63
	// save user-provided encoding
	previousUserProvidedEncoding, found, err := req.Option(cmds.EncShort).String()
	if err != nil {
		return nil, err
	}

	// override with json to send to server
64
	req.SetOption(cmds.EncShort, cmds.JSON)
65

66 67 68
	// stream channel output
	req.SetOption(cmds.ChanOpt, "true")

69
	query, err := getQuery(req)
70 71 72 73
	if err != nil {
		return nil, err
	}

74
	var fileReader *MultiFileReader
75 76
	var reader io.Reader

77
	if req.Files() != nil {
78
		fileReader = NewMultiFileReader(req.Files(), true)
79 80 81 82 83
		reader = fileReader
	} else {
		// if we have no file data, use an empty Reader
		// (http.NewRequest panics when a nil Reader is used)
		reader = strings.NewReader("")
84 85
	}

86 87 88
	path := strings.Join(req.Path(), "/")
	url := fmt.Sprintf(ApiUrlFormat, c.serverAddress, ApiPath, path, query)

89
	httpReq, err := http.NewRequest("POST", url, reader)
90 91 92 93 94 95
	if err != nil {
		return nil, err
	}

	// TODO extract string consts?
	if fileReader != nil {
96 97
		httpReq.Header.Set(contentTypeHeader, "multipart/form-data; boundary="+fileReader.Boundary())
		httpReq.Header.Set(contentDispHeader, "form-data: name=\"files\"")
98
	} else {
99
		httpReq.Header.Set(contentTypeHeader, applicationOctetStream)
100 101
	}
	version := config.CurrentVersionNumber
102
	httpReq.Header.Set(uaHeader, fmt.Sprintf("/go-ipfs/%s/", version))
103

104 105
	ec := make(chan error, 1)
	rc := make(chan cmds.Response, 1)
Jeromy's avatar
Jeromy committed
106
	dc := req.Context().Done()
107 108

	go func() {
109
		httpRes, err := c.httpClient.Do(httpReq)
110 111 112 113
		if err != nil {
			ec <- err
			return
		}
114

115 116 117 118 119 120
		// using the overridden JSON encoding in request
		res, err := getResponse(httpRes, req)
		if err != nil {
			ec <- err
			return
		}
121

122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
		rc <- res
	}()

	for {
		select {
		case <-dc:
			log.Debug("Context cancelled, cancelling HTTP request...")
			tr := http.DefaultTransport.(*http.Transport)
			tr.CancelRequest(httpReq)
			dc = nil // Wait for ec or rc
		case err := <-ec:
			return nil, err
		case res := <-rc:
			if found && len(previousUserProvidedEncoding) > 0 {
				// reset to user provided encoding after sending request
				// NB: if user has provided an encoding but it is the empty string,
				// still leave it as JSON.
				req.SetOption(cmds.EncShort, previousUserProvidedEncoding)
			}
			return res, nil
		}
143 144 145
	}
}

146
func getQuery(req cmds.Request) (string, error) {
147
	query := url.Values{}
148
	for k, v := range req.Options() {
149
		str := fmt.Sprintf("%v", v)
150
		query.Set(k, str)
151
	}
152 153

	args := req.Arguments()
154 155
	argDefs := req.Command().Arguments

156 157 158 159 160 161 162 163
	argDefIndex := 0

	for _, arg := range args {
		argDef := argDefs[argDefIndex]
		// skip ArgFiles
		for argDef.Type == cmds.ArgFile {
			argDefIndex++
			argDef = argDefs[argDefIndex]
164 165
		}

166 167 168 169
		query.Add("arg", arg)

		if len(argDefs) > argDefIndex+1 {
			argDefIndex++
170
		}
171
	}
172

173
	return query.Encode(), nil
174
}
175

176 177 178
// getResponse decodes a http.Response to create a cmds.Response
func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error) {
	var err error
179 180
	res := cmds.NewResponse(req)

181
	contentType := httpRes.Header.Get(contentTypeHeader)
182 183
	contentType = strings.Split(contentType, ";")[0]

184 185 186 187 188 189 190 191 192
	lengthHeader := httpRes.Header.Get(contentLengthHeader)
	if len(lengthHeader) > 0 {
		length, err := strconv.ParseUint(lengthHeader, 10, 64)
		if err != nil {
			return nil, err
		}
		res.SetLength(length)
	}

193 194
	rr := &httpResponseReader{httpRes}
	res.SetCloser(rr)
195

196
	if contentType != applicationJson {
Jeromy's avatar
Jeromy committed
197
		// for all non json output types, just stream back the output
198
		res.SetOutput(rr)
199
		return res, nil
200 201 202 203

	} else if len(httpRes.Header.Get(channelHeader)) > 0 {
		// if output is coming from a channel, decode each chunk
		outChan := make(chan interface{})
Jeromy's avatar
Jeromy committed
204

205
		go readStreamedJson(req, rr, outChan)
206

207
		res.SetOutput((<-chan interface{})(outChan))
208
		return res, nil
209 210
	}

211
	dec := json.NewDecoder(rr)
212

Jeromy's avatar
Jeromy committed
213
	// If we ran into an error
214 215
	if httpRes.StatusCode >= http.StatusBadRequest {
		e := cmds.Error{}
216

Jeromy's avatar
Jeromy committed
217 218
		switch {
		case httpRes.StatusCode == http.StatusNotFound:
219 220 221 222
			// handle 404s
			e.Message = "Command not found."
			e.Code = cmds.ErrClient

223
		case contentType == plainText:
224 225 226 227 228 229
			// handle non-marshalled errors
			buf := bytes.NewBuffer(nil)
			io.Copy(buf, httpRes.Body)
			e.Message = string(buf.Bytes())
			e.Code = cmds.ErrNormal

Jeromy's avatar
Jeromy committed
230
		default:
231 232 233 234 235
			// handle marshalled errors
			err = dec.Decode(&e)
			if err != nil {
				return nil, err
			}
236 237 238 239
		}

		res.SetError(e, e.Code)

Jeromy's avatar
Jeromy committed
240 241 242 243 244 245 246 247 248 249 250 251 252 253
		return res, nil
	}

	outputType := reflect.TypeOf(req.Command().Type)
	v, err := decodeTypedVal(outputType, dec)
	if err != nil && err != io.EOF {
		return nil, err
	}

	res.SetOutput(v)

	return res, nil
}

Jeromy's avatar
Jeromy committed
254 255
// read json objects off of the given stream, and write the objects out to
// the 'out' channel
256
func readStreamedJson(req cmds.Request, rr io.Reader, out chan<- interface{}) {
Jeromy's avatar
Jeromy committed
257
	defer close(out)
258
	dec := json.NewDecoder(rr)
Jeromy's avatar
Jeromy committed
259 260 261 262 263 264 265
	outputType := reflect.TypeOf(req.Command().Type)

	ctx := req.Context()

	for {
		v, err := decodeTypedVal(outputType, dec)
		if err != nil {
266
			// reading on a closed response body is as good as an io.EOF here
Jeromy's avatar
Jeromy committed
267 268 269 270
			if !(strings.Contains(err.Error(), "read on closed response body") || err == io.EOF) {
				log.Error(err)
			}
			return
271
		}
Jeromy's avatar
Jeromy committed
272 273 274 275 276

		select {
		case <-ctx.Done():
			return
		case out <- v:
277
		}
278
	}
Jeromy's avatar
Jeromy committed
279
}
280

Jeromy's avatar
Jeromy committed
281 282
// decodeTypedVal decodes the next JSON value from dec.
//
// If t is non-nil, the value is decoded into a freshly allocated t and the
// result is a pointer to it (*t wrapped in an interface{}). If t is nil, the
// value is decoded into an interface{} anyway, yielding the generic
// encoding/json representation.
//
// The decoder's error is returned unchanged alongside whatever value was
// produced.
func decodeTypedVal(t reflect.Type, dec *json.Decoder) (interface{}, error) {
	if t == nil {
		var untyped interface{}
		err := dec.Decode(&untyped)
		return untyped, err
	}

	ptr := reflect.New(t).Interface()
	err := dec.Decode(ptr)
	return ptr, err
}
295

Jeromy's avatar
Jeromy committed
296 297 298
// httpResponseReader reads from the response body, and checks for an error
// in the http trailer upon EOF, this error if present is returned instead
// of the EOF.
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
type httpResponseReader struct {
	resp *http.Response
}

func (r *httpResponseReader) Read(b []byte) (int, error) {
	n, err := r.resp.Body.Read(b)
	if err == io.EOF {
		_ = r.resp.Body.Close()
		trailerErr := r.checkError()
		if trailerErr != nil {
			return n, trailerErr
		}
	}
	return n, err
}

func (r *httpResponseReader) checkError() error {
	if e := r.resp.Trailer.Get(StreamErrHeader); e != "" {
		return errors.New(e)
	}
	return nil
}

func (r *httpResponseReader) Close() error {
	return r.resp.Body.Close()
}