fsrefstore.go 6.61 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
package filestore

import (
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"

	pb "github.com/ipfs/go-ipfs/filestore/pb"

	proto "gx/ipfs/QmT6n4mspWYEya864BhCUJEgyxiRfmiSY9ruQwTUNpRKaM/protobuf/proto"
Steven Allen's avatar
Steven Allen committed
13 14
	dshelp "gx/ipfs/QmYJgz1Z5PbBGP7n2XA8uv5sF1EKLfYUjL7kFemVAjMNqC/go-ipfs-ds-help"
	blockstore "gx/ipfs/QmayRSLCiM2gWR7Kay8vqu3Yy5mf7yPqocF9ZRgDUPYMcc/go-ipfs-blockstore"
15
	posinfo "gx/ipfs/Qmb3jLEFAQrqdVgWUajqEyuuDoavkSq1XQXz6tWdFWF995/go-ipfs-posinfo"
Steven Allen's avatar
Steven Allen committed
16
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
Steven Allen's avatar
Steven Allen committed
17 18 19
	ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
	dsns "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/namespace"
	dsq "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/query"
Hector Sanjuan's avatar
Hector Sanjuan committed
20
	blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
21 22
)

Hector Sanjuan's avatar
Hector Sanjuan committed
23
// FilestorePrefix identifies the key prefix for FileManager blocks.
24 25
var FilestorePrefix = ds.NewKey("filestore")

Hector Sanjuan's avatar
Hector Sanjuan committed
26 27 28 29
// FileManager is a blockstore implementation which stores special
// blocks FilestoreNode type. These nodes only contain a reference
// to the actual location of the block data in the filesystem
// (a path and an offset).
30 31 32 33 34
type FileManager struct {
	ds   ds.Batching
	root string
}

Hector Sanjuan's avatar
Hector Sanjuan committed
35 36 37 38
// CorruptReferenceError implements the error interface.
// It is used to indicate that the block contents pointed
// by the referencing blocks cannot be retrieved (i.e. the
// file is not found, or the data changed as it was being read).
39
type CorruptReferenceError struct {
40
	Code Status
41
	Err  error
42 43
}

Hector Sanjuan's avatar
Hector Sanjuan committed
44 45
// Error() returns the error message in the CorruptReferenceError
// as a string.
46 47 48 49
func (c CorruptReferenceError) Error() string {
	return c.Err.Error()
}

Hector Sanjuan's avatar
Hector Sanjuan committed
50 51 52
// NewFileManager initializes a new file manager with the given
// datastore and root. All FilestoreNodes paths are relative to the
// root path given here, which is prepended for any operations.
53 54 55 56
func NewFileManager(ds ds.Batching, root string) *FileManager {
	return &FileManager{dsns.Wrap(ds, FilestorePrefix), root}
}

Hector Sanjuan's avatar
Hector Sanjuan committed
57 58 59
// AllKeysChan returns a channel from which to read the keys stored in
// the FileManager. If the given context is cancelled the channel will be
// closed.
60 61 62 63 64 65 66 67
func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
	q := dsq.Query{KeysOnly: true}

	res, err := f.ds.Query(q)
	if err != nil {
		return nil, err
	}

68
	out := make(chan *cid.Cid, dsq.KeysOnlyBufSize)
69 70 71 72 73 74 75 76 77 78 79
	go func() {
		defer close(out)
		for {
			v, ok := res.NextSync()
			if !ok {
				return
			}

			k := ds.RawKey(v.Key)
			c, err := dshelp.DsKeyToCid(k)
			if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
80
				log.Errorf("decoding cid from filestore: %s", err)
81 82 83 84 85 86 87 88 89 90 91 92 93 94
				continue
			}

			select {
			case out <- c:
			case <-ctx.Done():
				return
			}
		}
	}()

	return out, nil
}

Hector Sanjuan's avatar
Hector Sanjuan committed
95 96
// DeleteBlock deletes the reference-block from the underlying
// datastore. It does not touch the referenced data.
97 98 99 100 101 102 103 104
func (f *FileManager) DeleteBlock(c *cid.Cid) error {
	err := f.ds.Delete(dshelp.CidToDsKey(c))
	if err == ds.ErrNotFound {
		return blockstore.ErrNotFound
	}
	return err
}

Hector Sanjuan's avatar
Hector Sanjuan committed
105 106 107 108
// Get reads a block from the datastore. Reading a block
// is done in two steps: the first step retrieves the reference
// block from the datastore. The second step uses the stored
// path and offsets to read the raw block data directly from disk.
109
func (f *FileManager) Get(c *cid.Cid) (blocks.Block, error) {
Kevin Atkinson's avatar
Kevin Atkinson committed
110 111 112 113 114 115 116 117 118 119 120 121 122 123
	dobj, err := f.getDataObj(c)
	if err != nil {
		return nil, err
	}

	out, err := f.readDataObj(c, dobj)
	if err != nil {
		return nil, err
	}

	return blocks.NewBlockWithCid(out, c)
}

func (f *FileManager) getDataObj(c *cid.Cid) (*pb.DataObj, error) {
124 125 126 127 128 129 130 131 132 133
	o, err := f.ds.Get(dshelp.CidToDsKey(c))
	switch err {
	case ds.ErrNotFound:
		return nil, blockstore.ErrNotFound
	default:
		return nil, err
	case nil:
		//
	}

Kevin Atkinson's avatar
Kevin Atkinson committed
134 135 136 137
	return unmarshalDataObj(o)
}

func unmarshalDataObj(o interface{}) (*pb.DataObj, error) {
138 139 140 141 142 143 144 145 146 147
	data, ok := o.([]byte)
	if !ok {
		return nil, fmt.Errorf("stored filestore dataobj was not a []byte")
	}

	var dobj pb.DataObj
	if err := proto.Unmarshal(data, &dobj); err != nil {
		return nil, err
	}

Kevin Atkinson's avatar
Kevin Atkinson committed
148
	return &dobj, nil
149 150
}

151 152
// reads and verifies the block
func (f *FileManager) readDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
Jeromy's avatar
Jeromy committed
153 154
	p := filepath.FromSlash(d.GetFilePath())
	abspath := filepath.Join(f.root, p)
155 156

	fi, err := os.Open(abspath)
157
	if os.IsNotExist(err) {
158
		return nil, &CorruptReferenceError{StatusFileNotFound, err}
159
	} else if err != nil {
160
		return nil, &CorruptReferenceError{StatusFileError, err}
161 162 163
	}
	defer fi.Close()

164
	_, err = fi.Seek(int64(d.GetOffset()), io.SeekStart)
165
	if err != nil {
166
		return nil, &CorruptReferenceError{StatusFileError, err}
167 168 169 170
	}

	outbuf := make([]byte, d.GetSize_())
	_, err = io.ReadFull(fi, outbuf)
171
	if err == io.EOF || err == io.ErrUnexpectedEOF {
172
		return nil, &CorruptReferenceError{StatusFileChanged, err}
173
	} else if err != nil {
174
		return nil, &CorruptReferenceError{StatusFileError, err}
175 176
	}

177 178 179 180 181 182
	outcid, err := c.Prefix().Sum(outbuf)
	if err != nil {
		return nil, err
	}

	if !c.Equals(outcid) {
183
		return nil, &CorruptReferenceError{StatusFileChanged,
184
			fmt.Errorf("data in file did not match. %s offset %d", d.GetFilePath(), d.GetOffset())}
185 186
	}

187 188 189
	return outbuf, nil
}

Hector Sanjuan's avatar
Hector Sanjuan committed
190 191
// Has returns if the FileManager is storing a block reference. It does not
// validate the data, nor checks if the reference is valid.
192 193 194 195 196 197 198
func (f *FileManager) Has(c *cid.Cid) (bool, error) {
	// NOTE: interesting thing to consider. Has doesnt validate the data.
	// So the data on disk could be invalid, and we could think we have it.
	dsk := dshelp.CidToDsKey(c)
	return f.ds.Has(dsk)
}

199 200 201 202
type putter interface {
	Put(ds.Key, interface{}) error
}

Hector Sanjuan's avatar
Hector Sanjuan committed
203 204
// Put adds a new reference block to the FileManager. It does not check
// that the reference is valid.
205
func (f *FileManager) Put(b *posinfo.FilestoreNode) error {
206 207 208 209
	return f.putTo(b, f.ds)
}

func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error {
210 211 212
	var dobj pb.DataObj

	if !filepath.HasPrefix(b.PosInfo.FullPath, f.root) {
213
		return fmt.Errorf("cannot add filestore references outside ipfs root (%s)", f.root)
214 215 216 217 218 219 220
	}

	p, err := filepath.Rel(f.root, b.PosInfo.FullPath)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
221
	dobj.FilePath = proto.String(filepath.ToSlash(p))
222 223 224 225 226 227 228 229
	dobj.Offset = proto.Uint64(b.PosInfo.Offset)
	dobj.Size_ = proto.Uint64(uint64(len(b.RawData())))

	data, err := proto.Marshal(&dobj)
	if err != nil {
		return err
	}

230
	return to.Put(dshelp.CidToDsKey(b.Cid()), data)
231 232
}

Hector Sanjuan's avatar
Hector Sanjuan committed
233 234
// PutMany is like Put() but takes a slice of blocks instead,
// allowing it to create a batch transaction.
235
func (f *FileManager) PutMany(bs []*posinfo.FilestoreNode) error {
236 237 238 239 240
	batch, err := f.ds.Batch()
	if err != nil {
		return err
	}

241
	for _, b := range bs {
242
		if err := f.putTo(b, batch); err != nil {
243 244 245
			return err
		}
	}
246 247

	return batch.Commit()
248
}