Commit aae9eb78 authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub

Merge pull request #3244 from ipfs/feat/gateway-post-coreapi

gateway: use core api for serving GET/HEAD/POST
parents 580584a9 f610e19d
package coreapi
import (
"context"
core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
path "github.com/ipfs/go-ipfs/path"
ipld "gx/ipfs/QmU7bFWQ793qmvNy7outdCaMfSDNk8uqhx4VNrxYj5fj5g/go-ipld-node"
)
func resolve(ctx context.Context, n *core.IpfsNode, p string) (ipld.Node, error) {
pp, err := path.ParsePath(p)
if err != nil {
return nil, err
}
dagnode, err := core.Resolve(ctx, n.Namesys, n.Resolver, pp)
if err == core.ErrNoNamesys {
return nil, coreiface.ErrOffline
} else if err != nil {
return nil, err
}
return dagnode, nil
}
package iface
import (
"context"
"errors"
"io"
ipld "gx/ipfs/QmU7bFWQ793qmvNy7outdCaMfSDNk8uqhx4VNrxYj5fj5g/go-ipld-node"
cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid"
)
// type CoreAPI interface {
// ID() CoreID
// Version() CoreVersion
// }
type Link ipld.Link
type Reader interface {
io.ReadSeeker
io.Closer
}
type UnixfsAPI interface {
Add(context.Context, io.Reader) (*cid.Cid, error)
Cat(context.Context, string) (Reader, error)
Ls(context.Context, string) ([]*Link, error)
}
// type ObjectAPI interface {
// New() (cid.Cid, Object)
// Get(string) (Object, error)
// Links(string) ([]*Link, error)
// Data(string) (Reader, error)
// Stat(string) (ObjectStat, error)
// Put(Object) (cid.Cid, error)
// SetData(string, Reader) (cid.Cid, error)
// AppendData(string, Data) (cid.Cid, error)
// AddLink(string, string, string) (cid.Cid, error)
// RmLink(string, string) (cid.Cid, error)
// }
// type ObjectStat struct {
// Cid cid.Cid
// NumLinks int
// BlockSize int
// LinksSize int
// DataSize int
// CumulativeSize int
// }
var ErrIsDir = errors.New("object is a directory")
var ErrIsNonDag = errors.New("not a merkledag object")
var ErrOffline = errors.New("can't resolve, ipfs node is offline")
package coreapi
import (
"context"
"io"
core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
coreunix "github.com/ipfs/go-ipfs/core/coreunix"
uio "github.com/ipfs/go-ipfs/unixfs/io"
cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid"
)
type UnixfsAPI struct {
node *core.IpfsNode
}
func NewUnixfsAPI(n *core.IpfsNode) coreiface.UnixfsAPI {
api := &UnixfsAPI{n}
return api
}
func (api *UnixfsAPI) Add(ctx context.Context, r io.Reader) (*cid.Cid, error) {
k, err := coreunix.AddWithContext(ctx, api.node, r)
if err != nil {
return nil, err
}
return cid.Decode(k)
}
func (api *UnixfsAPI) Cat(ctx context.Context, p string) (coreiface.Reader, error) {
dagnode, err := resolve(ctx, api.node, p)
if err != nil {
return nil, err
}
r, err := uio.NewDagReader(ctx, dagnode, api.node.DAG)
if err == uio.ErrIsDir {
return nil, coreiface.ErrIsDir
} else if err != nil {
return nil, err
}
return r, nil
}
func (api *UnixfsAPI) Ls(ctx context.Context, p string) ([]*coreiface.Link, error) {
dagnode, err := resolve(ctx, api.node, p)
if err != nil {
return nil, err
}
l := dagnode.Links()
links := make([]*coreiface.Link, len(l))
for i, l := range l {
links[i] = &coreiface.Link{l.Name, l.Size, l.Cid}
}
return links, nil
}
package coreapi_test
import (
"bytes"
"context"
"io"
"strings"
"testing"
core "github.com/ipfs/go-ipfs/core"
coreapi "github.com/ipfs/go-ipfs/core/coreapi"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
coreunix "github.com/ipfs/go-ipfs/core/coreunix"
mdag "github.com/ipfs/go-ipfs/merkledag"
repo "github.com/ipfs/go-ipfs/repo"
config "github.com/ipfs/go-ipfs/repo/config"
testutil "github.com/ipfs/go-ipfs/thirdparty/testutil"
unixfs "github.com/ipfs/go-ipfs/unixfs"
)
// `echo -n 'hello, world!' | ipfs add`
var hello = "QmQy2Dw4Wk7rdJKjThjYXzfFJNaRKRHhHP5gHHXroJMYxk"
var helloStr = "hello, world!"
// `ipfs object new unixfs-dir`
var emptyUnixfsDir = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"
// `echo -n | ipfs add`
var emptyUnixfsFile = "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH"
func makeAPI(ctx context.Context) (*core.IpfsNode, coreiface.UnixfsAPI, error) {
r := &repo.Mock{
C: config.Config{
Identity: config.Identity{
PeerID: "Qmfoo", // required by offline node
},
},
D: testutil.ThreadSafeCloserMapDatastore(),
}
node, err := core.NewNode(ctx, &core.BuildCfg{Repo: r})
if err != nil {
return nil, nil, err
}
api := coreapi.NewUnixfsAPI(node)
return node, api, nil
}
func TestAdd(t *testing.T) {
ctx := context.Background()
_, api, err := makeAPI(ctx)
if err != nil {
t.Error(err)
}
str := strings.NewReader(helloStr)
c, err := api.Add(ctx, str)
if err != nil {
t.Error(err)
}
if c.String() != hello {
t.Fatalf("expected CID %s, got: %s", hello, c)
}
r, err := api.Cat(ctx, hello)
if err != nil {
t.Fatal(err)
}
buf := make([]byte, len(helloStr))
_, err = io.ReadFull(r, buf)
if err != nil {
t.Error(err)
}
if string(buf) != helloStr {
t.Fatalf("expected [%s], got [%s] [err=%s]", helloStr, string(buf), err)
}
}
func TestAddEmptyFile(t *testing.T) {
ctx := context.Background()
_, api, err := makeAPI(ctx)
if err != nil {
t.Error(err)
}
str := strings.NewReader("")
c, err := api.Add(ctx, str)
if err != nil {
t.Error(err)
}
if c.String() != emptyUnixfsFile {
t.Fatalf("expected CID %s, got: %s", hello, c)
}
}
func TestCatBasic(t *testing.T) {
ctx := context.Background()
node, api, err := makeAPI(ctx)
if err != nil {
t.Fatal(err)
}
hr := strings.NewReader(helloStr)
k, err := coreunix.Add(node, hr)
if err != nil {
t.Fatal(err)
}
if k != hello {
t.Fatalf("expected CID %s, got: %s", hello, k)
}
r, err := api.Cat(ctx, k)
if err != nil {
t.Fatal(err)
}
buf := make([]byte, len(helloStr))
_, err = io.ReadFull(r, buf)
if err != nil {
t.Error(err)
}
if string(buf) != helloStr {
t.Fatalf("expected [%s], got [%s] [err=%s]", helloStr, string(buf), err)
}
}
func TestCatEmptyFile(t *testing.T) {
ctx := context.Background()
node, api, err := makeAPI(ctx)
if err != nil {
t.Fatal(err)
}
_, err = coreunix.Add(node, strings.NewReader(""))
if err != nil {
t.Fatal(err)
}
r, err := api.Cat(ctx, emptyUnixfsFile)
if err != nil {
t.Fatal(err)
}
buf := make([]byte, 1) // non-zero so that Read() actually tries to read
n, err := io.ReadFull(r, buf)
if err != nil && err != io.EOF {
t.Error(err)
}
if !bytes.HasPrefix(buf, []byte{0x00}) {
t.Fatalf("expected empty data, got [%s] [read=%d]", buf, n)
}
}
func TestCatDir(t *testing.T) {
ctx := context.Background()
node, api, err := makeAPI(ctx)
if err != nil {
t.Error(err)
}
c, err := node.DAG.Add(unixfs.EmptyDirNode())
if err != nil {
t.Error(err)
}
_, err = api.Cat(ctx, c.String())
if err != coreiface.ErrIsDir {
t.Fatalf("expected ErrIsDir, got: %s", err)
}
}
func TestCatNonUnixfs(t *testing.T) {
ctx := context.Background()
node, api, err := makeAPI(ctx)
if err != nil {
t.Error(err)
}
c, err := node.DAG.Add(new(mdag.ProtoNode))
if err != nil {
t.Error(err)
}
_, err = api.Cat(ctx, c.String())
if !strings.Contains(err.Error(), "proto: required field") {
t.Fatalf("expected protobuf error, got: %s", err)
}
}
func TestCatOffline(t *testing.T) {
ctx := context.Background()
_, api, err := makeAPI(ctx)
if err != nil {
t.Error(err)
}
_, err = api.Cat(ctx, "/ipns/Qmfoobar")
if err != coreiface.ErrOffline {
t.Fatalf("expected ErrOffline, got: %", err)
}
}
func TestLs(t *testing.T) {
ctx := context.Background()
node, api, err := makeAPI(ctx)
if err != nil {
t.Error(err)
}
r := strings.NewReader("content-of-file")
p, _, err := coreunix.AddWrapped(node, r, "name-of-file")
if err != nil {
t.Error(err)
}
parts := strings.Split(p, "/")
if len(parts) != 2 {
t.Errorf("unexpected path:", p)
}
k := parts[0]
links, err := api.Ls(ctx, k)
if err != nil {
t.Error(err)
}
if len(links) != 1 {
t.Fatalf("expected 1 link, got %d", len(links))
}
if links[0].Size != 23 {
t.Fatalf("expected size = 23, got %d", links[0].Size)
}
if links[0].Name != "name-of-file" {
t.Fatalf("expected name = name-of-file, got %s", links[0].Name)
}
if links[0].Cid.String() != "QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr" {
t.Fatalf("expected cid = QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr, got %s", links[0].Cid.String())
}
}
func TestLsEmptyDir(t *testing.T) {
ctx := context.Background()
node, api, err := makeAPI(ctx)
if err != nil {
t.Error(err)
}
c, err := node.DAG.Add(unixfs.EmptyDirNode())
if err != nil {
t.Error(err)
}
links, err := api.Ls(ctx, c.String())
if err != nil {
t.Error(err)
}
if len(links) != 0 {
t.Fatalf("expected 0 links, got %d", len(links))
}
}
// TODO(lgierth) this should test properly, with len(links) > 0
func TestLsNonUnixfs(t *testing.T) {
ctx := context.Background()
node, api, err := makeAPI(ctx)
if err != nil {
t.Error(err)
}
c, err := node.DAG.Add(new(mdag.ProtoNode))
if err != nil {
t.Error(err)
}
links, err := api.Ls(ctx, c.String())
if err != nil {
t.Error(err)
}
if len(links) != 0 {
t.Fatalf("expected 0 links, got %d", len(links))
}
}
......@@ -6,6 +6,7 @@ import (
"net/http"
core "github.com/ipfs/go-ipfs/core"
coreapi "github.com/ipfs/go-ipfs/core/coreapi"
config "github.com/ipfs/go-ipfs/repo/config"
id "gx/ipfs/QmUYzZRJcuUxLSnSzF1bSyw1jYbNAULkBrbS6rnr7F72uK/go-libp2p/p2p/protocol/identify"
)
......@@ -27,7 +28,7 @@ func GatewayOption(writable bool, paths ...string) ServeOption {
Headers: cfg.Gateway.HTTPHeaders,
Writable: writable,
PathPrefixes: cfg.Gateway.PathPrefixes,
})
}, coreapi.NewUnixfsAPI(n))
for _, p := range paths {
mux.Handle(p+"/", gateway)
......@@ -37,7 +38,7 @@ func GatewayOption(writable bool, paths ...string) ServeOption {
}
func VersionOption() ServeOption {
return func(n *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
return func(_ *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Commit: %s\n", config.CurrentCommit)
fmt.Fprintf(w, "Client Version: %s\n", id.ClientVersion)
......
......@@ -16,9 +16,10 @@ import (
chunk "github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
dagutils "github.com/ipfs/go-ipfs/merkledag/utils"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
path "github.com/ipfs/go-ipfs/path"
ft "github.com/ipfs/go-ipfs/unixfs"
uio "github.com/ipfs/go-ipfs/unixfs/io"
humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
routing "gx/ipfs/QmQKEgGgYCDyk8VNY6A65FpuE4YwbspvjXHco1rdb75PVc/go-libp2p-routing"
......@@ -36,12 +37,14 @@ const (
type gatewayHandler struct {
node *core.IpfsNode
config GatewayConfig
api coreiface.UnixfsAPI
}
func newGatewayHandler(node *core.IpfsNode, conf GatewayConfig) *gatewayHandler {
func newGatewayHandler(n *core.IpfsNode, c GatewayConfig, api coreiface.UnixfsAPI) *gatewayHandler {
i := &gatewayHandler{
node: node,
config: conf,
node: n,
config: c,
api: api,
}
return i
}
......@@ -57,6 +60,21 @@ func (i *gatewayHandler) newDagFromReader(r io.Reader) (node.Node, error) {
// TODO(btc): break this apart into separate handlers using a more expressive muxer
func (i *gatewayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(i.node.Context(), time.Hour)
// the hour is a hard fallback, we don't expect it to happen, but just in case
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
clientGone := cn.CloseNotify()
go func() {
select {
case <-clientGone:
case <-ctx.Done():
}
cancel()
}()
}
defer func() {
if r := recover(); r != nil {
log.Error("A panic occurred in the gateway handler!")
......@@ -68,7 +86,7 @@ func (i *gatewayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if i.config.Writable {
switch r.Method {
case "POST":
i.postHandler(w, r)
i.postHandler(ctx, w, r)
return
case "PUT":
i.putHandler(w, r)
......@@ -80,7 +98,7 @@ func (i *gatewayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
if r.Method == "GET" || r.Method == "HEAD" {
i.getOrHeadHandler(w, r)
i.getOrHeadHandler(ctx, w, r)
return
}
......@@ -110,21 +128,7 @@ func (i *gatewayHandler) optionsHandler(w http.ResponseWriter, r *http.Request)
i.addUserHeaders(w) // return all custom headers (including CORS ones, if set)
}
func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(i.node.Context(), time.Hour)
// the hour is a hard fallback, we don't expect it to happen, but just in case
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
clientGone := cn.CloseNotify()
go func() {
select {
case <-clientGone:
case <-ctx.Done():
}
cancel()
}()
}
func (i *gatewayHandler) getOrHeadHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
urlPath := r.URL.Path
......@@ -154,27 +158,19 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
ipnsHostname = true
}
p, err := path.ParsePath(urlPath)
if err != nil {
webError(w, "Invalid Path Error", err, http.StatusBadRequest)
return
}
nd, err := core.Resolve(ctx, i.node.Namesys, i.node.Resolver, p)
// If node is in offline mode the error code and message should be different
if err == core.ErrNoNamesys && !i.node.OnlineMode() {
dr, err := i.api.Cat(ctx, urlPath)
dir := false
if err == coreiface.ErrIsDir {
dir = true
} else if err == coreiface.ErrOffline {
w.WriteHeader(http.StatusServiceUnavailable)
fmt.Fprint(w, "Could not resolve path. Node is in offline mode.")
return
} else if err != nil {
webError(w, "Path Resolve error", err, http.StatusBadRequest)
return
}
pbnd, ok := nd.(*dag.ProtoNode)
if !ok {
webError(w, "Cannot read non protobuf nodes through gateway", dag.ErrNotProtobuf, http.StatusBadRequest)
return
} else {
defer dr.Close()
}
etag := gopath.Base(urlPath)
......@@ -204,13 +200,6 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
w.Header().Set("Suborigin", pathRoot)
}
dr, err := uio.NewDagReader(ctx, pbnd, i.node.DAG)
if err != nil && err != uio.ErrIsDir {
// not a directory and still an error
internalWebError(w, err)
return
}
// set these headers _after_ the error, for we may just not have it
// and dont want the client to cache a 500 response...
// and only if it's /ipfs!
......@@ -224,18 +213,23 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
modtime = time.Unix(1, 0)
}
if err == nil {
defer dr.Close()
if !dir {
name := gopath.Base(urlPath)
http.ServeContent(w, r, name, modtime, dr)
return
}
links, err := i.api.Ls(ctx, urlPath)
if err != nil {
internalWebError(w, err)
return
}
// storage for directory listing
var dirListing []directoryItem
// loop through files
foundIndex := false
for _, link := range nd.Links() {
for _, link := range links {
if link.Name == "index.html" {
log.Debugf("found index.html link for %s", urlPath)
foundIndex = true
......@@ -254,19 +248,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
}
// return index page instead.
nd, err := core.Resolve(ctx, i.node.Namesys, i.node.Resolver, p)
if err != nil {
internalWebError(w, err)
return
}
pbnd, ok := nd.(*dag.ProtoNode)
if !ok {
internalWebError(w, dag.ErrNotProtobuf)
return
}
dr, err := uio.NewDagReader(ctx, pbnd, i.node.DAG)
dr, err := i.api.Cat(ctx, p.String())
if err != nil {
internalWebError(w, err)
return
......@@ -332,14 +314,8 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request
}
}
func (i *gatewayHandler) postHandler(w http.ResponseWriter, r *http.Request) {
nd, err := i.newDagFromReader(r.Body)
if err != nil {
internalWebError(w, err)
return
}
k, err := i.node.DAG.Add(nd)
func (i *gatewayHandler) postHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
k, err := i.api.Add(ctx, r.Body)
if err != nil {
internalWebError(w, err)
return
......
......@@ -254,6 +254,10 @@ func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
// Add builds a merkledag from the a reader, pinning all objects to the local
// datastore. Returns a key representing the root node.
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
return AddWithContext(n.Context(), n, r)
}
func AddWithContext(ctx context.Context, n *core.IpfsNode, r io.Reader) (string, error) {
defer n.Blockstore.PinLock().Unlock()
fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment