fsrefstore.go 8.74 KB
Newer Older
1 2 3 4 5 6
package filestore

import (
	"context"
	"fmt"
	"io"
Jakub Sztandera's avatar
Jakub Sztandera committed
7
	"net/http"
8 9 10 11 12
	"os"
	"path/filepath"

	pb "github.com/ipfs/go-ipfs/filestore/pb"

13
	dshelp "gx/ipfs/QmNP2u7bofwUQptHQGPfabGWtTCbxhNLSZKqbf1uzsup9V/go-ipfs-ds-help"
14
	proto "gx/ipfs/QmT6n4mspWYEya864BhCUJEgyxiRfmiSY9ruQwTUNpRKaM/protobuf/proto"
15
	blocks "gx/ipfs/QmTRCUvZLiir12Qr6MV3HKfKMHX8Nf1Vddn6t2g5nsQSb9/go-block-format"
16
	posinfo "gx/ipfs/QmUWsXLvYYDAaoAt9TPZpFX4ffHHMg46AHrz1ZLTN5ABbe/go-ipfs-posinfo"
17
	cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
Steven Allen's avatar
Steven Allen committed
18
	blockstore "gx/ipfs/QmdpuJBPBZ6sLPj9BQpn3Rpi38BT2cF1QMiUfyzNWeySW4/go-ipfs-blockstore"
Steven Allen's avatar
Steven Allen committed
19 20 21
	ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
	dsns "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/namespace"
	dsq "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/query"
22 23
)

Hector Sanjuan's avatar
Hector Sanjuan committed
24
// FilestorePrefix identifies the key prefix for FileManager blocks.
25 26
// FilestorePrefix is the datastore key ("filestore") that NewFileManager
// uses as the namespace when wrapping the datastore it is given.
var FilestorePrefix = ds.NewKey("filestore")

Hector Sanjuan's avatar
Hector Sanjuan committed
27 28 29 30
// FileManager is a blockstore implementation which stores special
// blocks FilestoreNode type. These nodes only contain a reference
// to the actual location of the block data in the filesystem
// (a path and an offset).
31
type FileManager struct {
32 33 34 35
	AllowFiles bool
	AllowUrls  bool
	ds         ds.Batching
	root       string
36 37
}

Hector Sanjuan's avatar
Hector Sanjuan committed
38 39 40 41
// CorruptReferenceError implements the error interface.
// It is used to indicate that the block contents pointed
// by the referencing blocks cannot be retrieved (i.e. the
// file is not found, or the data changed as it was being read).
42
type CorruptReferenceError struct {
43
	Code Status
44
	Err  error
45 46
}

Hector Sanjuan's avatar
Hector Sanjuan committed
47 48
// Error() returns the error message in the CorruptReferenceError
// as a string.
49 50 51 52
func (c CorruptReferenceError) Error() string {
	return c.Err.Error()
}

Hector Sanjuan's avatar
Hector Sanjuan committed
53 54 55
// NewFileManager initializes a new file manager with the given
// datastore and root. All FilestoreNodes paths are relative to the
// root path given here, which is prepended for any operations.
56
func NewFileManager(ds ds.Batching, root string) *FileManager {
57
	return &FileManager{ds: dsns.Wrap(ds, FilestorePrefix), root: root}
58 59
}

Hector Sanjuan's avatar
Hector Sanjuan committed
60 61 62
// AllKeysChan returns a channel from which to read the keys stored in
// the FileManager. If the given context is cancelled the channel will be
// closed.
63 64 65 66 67 68 69 70
func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
	q := dsq.Query{KeysOnly: true}

	res, err := f.ds.Query(q)
	if err != nil {
		return nil, err
	}

71
	out := make(chan *cid.Cid, dsq.KeysOnlyBufSize)
72 73 74 75 76 77 78 79 80 81 82
	go func() {
		defer close(out)
		for {
			v, ok := res.NextSync()
			if !ok {
				return
			}

			k := ds.RawKey(v.Key)
			c, err := dshelp.DsKeyToCid(k)
			if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
83
				log.Errorf("decoding cid from filestore: %s", err)
84 85 86 87 88 89 90 91 92 93 94 95 96 97
				continue
			}

			select {
			case out <- c:
			case <-ctx.Done():
				return
			}
		}
	}()

	return out, nil
}

Hector Sanjuan's avatar
Hector Sanjuan committed
98 99
// DeleteBlock deletes the reference-block from the underlying
// datastore. It does not touch the referenced data.
100 101 102 103 104 105 106 107
func (f *FileManager) DeleteBlock(c *cid.Cid) error {
	key := dshelp.CidToDsKey(c)

	// Every outcome except the datastore's own "not found" sentinel
	// (including success, i.e. nil) is passed through unchanged.
	if err := f.ds.Delete(key); err != ds.ErrNotFound {
		return err
	}

	// Translate the datastore sentinel into the blockstore one.
	return blockstore.ErrNotFound
}

Hector Sanjuan's avatar
Hector Sanjuan committed
108 109 110 111
// Get reads a block from the datastore. Reading a block
// is done in two steps: the first step retrieves the reference
// block from the datastore. The second step uses the stored
// path and offsets to read the raw block data directly from disk.
112
func (f *FileManager) Get(c *cid.Cid) (blocks.Block, error) {
Kevin Atkinson's avatar
Kevin Atkinson committed
113 114 115 116 117 118 119 120 121 122 123 124
	dobj, err := f.getDataObj(c)
	if err != nil {
		return nil, err
	}
	out, err := f.readDataObj(c, dobj)
	if err != nil {
		return nil, err
	}

	return blocks.NewBlockWithCid(out, c)
}

Jakub Sztandera's avatar
Jakub Sztandera committed
125
func (f *FileManager) readDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
126
	if IsURL(d.GetFilePath()) {
Jakub Sztandera's avatar
Jakub Sztandera committed
127 128
		return f.readURLDataObj(c, d)
	}
129
	return f.readFileDataObj(c, d)
Jakub Sztandera's avatar
Jakub Sztandera committed
130 131
}

Kevin Atkinson's avatar
Kevin Atkinson committed
132
func (f *FileManager) getDataObj(c *cid.Cid) (*pb.DataObj, error) {
133 134 135 136 137 138 139 140 141 142
	o, err := f.ds.Get(dshelp.CidToDsKey(c))
	switch err {
	case ds.ErrNotFound:
		return nil, blockstore.ErrNotFound
	default:
		return nil, err
	case nil:
		//
	}

Kevin Atkinson's avatar
Kevin Atkinson committed
143 144 145 146
	return unmarshalDataObj(o)
}

func unmarshalDataObj(o interface{}) (*pb.DataObj, error) {
147 148 149 150 151 152 153 154 155 156
	data, ok := o.([]byte)
	if !ok {
		return nil, fmt.Errorf("stored filestore dataobj was not a []byte")
	}

	var dobj pb.DataObj
	if err := proto.Unmarshal(data, &dobj); err != nil {
		return nil, err
	}

Kevin Atkinson's avatar
Kevin Atkinson committed
157
	return &dobj, nil
158 159
}

Jakub Sztandera's avatar
Jakub Sztandera committed
160
func (f *FileManager) readFileDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
161
	if !f.AllowFiles {
162
		return nil, ErrFilestoreNotEnabled
163 164
	}

Jeromy's avatar
Jeromy committed
165 166
	p := filepath.FromSlash(d.GetFilePath())
	abspath := filepath.Join(f.root, p)
167 168

	fi, err := os.Open(abspath)
169
	if os.IsNotExist(err) {
170
		return nil, &CorruptReferenceError{StatusFileNotFound, err}
171
	} else if err != nil {
172
		return nil, &CorruptReferenceError{StatusFileError, err}
173 174 175
	}
	defer fi.Close()

176
	_, err = fi.Seek(int64(d.GetOffset()), io.SeekStart)
177
	if err != nil {
178
		return nil, &CorruptReferenceError{StatusFileError, err}
179 180 181 182
	}

	outbuf := make([]byte, d.GetSize_())
	_, err = io.ReadFull(fi, outbuf)
183
	if err == io.EOF || err == io.ErrUnexpectedEOF {
184
		return nil, &CorruptReferenceError{StatusFileChanged, err}
185
	} else if err != nil {
186
		return nil, &CorruptReferenceError{StatusFileError, err}
187 188
	}

189 190 191 192 193 194
	outcid, err := c.Prefix().Sum(outbuf)
	if err != nil {
		return nil, err
	}

	if !c.Equals(outcid) {
195
		return nil, &CorruptReferenceError{StatusFileChanged,
196
			fmt.Errorf("data in file did not match. %s offset %d", d.GetFilePath(), d.GetOffset())}
197 198
	}

199 200 201
	return outbuf, nil
}

Jakub Sztandera's avatar
Jakub Sztandera committed
202 203
// reads and verifies the block from URL
func (f *FileManager) readURLDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
204
	if !f.AllowUrls {
205
		return nil, ErrUrlstoreNotEnabled
206
	}
Jakub Sztandera's avatar
Jakub Sztandera committed
207 208 209 210 211 212 213 214 215 216

	req, err := http.NewRequest("GET", d.GetFilePath(), nil)
	if err != nil {
		return nil, err
	}

	req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", d.GetOffset(), d.GetOffset()+d.GetSize_()-1))

	res, err := http.DefaultClient.Do(req)
	if err != nil {
217
		return nil, &CorruptReferenceError{StatusFileError, err}
Jakub Sztandera's avatar
Jakub Sztandera committed
218 219
	}
	if res.StatusCode != http.StatusPartialContent {
220 221
		return nil, &CorruptReferenceError{StatusFileError,
			fmt.Errorf("expected HTTP 206 got %d", res.StatusCode)}
Jakub Sztandera's avatar
Jakub Sztandera committed
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
	}

	outbuf := make([]byte, d.GetSize_())
	_, err = io.ReadFull(res.Body, outbuf)
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, &CorruptReferenceError{StatusFileChanged, err}
	} else if err != nil {
		return nil, &CorruptReferenceError{StatusFileError, err}
	}
	res.Body.Close()

	outcid, err := c.Prefix().Sum(outbuf)
	if err != nil {
		return nil, err
	}

	if !c.Equals(outcid) {
		return nil, &CorruptReferenceError{StatusFileChanged,
			fmt.Errorf("data in file did not match. %s offset %d", d.GetFilePath(), d.GetOffset())}
	}

	return outbuf, nil
}

Hector Sanjuan's avatar
Hector Sanjuan committed
246 247
// Has returns if the FileManager is storing a block reference. It does not
// validate the data, nor checks if the reference is valid.
248 249 250 251 252 253 254
func (f *FileManager) Has(c *cid.Cid) (bool, error) {
	// Only the presence of the reference record is checked here. The
	// referenced data is never opened, so a positive answer does not
	// guarantee that the block can still be read back intact.
	return f.ds.Has(dshelp.CidToDsKey(c))
}

255 256 257 258
// putter is the single-method write target accepted by putTo. In this file
// it is satisfied by f.ds (used by Put) and by the batch returned from
// f.ds.Batch() (used by PutMany).
type putter interface {
	// Put is called by putTo with the block's datastore key and the
	// marshalled reference record.
	Put(ds.Key, interface{}) error
}

Hector Sanjuan's avatar
Hector Sanjuan committed
259 260
// Put adds a new reference block to the FileManager. It does not check
// that the reference is valid.
261
func (f *FileManager) Put(b *posinfo.FilestoreNode) error {
262 263 264 265
	return f.putTo(b, f.ds)
}

func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error {
266 267
	var dobj pb.DataObj

268 269
	if IsURL(b.PosInfo.FullPath) {
		if !f.AllowUrls {
270
			return ErrUrlstoreNotEnabled
271 272 273
		}
		dobj.FilePath = proto.String(b.PosInfo.FullPath)
	} else {
274
		if !f.AllowFiles {
275
			return ErrFilestoreNotEnabled
276
		}
Jakub Sztandera's avatar
Jakub Sztandera committed
277 278 279
		if !filepath.HasPrefix(b.PosInfo.FullPath, f.root) {
			return fmt.Errorf("cannot add filestore references outside ipfs root (%s)", f.root)
		}
280

Jakub Sztandera's avatar
Jakub Sztandera committed
281 282 283 284
		p, err := filepath.Rel(f.root, b.PosInfo.FullPath)
		if err != nil {
			return err
		}
285

Jakub Sztandera's avatar
Jakub Sztandera committed
286 287
		dobj.FilePath = proto.String(filepath.ToSlash(p))
	}
288 289 290 291 292 293 294 295
	dobj.Offset = proto.Uint64(b.PosInfo.Offset)
	dobj.Size_ = proto.Uint64(uint64(len(b.RawData())))

	data, err := proto.Marshal(&dobj)
	if err != nil {
		return err
	}

296
	return to.Put(dshelp.CidToDsKey(b.Cid()), data)
297 298
}

Hector Sanjuan's avatar
Hector Sanjuan committed
299 300
// PutMany is like Put() but takes a slice of blocks instead,
// allowing it to create a batch transaction.
301
func (f *FileManager) PutMany(bs []*posinfo.FilestoreNode) error {
302 303 304 305 306
	batch, err := f.ds.Batch()
	if err != nil {
		return err
	}

307
	for _, b := range bs {
308
		if err := f.putTo(b, batch); err != nil {
309 310 311
			return err
		}
	}
312 313

	return batch.Commit()
314
}
315

316
// IsURL returns true if the string represents a valid URL that the
317 318
// urlstore can handle.  More specifically it returns true if a string
// begins with 'http://' or 'https://'.
319 320 321 322 323
func IsURL(str string) bool {
	const (
		plain  = "http://"
		secure = "https://"
	)

	// The length comparisons are strict: at least one byte must follow the
	// scheme, so a bare "http://" or "https://" is not accepted.
	switch {
	case len(str) > len(plain) && str[:len(plain)] == plain:
		return true
	case len(str) > len(secure) && str[:len(secure)] == secure:
		return true
	default:
		return false
	}
}