fsrefstore.go 8.63 KB
Newer Older
1 2 3 4 5 6
package filestore

import (
	"context"
	"fmt"
	"io"
Jakub Sztandera's avatar
Jakub Sztandera committed
7
	"net/http"
8 9 10 11 12
	"os"
	"path/filepath"

	pb "github.com/ipfs/go-ipfs/filestore/pb"

13
	dshelp "gx/ipfs/QmNP2u7bofwUQptHQGPfabGWtTCbxhNLSZKqbf1uzsup9V/go-ipfs-ds-help"
14
	proto "gx/ipfs/QmT6n4mspWYEya864BhCUJEgyxiRfmiSY9ruQwTUNpRKaM/protobuf/proto"
15
	blocks "gx/ipfs/QmTRCUvZLiir12Qr6MV3HKfKMHX8Nf1Vddn6t2g5nsQSb9/go-block-format"
16
	posinfo "gx/ipfs/QmUWsXLvYYDAaoAt9TPZpFX4ffHHMg46AHrz1ZLTN5ABbe/go-ipfs-posinfo"
17
	cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
Steven Allen's avatar
Steven Allen committed
18
	blockstore "gx/ipfs/QmdpuJBPBZ6sLPj9BQpn3Rpi38BT2cF1QMiUfyzNWeySW4/go-ipfs-blockstore"
Steven Allen's avatar
Steven Allen committed
19 20 21
	ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
	dsns "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/namespace"
	dsq "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/query"
22 23
)

Hector Sanjuan's avatar
Hector Sanjuan committed
24
// FilestorePrefix identifies the key prefix for FileManager blocks.
25 26
var FilestorePrefix = ds.NewKey("filestore")

Hector Sanjuan's avatar
Hector Sanjuan committed
27 28 29 30
// FileManager is a blockstore implementation which stores special
// blocks of FilestoreNode type. These nodes do not hold the block data
// itself, only a reference to where it lives: a local file path (relative
// to root) or an HTTP(S) URL, together with an offset and a size.
type FileManager struct {
	AllowFiles bool        // permit references to local files under root
	AllowUrls  bool        // permit references to HTTP(S) URLs
	ds         ds.Batching // datastore holding the references (namespaced under FilestorePrefix by NewFileManager)
	root       string      // directory that stored local file paths are relative to
}

Hector Sanjuan's avatar
Hector Sanjuan committed
38 39 40 41
// CorruptReferenceError implements the error interface.
// It is used to indicate that the block contents pointed
// to by the referencing blocks cannot be retrieved (i.e. the
// file is not found, or the data changed as it was being read).
type CorruptReferenceError struct {
	Code Status // classification of the failure (e.g. StatusFileNotFound, StatusFileChanged, StatusFileError)
	Err  error  // underlying error describing what went wrong
}

Hector Sanjuan's avatar
Hector Sanjuan committed
47 48
// Error() returns the error message in the CorruptReferenceError
// as a string.
49 50 51 52
func (c CorruptReferenceError) Error() string {
	return c.Err.Error()
}

Hector Sanjuan's avatar
Hector Sanjuan committed
53 54 55
// NewFileManager initializes a new file manager with the given
// datastore and root. All FilestoreNodes paths are relative to the
// root path given here, which is prepended for any operations.
56
func NewFileManager(ds ds.Batching, root string) *FileManager {
57
	return &FileManager{ds: dsns.Wrap(ds, FilestorePrefix), root: root}
58 59
}

Hector Sanjuan's avatar
Hector Sanjuan committed
60 61 62
// AllKeysChan returns a channel from which to read the keys stored in
// the FileManager. If the given context is cancelled the channel will be
// closed.
63 64 65 66 67 68 69 70
func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
	q := dsq.Query{KeysOnly: true}

	res, err := f.ds.Query(q)
	if err != nil {
		return nil, err
	}

71
	out := make(chan *cid.Cid, dsq.KeysOnlyBufSize)
72 73 74 75 76 77 78 79 80 81 82
	go func() {
		defer close(out)
		for {
			v, ok := res.NextSync()
			if !ok {
				return
			}

			k := ds.RawKey(v.Key)
			c, err := dshelp.DsKeyToCid(k)
			if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
83
				log.Errorf("decoding cid from filestore: %s", err)
84 85 86 87 88 89 90 91 92 93 94 95 96 97
				continue
			}

			select {
			case out <- c:
			case <-ctx.Done():
				return
			}
		}
	}()

	return out, nil
}

Hector Sanjuan's avatar
Hector Sanjuan committed
98 99
// DeleteBlock deletes the reference-block from the underlying
// datastore. It does not touch the referenced data.
100 101 102 103 104 105 106 107
func (f *FileManager) DeleteBlock(c *cid.Cid) error {
	err := f.ds.Delete(dshelp.CidToDsKey(c))
	if err == ds.ErrNotFound {
		return blockstore.ErrNotFound
	}
	return err
}

Hector Sanjuan's avatar
Hector Sanjuan committed
108 109 110 111
// Get reads a block from the datastore. Reading a block
// is done in two steps: the first step retrieves the reference
// block from the datastore. The second step uses the stored
// path and offsets to read the raw block data directly from disk.
112
func (f *FileManager) Get(c *cid.Cid) (blocks.Block, error) {
Kevin Atkinson's avatar
Kevin Atkinson committed
113 114 115 116 117 118 119 120 121 122 123 124
	dobj, err := f.getDataObj(c)
	if err != nil {
		return nil, err
	}
	out, err := f.readDataObj(c, dobj)
	if err != nil {
		return nil, err
	}

	return blocks.NewBlockWithCid(out, c)
}

Jakub Sztandera's avatar
Jakub Sztandera committed
125
func (f *FileManager) readDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
126
	if !IsURL(d.GetFilePath()) {
Jakub Sztandera's avatar
Jakub Sztandera committed
127 128 129 130 131 132
		return f.readFileDataObj(c, d)
	} else {
		return f.readURLDataObj(c, d)
	}
}

Kevin Atkinson's avatar
Kevin Atkinson committed
133
func (f *FileManager) getDataObj(c *cid.Cid) (*pb.DataObj, error) {
134 135 136 137 138 139 140 141 142 143
	o, err := f.ds.Get(dshelp.CidToDsKey(c))
	switch err {
	case ds.ErrNotFound:
		return nil, blockstore.ErrNotFound
	default:
		return nil, err
	case nil:
		//
	}

Kevin Atkinson's avatar
Kevin Atkinson committed
144 145 146 147
	return unmarshalDataObj(o)
}

func unmarshalDataObj(o interface{}) (*pb.DataObj, error) {
148 149 150 151 152 153 154 155 156 157
	data, ok := o.([]byte)
	if !ok {
		return nil, fmt.Errorf("stored filestore dataobj was not a []byte")
	}

	var dobj pb.DataObj
	if err := proto.Unmarshal(data, &dobj); err != nil {
		return nil, err
	}

Kevin Atkinson's avatar
Kevin Atkinson committed
158
	return &dobj, nil
159 160
}

Jakub Sztandera's avatar
Jakub Sztandera committed
161
func (f *FileManager) readFileDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
162 163 164 165
	if !f.AllowFiles {
		return nil, fmt.Errorf("filestore not enabled")
	}

Jeromy's avatar
Jeromy committed
166 167
	p := filepath.FromSlash(d.GetFilePath())
	abspath := filepath.Join(f.root, p)
168 169

	fi, err := os.Open(abspath)
170
	if os.IsNotExist(err) {
171
		return nil, &CorruptReferenceError{StatusFileNotFound, err}
172
	} else if err != nil {
173
		return nil, &CorruptReferenceError{StatusFileError, err}
174 175 176
	}
	defer fi.Close()

177
	_, err = fi.Seek(int64(d.GetOffset()), io.SeekStart)
178
	if err != nil {
179
		return nil, &CorruptReferenceError{StatusFileError, err}
180 181 182 183
	}

	outbuf := make([]byte, d.GetSize_())
	_, err = io.ReadFull(fi, outbuf)
184
	if err == io.EOF || err == io.ErrUnexpectedEOF {
185
		return nil, &CorruptReferenceError{StatusFileChanged, err}
186
	} else if err != nil {
187
		return nil, &CorruptReferenceError{StatusFileError, err}
188 189
	}

190 191 192 193 194 195
	outcid, err := c.Prefix().Sum(outbuf)
	if err != nil {
		return nil, err
	}

	if !c.Equals(outcid) {
196
		return nil, &CorruptReferenceError{StatusFileChanged,
197
			fmt.Errorf("data in file did not match. %s offset %d", d.GetFilePath(), d.GetOffset())}
198 199
	}

200 201 202
	return outbuf, nil
}

Jakub Sztandera's avatar
Jakub Sztandera committed
203 204
// reads and verifies the block from URL
func (f *FileManager) readURLDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
205 206 207
	if !f.AllowUrls {
		return nil, fmt.Errorf("urlstore not enabled")
	}
Jakub Sztandera's avatar
Jakub Sztandera committed
208 209 210 211 212 213 214 215 216 217

	req, err := http.NewRequest("GET", d.GetFilePath(), nil)
	if err != nil {
		return nil, err
	}

	req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", d.GetOffset(), d.GetOffset()+d.GetSize_()-1))

	res, err := http.DefaultClient.Do(req)
	if err != nil {
218
		return nil, &CorruptReferenceError{StatusFileError, err}
Jakub Sztandera's avatar
Jakub Sztandera committed
219 220
	}
	if res.StatusCode != http.StatusPartialContent {
221 222
		return nil, &CorruptReferenceError{StatusFileError,
			fmt.Errorf("expected HTTP 206 got %d", res.StatusCode)}
Jakub Sztandera's avatar
Jakub Sztandera committed
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
	}

	outbuf := make([]byte, d.GetSize_())
	_, err = io.ReadFull(res.Body, outbuf)
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, &CorruptReferenceError{StatusFileChanged, err}
	} else if err != nil {
		return nil, &CorruptReferenceError{StatusFileError, err}
	}
	res.Body.Close()

	outcid, err := c.Prefix().Sum(outbuf)
	if err != nil {
		return nil, err
	}

	if !c.Equals(outcid) {
		return nil, &CorruptReferenceError{StatusFileChanged,
			fmt.Errorf("data in file did not match. %s offset %d", d.GetFilePath(), d.GetOffset())}
	}

	return outbuf, nil
}

Hector Sanjuan's avatar
Hector Sanjuan committed
247 248
// Has returns if the FileManager is storing a block reference. It does not
// validate the data, nor checks if the reference is valid.
249 250 251 252 253 254 255
func (f *FileManager) Has(c *cid.Cid) (bool, error) {
	// NOTE: interesting thing to consider. Has doesnt validate the data.
	// So the data on disk could be invalid, and we could think we have it.
	dsk := dshelp.CidToDsKey(c)
	return f.ds.Has(dsk)
}

256 257 258 259
// putter is the single datastore operation putTo needs. It is satisfied
// both by the FileManager's own datastore (used by Put) and by a batch
// obtained from it (used by PutMany).
type putter interface {
	Put(key ds.Key, value interface{}) error
}

Hector Sanjuan's avatar
Hector Sanjuan committed
260 261
// Put adds a new reference block to the FileManager. It does not check
// that the reference is valid.
262
func (f *FileManager) Put(b *posinfo.FilestoreNode) error {
263 264 265 266
	return f.putTo(b, f.ds)
}

func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error {
267 268
	var dobj pb.DataObj

269
	if !IsURL(b.PosInfo.FullPath) {
270 271 272
		if !f.AllowFiles {
			return fmt.Errorf("filestore not enabled")
		}
Jakub Sztandera's avatar
Jakub Sztandera committed
273 274 275
		if !filepath.HasPrefix(b.PosInfo.FullPath, f.root) {
			return fmt.Errorf("cannot add filestore references outside ipfs root (%s)", f.root)
		}
276

Jakub Sztandera's avatar
Jakub Sztandera committed
277 278 279 280
		p, err := filepath.Rel(f.root, b.PosInfo.FullPath)
		if err != nil {
			return err
		}
281

Jakub Sztandera's avatar
Jakub Sztandera committed
282 283
		dobj.FilePath = proto.String(filepath.ToSlash(p))
	} else {
284 285 286
		if !f.AllowUrls {
			return fmt.Errorf("urlstore not enabled")
		}
Jakub Sztandera's avatar
Jakub Sztandera committed
287 288
		dobj.FilePath = proto.String(b.PosInfo.FullPath)
	}
289 290 291 292 293 294 295 296
	dobj.Offset = proto.Uint64(b.PosInfo.Offset)
	dobj.Size_ = proto.Uint64(uint64(len(b.RawData())))

	data, err := proto.Marshal(&dobj)
	if err != nil {
		return err
	}

297
	return to.Put(dshelp.CidToDsKey(b.Cid()), data)
298 299
}

Hector Sanjuan's avatar
Hector Sanjuan committed
300 301
// PutMany is like Put() but takes a slice of blocks instead,
// allowing it to create a batch transaction.
302
func (f *FileManager) PutMany(bs []*posinfo.FilestoreNode) error {
303 304 305 306 307
	batch, err := f.ds.Batch()
	if err != nil {
		return err
	}

308
	for _, b := range bs {
309
		if err := f.putTo(b, batch); err != nil {
310 311 312
			return err
		}
	}
313 314

	return batch.Commit()
315
}
316 317 318 319 320 321

// IsURL reports whether str looks like an HTTP(S) URL: it must start with
// "http://" or "https://" and have at least one more character after that
// prefix. The check is case-sensitive and purely syntactic; nothing is
// parsed or resolved.
func IsURL(str string) bool {
	startsWithScheme := func(prefix string) bool {
		// Strictly longer than the prefix: a bare "http://" is not a URL.
		return len(str) > len(prefix) && str[:len(prefix)] == prefix
	}
	return startsWithScheme("http://") || startsWithScheme("https://")
}