client.go 7.18 KB
Newer Older
1 2 3
package http

import (
4
	"encoding/json"
5
	"errors"
6
	"fmt"
7
	"io"
8
	"io/ioutil"
9
	"net/http"
10
	"net/url"
11
	"reflect"
12
	"strconv"
13 14
	"strings"

15
	cmds "github.com/ipfs/go-ipfs/commands"
16
	path "github.com/ipfs/go-ipfs/path"
17
	config "github.com/ipfs/go-ipfs/repo/config"
18 19

	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
20 21
)

22
// ApiUrlFormat is the fmt template used to build a request URL. Its four
// verbs are filled, in order, with the server address, the API path prefix,
// the command path and the encoded query string.
const ApiUrlFormat = "http://%s%s/%s?%s"

// ApiPath is the path prefix placed between the server address and the
// command path.
const ApiPath = "/api/v0" // TODO: make configurable
26

Jeromy's avatar
Jeromy committed
27 28 29 30
// OptionSkipMap is the set of option names that are left out of the query
// string when a request is serialised. A name is skipped when its entry is
// true.
var OptionSkipMap = map[string]bool{"api": true}

31 32 33 34 35 36 37
// Client is the commands HTTP client interface.
type Client interface {
	// Send executes req and returns the resulting response, or an error
	// when the request could not be carried out. The implementation in
	// this file submits the request to the API server as an HTTP POST.
	Send(req cmds.Request) (cmds.Response, error)
}

// client is the Client implementation that talks to an API server over HTTP.
type client struct {
	// serverAddress is substituted into ApiUrlFormat directly after the
	// "http://" scheme when a request URL is built.
	serverAddress string

	// httpClient performs the actual HTTP round trips.
	httpClient http.Client
}
40

41
func NewClient(address string) Client {
Jeromy's avatar
Jeromy committed
42 43 44
	// We cannot use the default transport because of a bug in go's connection reuse
	// code. It causes random failures in the connection including io.EOF and connection
	// refused on 'client.Do'
45 46 47 48 49 50 51 52
	return &client{
		serverAddress: address,
		httpClient: http.Client{
			Transport: &http.Transport{
				DisableKeepAlives: true,
			},
		},
	}
53
}
54

55
func (c *client) Send(req cmds.Request) (cmds.Response, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
56

57 58
	if req.Context() == nil {
		log.Warningf("no context set in request")
59
		if err := req.SetRootContext(context.TODO()); err != nil {
60 61 62 63
			return nil, err
		}
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
64 65 66 67 68 69 70
	// save user-provided encoding
	previousUserProvidedEncoding, found, err := req.Option(cmds.EncShort).String()
	if err != nil {
		return nil, err
	}

	// override with json to send to server
71
	req.SetOption(cmds.EncShort, cmds.JSON)
72

73 74 75
	// stream channel output
	req.SetOption(cmds.ChanOpt, "true")

76
	query, err := getQuery(req)
77 78 79 80
	if err != nil {
		return nil, err
	}

81
	var fileReader *MultiFileReader
82 83
	var reader io.Reader

84
	if req.Files() != nil {
85
		fileReader = NewMultiFileReader(req.Files(), true)
86
		reader = fileReader
87 88
	}

89 90
	pth := path.Join(req.Path())
	url := fmt.Sprintf(ApiUrlFormat, c.serverAddress, ApiPath, pth, query)
91

92
	httpReq, err := http.NewRequest("POST", url, reader)
93 94 95 96 97 98
	if err != nil {
		return nil, err
	}

	// TODO extract string consts?
	if fileReader != nil {
99 100
		httpReq.Header.Set(contentTypeHeader, "multipart/form-data; boundary="+fileReader.Boundary())
		httpReq.Header.Set(contentDispHeader, "form-data: name=\"files\"")
101
	} else {
102
		httpReq.Header.Set(contentTypeHeader, applicationOctetStream)
103
	}
rht's avatar
rht committed
104
	httpReq.Header.Set(uaHeader, config.ApiVersion)
105

106 107
	ec := make(chan error, 1)
	rc := make(chan cmds.Response, 1)
Jeromy's avatar
Jeromy committed
108
	dc := req.Context().Done()
109 110

	go func() {
111
		httpRes, err := c.httpClient.Do(httpReq)
112 113 114 115
		if err != nil {
			ec <- err
			return
		}
116

117 118 119 120 121 122
		// using the overridden JSON encoding in request
		res, err := getResponse(httpRes, req)
		if err != nil {
			ec <- err
			return
		}
123

124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
		rc <- res
	}()

	for {
		select {
		case <-dc:
			log.Debug("Context cancelled, cancelling HTTP request...")
			tr := http.DefaultTransport.(*http.Transport)
			tr.CancelRequest(httpReq)
			dc = nil // Wait for ec or rc
		case err := <-ec:
			return nil, err
		case res := <-rc:
			if found && len(previousUserProvidedEncoding) > 0 {
				// reset to user provided encoding after sending request
				// NB: if user has provided an encoding but it is the empty string,
				// still leave it as JSON.
				req.SetOption(cmds.EncShort, previousUserProvidedEncoding)
			}
			return res, nil
		}
145 146 147
	}
}

148
func getQuery(req cmds.Request) (string, error) {
149
	query := url.Values{}
150
	for k, v := range req.Options() {
Jeromy's avatar
Jeromy committed
151
		if OptionSkipMap[k] {
Jeromy's avatar
Jeromy committed
152 153
			continue
		}
154
		str := fmt.Sprintf("%v", v)
155
		query.Set(k, str)
156
	}
157 158

	args := req.Arguments()
159 160
	argDefs := req.Command().Arguments

161 162 163 164 165 166 167 168
	argDefIndex := 0

	for _, arg := range args {
		argDef := argDefs[argDefIndex]
		// skip ArgFiles
		for argDef.Type == cmds.ArgFile {
			argDefIndex++
			argDef = argDefs[argDefIndex]
169 170
		}

171 172 173 174
		query.Add("arg", arg)

		if len(argDefs) > argDefIndex+1 {
			argDefIndex++
175
		}
176
	}
177

178
	return query.Encode(), nil
179
}
180

181 182 183
// getResponse decodes a http.Response to create a cmds.Response
func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error) {
	var err error
184 185
	res := cmds.NewResponse(req)

186
	contentType := httpRes.Header.Get(contentTypeHeader)
187 188
	contentType = strings.Split(contentType, ";")[0]

189 190 191 192 193 194 195 196 197
	lengthHeader := httpRes.Header.Get(contentLengthHeader)
	if len(lengthHeader) > 0 {
		length, err := strconv.ParseUint(lengthHeader, 10, 64)
		if err != nil {
			return nil, err
		}
		res.SetLength(length)
	}

198 199
	rr := &httpResponseReader{httpRes}
	res.SetCloser(rr)
200

201
	if contentType != applicationJson {
Jeromy's avatar
Jeromy committed
202
		// for all non json output types, just stream back the output
203
		res.SetOutput(rr)
204
		return res, nil
205 206 207 208

	} else if len(httpRes.Header.Get(channelHeader)) > 0 {
		// if output is coming from a channel, decode each chunk
		outChan := make(chan interface{})
Jeromy's avatar
Jeromy committed
209

210
		go readStreamedJson(req, rr, outChan)
211

212
		res.SetOutput((<-chan interface{})(outChan))
213
		return res, nil
214 215
	}

216
	dec := json.NewDecoder(rr)
217

Jeromy's avatar
Jeromy committed
218
	// If we ran into an error
219 220
	if httpRes.StatusCode >= http.StatusBadRequest {
		e := cmds.Error{}
221

Jeromy's avatar
Jeromy committed
222 223
		switch {
		case httpRes.StatusCode == http.StatusNotFound:
224 225 226 227
			// handle 404s
			e.Message = "Command not found."
			e.Code = cmds.ErrClient

228
		case contentType == plainText:
229
			// handle non-marshalled errors
230 231 232 233 234
			mes, err := ioutil.ReadAll(rr)
			if err != nil {
				return nil, err
			}
			e.Message = string(mes)
235 236
			e.Code = cmds.ErrNormal

Jeromy's avatar
Jeromy committed
237
		default:
238 239 240 241 242
			// handle marshalled errors
			err = dec.Decode(&e)
			if err != nil {
				return nil, err
			}
243 244 245 246
		}

		res.SetError(e, e.Code)

Jeromy's avatar
Jeromy committed
247 248 249 250 251 252 253 254 255 256 257 258 259 260
		return res, nil
	}

	outputType := reflect.TypeOf(req.Command().Type)
	v, err := decodeTypedVal(outputType, dec)
	if err != nil && err != io.EOF {
		return nil, err
	}

	res.SetOutput(v)

	return res, nil
}

Jeromy's avatar
Jeromy committed
261 262
// read json objects off of the given stream, and write the objects out to
// the 'out' channel
263
func readStreamedJson(req cmds.Request, rr io.Reader, out chan<- interface{}) {
Jeromy's avatar
Jeromy committed
264
	defer close(out)
265
	dec := json.NewDecoder(rr)
Jeromy's avatar
Jeromy committed
266 267 268 269 270 271 272
	outputType := reflect.TypeOf(req.Command().Type)

	ctx := req.Context()

	for {
		v, err := decodeTypedVal(outputType, dec)
		if err != nil {
273
			if err != io.EOF {
Jeromy's avatar
Jeromy committed
274 275 276
				log.Error(err)
			}
			return
277
		}
Jeromy's avatar
Jeromy committed
278 279 280 281 282

		select {
		case <-ctx.Done():
			return
		case out <- v:
283
		}
284
	}
Jeromy's avatar
Jeromy committed
285
}
286

Jeromy's avatar
Jeromy committed
287 288
// decodeTypedVal reads the next JSON value from dec.
//
// When t is non-nil the value is decoded into a freshly allocated t and the
// pointer to it is returned (also when decoding fails). When t is nil the
// value is decoded into an interface{} anyway, yielding the generic
// encoding/json representation.
func decodeTypedVal(t reflect.Type, dec *json.Decoder) (interface{}, error) {
	if t == nil {
		var generic interface{}
		err := dec.Decode(&generic)
		return generic, err
	}

	target := reflect.New(t).Interface()
	err := dec.Decode(target)
	return target, err
}
301

Jeromy's avatar
Jeromy committed
302 303 304
// httpResponseReader reads from the response body, and checks for an error
// in the http trailer upon EOF, this error if present is returned instead
// of the EOF.
305 306 307 308 309 310
type httpResponseReader struct {
	resp *http.Response
}

func (r *httpResponseReader) Read(b []byte) (int, error) {
	n, err := r.resp.Body.Read(b)
311 312 313 314 315

	// reading on a closed response body is as good as an io.EOF here
	if err != nil && strings.Contains(err.Error(), "read on closed response body") {
		err = io.EOF
	}
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335
	if err == io.EOF {
		_ = r.resp.Body.Close()
		trailerErr := r.checkError()
		if trailerErr != nil {
			return n, trailerErr
		}
	}
	return n, err
}

func (r *httpResponseReader) checkError() error {
	if e := r.resp.Trailer.Get(StreamErrHeader); e != "" {
		return errors.New(e)
	}
	return nil
}

func (r *httpResponseReader) Close() error {
	return r.resp.Body.Close()
}