fsrefstore.go 7.96 KB
Newer Older
1 2 3 4 5 6
package filestore

import (
	"context"
	"fmt"
	"io"
Jakub Sztandera's avatar
Jakub Sztandera committed
7
	"net/http"
8 9 10 11 12
	"os"
	"path/filepath"

	pb "github.com/ipfs/go-ipfs/filestore/pb"

13
	dshelp "gx/ipfs/QmNP2u7bofwUQptHQGPfabGWtTCbxhNLSZKqbf1uzsup9V/go-ipfs-ds-help"
14
	proto "gx/ipfs/QmT6n4mspWYEya864BhCUJEgyxiRfmiSY9ruQwTUNpRKaM/protobuf/proto"
15 16
	blocks "gx/ipfs/QmTRCUvZLiir12Qr6MV3HKfKMHX8Nf1Vddn6t2g5nsQSb9/go-block-format"
	cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
Jakub Sztandera's avatar
Jakub Sztandera committed
17
	posinfo "gx/ipfs/QmdGSfmN4wWNXVs2XiwHbpjnUikJ7HyrTJNHyYGdodyJDC/go-ipfs-posinfo"
Steven Allen's avatar
Steven Allen committed
18
	blockstore "gx/ipfs/QmdpuJBPBZ6sLPj9BQpn3Rpi38BT2cF1QMiUfyzNWeySW4/go-ipfs-blockstore"
Steven Allen's avatar
Steven Allen committed
19 20 21
	ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
	dsns "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/namespace"
	dsq "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/query"
22 23
)

Hector Sanjuan's avatar
Hector Sanjuan committed
24
// FilestorePrefix identifies the key prefix for FileManager blocks.
25 26
// NewFileManager passes this key to dsns.Wrap when wrapping the datastore
// it is given.
var FilestorePrefix = ds.NewKey("filestore")

Hector Sanjuan's avatar
Hector Sanjuan committed
27 28 29 30
// FileManager is a blockstore implementation which stores special
// blocks FilestoreNode type. These nodes only contain a reference
// to the actual location of the block data in the filesystem
// (a path and an offset).
31 32 33 34 35
type FileManager struct {
	// ds holds the reference blocks; NewFileManager sets it to the
	// caller's datastore wrapped with FilestorePrefix.
	ds   ds.Batching
	// root is the base path: stored file paths are relative to it and it
	// is joined back onto them when block data is read from disk.
	root string
}

Hector Sanjuan's avatar
Hector Sanjuan committed
36 37 38 39
// CorruptReferenceError implements the error interface.
// It is used to indicate that the block contents pointed
// by the referencing blocks cannot be retrieved (i.e. the
// file is not found, or the data changed as it was being read).
40
type CorruptReferenceError struct {
41
	Code Status
42
	Err  error
43 44
}

Hector Sanjuan's avatar
Hector Sanjuan committed
45 46
// Error() returns the error message in the CorruptReferenceError
// as a string.
47 48 49 50
func (c CorruptReferenceError) Error() string {
	return c.Err.Error()
}

Hector Sanjuan's avatar
Hector Sanjuan committed
51 52 53
// NewFileManager initializes a new file manager with the given
// datastore and root. All FilestoreNodes paths are relative to the
// root path given here, which is prepended for any operations.
54 55 56 57
func NewFileManager(ds ds.Batching, root string) *FileManager {
	return &FileManager{dsns.Wrap(ds, FilestorePrefix), root}
}

Hector Sanjuan's avatar
Hector Sanjuan committed
58 59 60
// AllKeysChan returns a channel from which to read the keys stored in
// the FileManager. If the given context is cancelled the channel will be
// closed.
61 62 63 64 65 66 67 68
func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
	q := dsq.Query{KeysOnly: true}

	res, err := f.ds.Query(q)
	if err != nil {
		return nil, err
	}

69
	out := make(chan *cid.Cid, dsq.KeysOnlyBufSize)
70 71 72 73 74 75 76 77 78 79 80
	go func() {
		defer close(out)
		for {
			v, ok := res.NextSync()
			if !ok {
				return
			}

			k := ds.RawKey(v.Key)
			c, err := dshelp.DsKeyToCid(k)
			if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
81
				log.Errorf("decoding cid from filestore: %s", err)
82 83 84 85 86 87 88 89 90 91 92 93 94 95
				continue
			}

			select {
			case out <- c:
			case <-ctx.Done():
				return
			}
		}
	}()

	return out, nil
}

Hector Sanjuan's avatar
Hector Sanjuan committed
96 97
// DeleteBlock deletes the reference-block from the underlying
// datastore. It does not touch the referenced data.
98 99 100 101 102 103 104 105
func (f *FileManager) DeleteBlock(c *cid.Cid) error {
	err := f.ds.Delete(dshelp.CidToDsKey(c))
	if err == ds.ErrNotFound {
		return blockstore.ErrNotFound
	}
	return err
}

Hector Sanjuan's avatar
Hector Sanjuan committed
106 107 108 109
// Get reads a block from the datastore. Reading a block
// is done in two steps: the first step retrieves the reference
// block from the datastore. The second step uses the stored
// path and offsets to read the raw block data directly from disk.
110
func (f *FileManager) Get(c *cid.Cid) (blocks.Block, error) {
Kevin Atkinson's avatar
Kevin Atkinson committed
111 112 113 114 115 116 117 118 119 120 121 122
	dobj, err := f.getDataObj(c)
	if err != nil {
		return nil, err
	}
	out, err := f.readDataObj(c, dobj)
	if err != nil {
		return nil, err
	}

	return blocks.NewBlockWithCid(out, c)
}

Jakub Sztandera's avatar
Jakub Sztandera committed
123 124 125 126 127 128 129 130
// readDataObj fetches the raw block data described by d, dispatching on
// whether the reference is flagged as a URL or points at a local file.
func (f *FileManager) readDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
	if d.GetURL() {
		return f.readURLDataObj(c, d)
	}
	return f.readFileDataObj(c, d)
}

Kevin Atkinson's avatar
Kevin Atkinson committed
131
func (f *FileManager) getDataObj(c *cid.Cid) (*pb.DataObj, error) {
132 133 134 135 136 137 138 139 140 141
	o, err := f.ds.Get(dshelp.CidToDsKey(c))
	switch err {
	case ds.ErrNotFound:
		return nil, blockstore.ErrNotFound
	default:
		return nil, err
	case nil:
		//
	}

Kevin Atkinson's avatar
Kevin Atkinson committed
142 143 144 145
	return unmarshalDataObj(o)
}

func unmarshalDataObj(o interface{}) (*pb.DataObj, error) {
146 147 148 149 150 151 152 153 154 155
	data, ok := o.([]byte)
	if !ok {
		return nil, fmt.Errorf("stored filestore dataobj was not a []byte")
	}

	var dobj pb.DataObj
	if err := proto.Unmarshal(data, &dobj); err != nil {
		return nil, err
	}

Kevin Atkinson's avatar
Kevin Atkinson committed
156
	return &dobj, nil
157 158
}

Jakub Sztandera's avatar
Jakub Sztandera committed
159
func (f *FileManager) readFileDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
Jeromy's avatar
Jeromy committed
160 161
	p := filepath.FromSlash(d.GetFilePath())
	abspath := filepath.Join(f.root, p)
162 163

	fi, err := os.Open(abspath)
164
	if os.IsNotExist(err) {
165
		return nil, &CorruptReferenceError{StatusFileNotFound, err}
166
	} else if err != nil {
167
		return nil, &CorruptReferenceError{StatusFileError, err}
168 169 170
	}
	defer fi.Close()

171
	_, err = fi.Seek(int64(d.GetOffset()), io.SeekStart)
172
	if err != nil {
173
		return nil, &CorruptReferenceError{StatusFileError, err}
174 175 176 177
	}

	outbuf := make([]byte, d.GetSize_())
	_, err = io.ReadFull(fi, outbuf)
178
	if err == io.EOF || err == io.ErrUnexpectedEOF {
179
		return nil, &CorruptReferenceError{StatusFileChanged, err}
180
	} else if err != nil {
181
		return nil, &CorruptReferenceError{StatusFileError, err}
182 183
	}

184 185 186 187 188 189
	outcid, err := c.Prefix().Sum(outbuf)
	if err != nil {
		return nil, err
	}

	if !c.Equals(outcid) {
190
		return nil, &CorruptReferenceError{StatusFileChanged,
191
			fmt.Errorf("data in file did not match. %s offset %d", d.GetFilePath(), d.GetOffset())}
192 193
	}

194 195 196
	return outbuf, nil
}

Jakub Sztandera's avatar
Jakub Sztandera committed
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
// readURLDataObj reads and verifies the block described by d from a URL.
// It issues a GET on d.GetFilePath() with a Range header covering
// d.GetSize_() bytes starting at d.GetOffset(), and requires an
// HTTP 206 (Partial Content) response.
//
// Errors building or performing the request, and an unexpected status
// code, are returned as plain errors. A short body yields a
// *CorruptReferenceError with StatusFileChanged, other read errors one
// with StatusFileError, and data that does not hash back to c one with
// StatusFileChanged.
func (f *FileManager) readURLDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {

	req, err := http.NewRequest("GET", d.GetFilePath(), nil)
	if err != nil {
		return nil, err
	}

	req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", d.GetOffset(), d.GetOffset()+d.GetSize_()-1))

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	// Close the body on every return path; previously it was leaked when
	// the status code was unexpected or the read failed.
	defer res.Body.Close()

	if res.StatusCode != http.StatusPartialContent {
		return nil, fmt.Errorf("expected HTTP 206 got %d", res.StatusCode)
	}

	outbuf := make([]byte, d.GetSize_())
	_, err = io.ReadFull(res.Body, outbuf)
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, &CorruptReferenceError{StatusFileChanged, err}
	}
	if err != nil {
		return nil, &CorruptReferenceError{StatusFileError, err}
	}

	// Re-hash the bytes with the same prefix as c to detect modifications.
	outcid, err := c.Prefix().Sum(outbuf)
	if err != nil {
		return nil, err
	}

	if !c.Equals(outcid) {
		return nil, &CorruptReferenceError{StatusFileChanged,
			fmt.Errorf("data in file did not match. %s offset %d", d.GetFilePath(), d.GetOffset())}
	}

	return outbuf, nil
}

Hector Sanjuan's avatar
Hector Sanjuan committed
237 238
// Has returns if the FileManager is storing a block reference. It does not
// validate the data, nor checks if the reference is valid.
239 240 241 242 243 244 245
func (f *FileManager) Has(c *cid.Cid) (bool, error) {
	// NOTE: interesting thing to consider. Has doesnt validate the data.
	// So the data on disk could be invalid, and we could think we have it.
	dsk := dshelp.CidToDsKey(c)
	return f.ds.Has(dsk)
}

246 247 248 249
// putter is the write-only interface putTo stores references through.
// Put hands it the FileManager's datastore, PutMany a batch obtained
// from that datastore.
type putter interface {
	// Put is called by putTo with the block's datastore key and the
	// marshalled pb.DataObj bytes.
	Put(ds.Key, interface{}) error
}

Hector Sanjuan's avatar
Hector Sanjuan committed
250 251
// Put adds a new reference block to the FileManager. It does not check
// that the reference is valid.
252
func (f *FileManager) Put(b *posinfo.FilestoreNode) error {
253 254 255 256
	return f.putTo(b, f.ds)
}

func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error {
257 258
	var dobj pb.DataObj

Jakub Sztandera's avatar
Jakub Sztandera committed
259 260 261 262
	if !b.PosInfo.IsURL {
		if !filepath.HasPrefix(b.PosInfo.FullPath, f.root) {
			return fmt.Errorf("cannot add filestore references outside ipfs root (%s)", f.root)
		}
263

Jakub Sztandera's avatar
Jakub Sztandera committed
264 265 266 267
		p, err := filepath.Rel(f.root, b.PosInfo.FullPath)
		if err != nil {
			return err
		}
268

Jakub Sztandera's avatar
Jakub Sztandera committed
269 270 271 272 273
		dobj.FilePath = proto.String(filepath.ToSlash(p))
	} else {
		dobj.FilePath = proto.String(b.PosInfo.FullPath)
		dobj.URL = proto.Bool(true)
	}
274 275 276 277 278 279 280 281
	dobj.Offset = proto.Uint64(b.PosInfo.Offset)
	dobj.Size_ = proto.Uint64(uint64(len(b.RawData())))

	data, err := proto.Marshal(&dobj)
	if err != nil {
		return err
	}

282
	return to.Put(dshelp.CidToDsKey(b.Cid()), data)
283 284
}

Hector Sanjuan's avatar
Hector Sanjuan committed
285 286
// PutMany is like Put() but takes a slice of blocks instead,
// allowing it to create a batch transaction.
287
func (f *FileManager) PutMany(bs []*posinfo.FilestoreNode) error {
288 289 290 291 292
	batch, err := f.ds.Batch()
	if err != nil {
		return err
	}

293
	for _, b := range bs {
294
		if err := f.putTo(b, batch); err != nil {
295 296 297
			return err
		}
	}
298 299

	return batch.Commit()
300
}