client.go 6.81 KB
Newer Older
1 2 3
package http

import (
4
	"bytes"
5
	"encoding/json"
6
	"errors"
7
	"fmt"
8
	"io"
9
	"net/http"
10
	"net/url"
11
	"reflect"
12
	"strconv"
13 14
	"strings"

15 16
	cmds "github.com/ipfs/go-ipfs/commands"
	config "github.com/ipfs/go-ipfs/repo/config"
17 18

	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
19 20
)

21
const (
	// ApiUrlFormat is the fmt template used to build the URL of a command
	// request. Its verbs are filled, in order, with the server address, the
	// API path prefix, the slash-joined command path and the encoded query
	// string.
	ApiUrlFormat = "http://%s%s/%s?%s"

	// ApiPath is the URL path prefix placed in front of every command path.
	// TODO: make configurable
	ApiPath = "/api/v0"
)
25

26 27 28 29 30 31 32 33
// Client is the commands HTTP client interface: anything that can take a
// command request and produce the matching command response.
type Client interface {
	// Send submits req and returns the response obtained for it, or a
	// non-nil error when no response could be produced.
	Send(req cmds.Request) (cmds.Response, error)
}

// client is the package's Client implementation. It holds only the address
// of the server that requests are sent to.
type client struct {
	// serverAddress is substituted into ApiUrlFormat directly after
	// "http://", so it is presumably of the form "host:port".
	serverAddress string
}
34

35 36 37
// NewClient returns a Client whose requests are sent to the server at
// address. The address is stored as given; it is neither parsed nor
// validated here.
func NewClient(address string) Client {
	return &client{serverAddress: address}
}
38

39
func (c *client) Send(req cmds.Request) (cmds.Response, error) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
40

41 42 43 44 45 46 47 48
	if req.Context() == nil {
		log.Warningf("no context set in request")
		err := req.SetRootContext(context.TODO())
		if err != nil {
			return nil, err
		}
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50 51 52 53 54 55
	// save user-provided encoding
	previousUserProvidedEncoding, found, err := req.Option(cmds.EncShort).String()
	if err != nil {
		return nil, err
	}

	// override with json to send to server
56
	req.SetOption(cmds.EncShort, cmds.JSON)
57

58 59 60
	// stream channel output
	req.SetOption(cmds.ChanOpt, "true")

61
	query, err := getQuery(req)
62 63 64 65
	if err != nil {
		return nil, err
	}

66
	var fileReader *MultiFileReader
67 68
	var reader io.Reader

69
	if req.Files() != nil {
70
		fileReader = NewMultiFileReader(req.Files(), true)
71 72 73 74 75
		reader = fileReader
	} else {
		// if we have no file data, use an empty Reader
		// (http.NewRequest panics when a nil Reader is used)
		reader = strings.NewReader("")
76 77
	}

78 79 80
	path := strings.Join(req.Path(), "/")
	url := fmt.Sprintf(ApiUrlFormat, c.serverAddress, ApiPath, path, query)

81
	httpReq, err := http.NewRequest("POST", url, reader)
82 83 84 85 86 87 88 89 90 91 92 93 94 95
	if err != nil {
		return nil, err
	}

	// TODO extract string consts?
	if fileReader != nil {
		httpReq.Header.Set("Content-Type", "multipart/form-data; boundary="+fileReader.Boundary())
		httpReq.Header.Set("Content-Disposition", "form-data: name=\"files\"")
	} else {
		httpReq.Header.Set("Content-Type", "application/octet-stream")
	}
	version := config.CurrentVersionNumber
	httpReq.Header.Set("User-Agent", fmt.Sprintf("/go-ipfs/%s/", version))

96 97
	ec := make(chan error, 1)
	rc := make(chan cmds.Response, 1)
Jeromy's avatar
Jeromy committed
98
	dc := req.Context().Done()
99 100 101 102 103 104 105

	go func() {
		httpRes, err := http.DefaultClient.Do(httpReq)
		if err != nil {
			ec <- err
			return
		}
106

107 108 109 110 111 112
		// using the overridden JSON encoding in request
		res, err := getResponse(httpRes, req)
		if err != nil {
			ec <- err
			return
		}
113

114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
		rc <- res
	}()

	for {
		select {
		case <-dc:
			log.Debug("Context cancelled, cancelling HTTP request...")
			tr := http.DefaultTransport.(*http.Transport)
			tr.CancelRequest(httpReq)
			dc = nil // Wait for ec or rc
		case err := <-ec:
			return nil, err
		case res := <-rc:
			if found && len(previousUserProvidedEncoding) > 0 {
				// reset to user provided encoding after sending request
				// NB: if user has provided an encoding but it is the empty string,
				// still leave it as JSON.
				req.SetOption(cmds.EncShort, previousUserProvidedEncoding)
			}
			return res, nil
		}
135 136 137
	}
}

138
func getQuery(req cmds.Request) (string, error) {
139
	query := url.Values{}
140
	for k, v := range req.Options() {
141
		str := fmt.Sprintf("%v", v)
142
		query.Set(k, str)
143
	}
144 145

	args := req.Arguments()
146 147
	argDefs := req.Command().Arguments

148 149 150 151 152 153 154 155
	argDefIndex := 0

	for _, arg := range args {
		argDef := argDefs[argDefIndex]
		// skip ArgFiles
		for argDef.Type == cmds.ArgFile {
			argDefIndex++
			argDef = argDefs[argDefIndex]
156 157
		}

158 159 160 161
		query.Add("arg", arg)

		if len(argDefs) > argDefIndex+1 {
			argDefIndex++
162
		}
163
	}
164

165
	return query.Encode(), nil
166
}
167

168 169 170
// getResponse decodes a http.Response to create a cmds.Response
func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error) {
	var err error
171 172
	res := cmds.NewResponse(req)

173
	contentType := httpRes.Header.Get(contentTypeHeader)
174 175
	contentType = strings.Split(contentType, ";")[0]

176 177 178 179 180 181 182 183 184
	lengthHeader := httpRes.Header.Get(contentLengthHeader)
	if len(lengthHeader) > 0 {
		length, err := strconv.ParseUint(lengthHeader, 10, 64)
		if err != nil {
			return nil, err
		}
		res.SetLength(length)
	}

185 186
	res.SetCloser(httpRes.Body)

187
	if len(httpRes.Header.Get(streamHeader)) > 0 && contentType != "application/json" {
188
		// if output is a stream, we can just use the body reader
189
		res.SetOutput(&httpResponseReader{httpRes})
190
		return res, nil
191 192 193 194 195

	} else if len(httpRes.Header.Get(channelHeader)) > 0 {
		// if output is coming from a channel, decode each chunk
		outChan := make(chan interface{})
		go func() {
196
			dec := json.NewDecoder(&httpResponseReader{httpRes})
197
			outputType := reflect.TypeOf(req.Command().Type)
198

Jeromy's avatar
Jeromy committed
199
			ctx := req.Context()
200

201
			for {
202 203 204 205 206 207 208 209
				var v interface{}
				var err error
				if outputType != nil {
					v = reflect.New(outputType).Interface()
					err = dec.Decode(v)
				} else {
					err = dec.Decode(&v)
				}
210 211 212 213 214 215 216

				// since we are just looping reading on the response, the only way to
				// know we are 'done' is for the consumer to close the response body.
				// doing so doesnt throw an io.EOF, but we want to treat it like one.
				if err != nil && strings.Contains(err.Error(), "read on closed response body") {
					err = io.EOF
				}
217
				if err != nil && err != io.EOF {
218
					log.Error(err)
219 220
					return
				}
221 222 223 224 225 226 227 228

				select {
				case <-ctx.Done():
					close(outChan)
					return
				default:
				}

229 230
				if err == io.EOF {
					close(outChan)
231 232 233 234 235 236
					return
				}
				outChan <- v
			}
		}()

237
		res.SetOutput((<-chan interface{})(outChan))
238
		return res, nil
239 240
	}

241
	dec := json.NewDecoder(&httpResponseReader{httpRes})
242 243 244

	if httpRes.StatusCode >= http.StatusBadRequest {
		e := cmds.Error{}
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263

		if httpRes.StatusCode == http.StatusNotFound {
			// handle 404s
			e.Message = "Command not found."
			e.Code = cmds.ErrClient

		} else if contentType == "text/plain" {
			// handle non-marshalled errors
			buf := bytes.NewBuffer(nil)
			io.Copy(buf, httpRes.Body)
			e.Message = string(buf.Bytes())
			e.Code = cmds.ErrNormal

		} else {
			// handle marshalled errors
			err = dec.Decode(&e)
			if err != nil {
				return nil, err
			}
264 265 266 267 268
		}

		res.SetError(e, e.Code)

	} else {
269
		outputType := reflect.TypeOf(req.Command().Type)
270 271 272 273 274 275 276 277
		var v interface{}

		if outputType != nil {
			v = reflect.New(outputType).Interface()
			err = dec.Decode(v)
		} else {
			err = dec.Decode(&v)
		}
278
		if err != nil && err != io.EOF {
279 280
			return nil, err
		}
281 282 283
		if v != nil {
			res.SetOutput(v)
		}
284 285
	}

286 287
	return res, nil
}
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314

// httpResponseReader exposes the body of an HTTP response through Read and
// Close methods while keeping the whole response at hand, so that its
// trailer can be inspected once the body has been read to the end.
type httpResponseReader struct {
	// resp is the response whose Body is read and closed, and whose Trailer
	// is consulted for StreamErrHeader.
	resp *http.Response
}

// Read reads from the response body into b. Anything other than io.EOF is
// passed through untouched. On io.EOF the body is closed and the trailer is
// checked: if it carries a stream error, that error is returned in place of
// io.EOF, together with the number of bytes read.
func (r *httpResponseReader) Read(b []byte) (int, error) {
	count, readErr := r.resp.Body.Read(b)
	if readErr != io.EOF {
		return count, readErr
	}

	// End of body: release it, then see whether the trailer reports a
	// failure. The result of Close is deliberately discarded.
	_ = r.resp.Body.Close()
	if streamErr := r.checkError(); streamErr != nil {
		return count, streamErr
	}
	return count, io.EOF
}

// checkError looks up StreamErrHeader in the response trailer. It returns
// nil when the value is empty, and otherwise a new error whose message is
// the trailer value.
func (r *httpResponseReader) checkError() error {
	msg := r.resp.Trailer.Get(StreamErrHeader)
	if msg == "" {
		return nil
	}
	return errors.New(msg)
}

// Close closes the body of the wrapped response and returns whatever error
// the body's own Close reports.
func (r *httpResponseReader) Close() error {
	body := r.resp.Body
	return body.Close()
}