client.go 7.22 KB
Newer Older
1 2 3
package http

import (
4
	"bytes"
5
	"encoding/json"
6
	"errors"
7
	"fmt"
8
	"io"
9
	"net/http"
10
	"net/url"
11
	"reflect"
12
	"strconv"
13 14
	"strings"

15 16
	cmds "github.com/ipfs/go-ipfs/commands"
	config "github.com/ipfs/go-ipfs/repo/config"
17 18

	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
19 20
)

21
const (
22
	ApiUrlFormat = "http://%s%s/%s?%s"
23 24
	ApiPath      = "/api/v0" // TODO: make configurable
)
25

26 27 28 29 30 31 32
// Client is the commands HTTP client interface.
type Client interface {
	Send(req cmds.Request) (cmds.Response, error)
}

type client struct {
	serverAddress string
33
	httpClient    http.Client
34
}
35

36
func NewClient(address string) Client {
Jeromy's avatar
Jeromy committed
37 38 39
	// We cannot use the default transport because of a bug in go's connection reuse
	// code. It causes random failures in the connection including io.EOF and connection
	// refused on 'client.Do'
40 41 42 43 44 45 46 47
	return &client{
		serverAddress: address,
		httpClient: http.Client{
			Transport: &http.Transport{
				DisableKeepAlives: true,
			},
		},
	}
48
}
49

50
func (c *client) Send(req cmds.Request) (cmds.Response, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51

52 53 54 55 56 57 58 59
	if req.Context() == nil {
		log.Warningf("no context set in request")
		err := req.SetRootContext(context.TODO())
		if err != nil {
			return nil, err
		}
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
60 61 62 63 64 65 66
	// save user-provided encoding
	previousUserProvidedEncoding, found, err := req.Option(cmds.EncShort).String()
	if err != nil {
		return nil, err
	}

	// override with json to send to server
67
	req.SetOption(cmds.EncShort, cmds.JSON)
68

69 70 71
	// stream channel output
	req.SetOption(cmds.ChanOpt, "true")

72
	query, err := getQuery(req)
73 74 75 76
	if err != nil {
		return nil, err
	}

77
	var fileReader *MultiFileReader
78 79
	var reader io.Reader

80
	if req.Files() != nil {
81
		fileReader = NewMultiFileReader(req.Files(), true)
82 83 84 85 86
		reader = fileReader
	} else {
		// if we have no file data, use an empty Reader
		// (http.NewRequest panics when a nil Reader is used)
		reader = strings.NewReader("")
87 88
	}

89 90 91
	path := strings.Join(req.Path(), "/")
	url := fmt.Sprintf(ApiUrlFormat, c.serverAddress, ApiPath, path, query)

92
	httpReq, err := http.NewRequest("POST", url, reader)
93 94 95 96 97 98
	if err != nil {
		return nil, err
	}

	// TODO extract string consts?
	if fileReader != nil {
99 100
		httpReq.Header.Set(contentTypeHeader, "multipart/form-data; boundary="+fileReader.Boundary())
		httpReq.Header.Set(contentDispHeader, "form-data: name=\"files\"")
101
	} else {
102
		httpReq.Header.Set(contentTypeHeader, applicationOctetStream)
103 104
	}
	version := config.CurrentVersionNumber
105
	httpReq.Header.Set(uaHeader, fmt.Sprintf("/go-ipfs/%s/", version))
106

107 108
	ec := make(chan error, 1)
	rc := make(chan cmds.Response, 1)
Jeromy's avatar
Jeromy committed
109
	dc := req.Context().Done()
110 111

	go func() {
112
		httpRes, err := c.httpClient.Do(httpReq)
113 114 115 116
		if err != nil {
			ec <- err
			return
		}
117

118 119 120 121 122 123
		// using the overridden JSON encoding in request
		res, err := getResponse(httpRes, req)
		if err != nil {
			ec <- err
			return
		}
124

125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
		rc <- res
	}()

	for {
		select {
		case <-dc:
			log.Debug("Context cancelled, cancelling HTTP request...")
			tr := http.DefaultTransport.(*http.Transport)
			tr.CancelRequest(httpReq)
			dc = nil // Wait for ec or rc
		case err := <-ec:
			return nil, err
		case res := <-rc:
			if found && len(previousUserProvidedEncoding) > 0 {
				// reset to user provided encoding after sending request
				// NB: if user has provided an encoding but it is the empty string,
				// still leave it as JSON.
				req.SetOption(cmds.EncShort, previousUserProvidedEncoding)
			}
			return res, nil
		}
146 147 148
	}
}

149
func getQuery(req cmds.Request) (string, error) {
150
	query := url.Values{}
151
	for k, v := range req.Options() {
152
		str := fmt.Sprintf("%v", v)
153
		query.Set(k, str)
154
	}
155 156

	args := req.Arguments()
157 158
	argDefs := req.Command().Arguments

159 160 161 162 163 164 165 166
	argDefIndex := 0

	for _, arg := range args {
		argDef := argDefs[argDefIndex]
		// skip ArgFiles
		for argDef.Type == cmds.ArgFile {
			argDefIndex++
			argDef = argDefs[argDefIndex]
167 168
		}

169 170 171 172
		query.Add("arg", arg)

		if len(argDefs) > argDefIndex+1 {
			argDefIndex++
173
		}
174
	}
175

176
	return query.Encode(), nil
177
}
178

179 180 181
// getResponse decodes a http.Response to create a cmds.Response
func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error) {
	var err error
182 183
	res := cmds.NewResponse(req)

184
	contentType := httpRes.Header.Get(contentTypeHeader)
185 186
	contentType = strings.Split(contentType, ";")[0]

187 188 189 190 191 192 193 194 195
	lengthHeader := httpRes.Header.Get(contentLengthHeader)
	if len(lengthHeader) > 0 {
		length, err := strconv.ParseUint(lengthHeader, 10, 64)
		if err != nil {
			return nil, err
		}
		res.SetLength(length)
	}

196 197
	rr := &httpResponseReader{httpRes}
	res.SetCloser(rr)
198

199
	if contentType != applicationJson {
Jeromy's avatar
Jeromy committed
200
		// for all non json output types, just stream back the output
201
		res.SetOutput(rr)
202
		return res, nil
203 204 205 206

	} else if len(httpRes.Header.Get(channelHeader)) > 0 {
		// if output is coming from a channel, decode each chunk
		outChan := make(chan interface{})
Jeromy's avatar
Jeromy committed
207

208
		go readStreamedJson(req, rr, outChan)
209

210
		res.SetOutput((<-chan interface{})(outChan))
211
		return res, nil
212 213
	}

214
	dec := json.NewDecoder(rr)
215

Jeromy's avatar
Jeromy committed
216
	// If we ran into an error
217 218
	if httpRes.StatusCode >= http.StatusBadRequest {
		e := cmds.Error{}
219

Jeromy's avatar
Jeromy committed
220 221
		switch {
		case httpRes.StatusCode == http.StatusNotFound:
222 223 224 225
			// handle 404s
			e.Message = "Command not found."
			e.Code = cmds.ErrClient

226
		case contentType == plainText:
227 228 229 230 231 232
			// handle non-marshalled errors
			buf := bytes.NewBuffer(nil)
			io.Copy(buf, httpRes.Body)
			e.Message = string(buf.Bytes())
			e.Code = cmds.ErrNormal

Jeromy's avatar
Jeromy committed
233
		default:
234 235 236 237 238
			// handle marshalled errors
			err = dec.Decode(&e)
			if err != nil {
				return nil, err
			}
239 240 241 242
		}

		res.SetError(e, e.Code)

Jeromy's avatar
Jeromy committed
243 244 245 246 247 248 249 250 251 252 253 254 255 256
		return res, nil
	}

	outputType := reflect.TypeOf(req.Command().Type)
	v, err := decodeTypedVal(outputType, dec)
	if err != nil && err != io.EOF {
		return nil, err
	}

	res.SetOutput(v)

	return res, nil
}

Jeromy's avatar
Jeromy committed
257 258
// read json objects off of the given stream, and write the objects out to
// the 'out' channel
259
func readStreamedJson(req cmds.Request, rr io.Reader, out chan<- interface{}) {
Jeromy's avatar
Jeromy committed
260
	defer close(out)
261
	dec := json.NewDecoder(rr)
Jeromy's avatar
Jeromy committed
262 263 264 265 266 267 268
	outputType := reflect.TypeOf(req.Command().Type)

	ctx := req.Context()

	for {
		v, err := decodeTypedVal(outputType, dec)
		if err != nil {
269
			// reading on a closed response body is as good as an io.EOF here
Jeromy's avatar
Jeromy committed
270 271 272 273
			if !(strings.Contains(err.Error(), "read on closed response body") || err == io.EOF) {
				log.Error(err)
			}
			return
274
		}
Jeromy's avatar
Jeromy committed
275 276 277 278 279

		select {
		case <-ctx.Done():
			return
		case out <- v:
280
		}
281
	}
Jeromy's avatar
Jeromy committed
282
}
283

Jeromy's avatar
Jeromy committed
284 285
// decode a value of the given type, if the type is nil, attempt to decode into
// an interface{} anyways
Jeromy's avatar
Jeromy committed
286 287 288 289 290 291 292 293 294 295 296
func decodeTypedVal(t reflect.Type, dec *json.Decoder) (interface{}, error) {
	var v interface{}
	var err error
	if t != nil {
		v = reflect.New(t).Interface()
		err = dec.Decode(v)
	} else {
		err = dec.Decode(&v)
	}

	return v, err
297
}
298

Jeromy's avatar
Jeromy committed
299 300 301
// httpResponseReader reads from the response body, and checks for an error
// in the http trailer upon EOF, this error if present is returned instead
// of the EOF.
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
type httpResponseReader struct {
	resp *http.Response
}

func (r *httpResponseReader) Read(b []byte) (int, error) {
	n, err := r.resp.Body.Read(b)
	if err == io.EOF {
		_ = r.resp.Body.Close()
		trailerErr := r.checkError()
		if trailerErr != nil {
			return n, trailerErr
		}
	}
	return n, err
}

func (r *httpResponseReader) checkError() error {
	if e := r.resp.Trailer.Get(StreamErrHeader); e != "" {
		return errors.New(e)
	}
	return nil
}

func (r *httpResponseReader) Close() error {
	return r.resp.Body.Close()
}