// Package filestore provides FileManager, a block store whose entries are
// references (file path, offset, size) to data kept in files on disk rather
// than copies of the data itself.
package filestore

import (
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"

	"github.com/ipfs/go-ipfs/blocks"
	"github.com/ipfs/go-ipfs/blocks/blockstore"
	pb "github.com/ipfs/go-ipfs/filestore/pb"
	dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"
	posinfo "github.com/ipfs/go-ipfs/thirdparty/posinfo"

	ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
	dsns "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/namespace"
	dsq "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/query"
	proto "gx/ipfs/QmT6n4mspWYEya864BhCUJEgyxiRfmiSY9ruQwTUNpRKaM/protobuf/proto"
	cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
)

// FilestorePrefix is the datastore key used as the namespace for filestore
// entries: NewFileManager wraps its datastore with it and AllKeysChan uses it
// as the query prefix.
var FilestorePrefix = ds.NewKey("filestore")

// FileManager stores and retrieves blocks as references to byte ranges inside
// files located under a root directory.
type FileManager struct {
	// ds holds the serialized pb.DataObj reference for each block, keyed by
	// the block's cid. NewFileManager namespaces it under FilestorePrefix.
	ds ds.Batching
	// root is the directory that stored file paths are relative to.
	root string
}

// CorruptReferenceError reports that a stored reference can no longer be
// resolved to the expected data: the backing file could not be opened, seeked
// or fully read, or its contents no longer match the requested cid.
type CorruptReferenceError struct {
	// Err is the underlying cause.
	Err error
}

// Error returns the message of the underlying cause. A CorruptReferenceError
// with no cause yields a generic message instead of panicking on a nil Err.
func (c CorruptReferenceError) Error() string {
	if c.Err == nil {
		return "corrupt filestore reference"
	}
	return c.Err.Error()
}

// NewFileManager returns a FileManager that keeps its references in ds,
// namespaced under FilestorePrefix, and resolves stored file paths relative
// to root.
func NewFileManager(ds ds.Batching, root string) *FileManager {
	fm := &FileManager{
		ds:   dsns.Wrap(ds, FilestorePrefix),
		root: root,
	}
	return fm
}

// AllKeysChan returns a channel that yields the cid of every reference held in
// the filestore. The channel is closed once all keys have been delivered or
// ctx is cancelled, whichever comes first. Keys that cannot be decoded into a
// cid are logged and skipped.
//
// An error is returned only when the underlying datastore query cannot be
// started.
func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
	q := dsq.Query{KeysOnly: true}
	q.Prefix = FilestorePrefix.String()

	res, err := f.ds.Query(q)
	if err != nil {
		return nil, err
	}

	out := make(chan *cid.Cid)
	go func() {
		defer close(out)
		// Release the query on every exit path; without this an early return
		// on ctx cancellation would leave it running. The Close error is
		// ignored because nothing useful can be done with it here.
		defer res.Close()

		for {
			v, ok := res.NextSync()
			if !ok {
				return
			}

			k := ds.RawKey(v.Key)
			c, err := dshelp.DsKeyToCid(k)
			if err != nil {
				// Errorf, not Error: the message is a format string.
				log.Errorf("decoding cid from filestore: %s", err)
				continue
			}

			select {
			case out <- c:
			case <-ctx.Done():
				return
			}
		}
	}()

	return out, nil
}

// DeleteBlock removes the reference stored for c. The datastore's
// ErrNotFound is translated to blockstore.ErrNotFound; any other error is
// returned unchanged.
func (f *FileManager) DeleteBlock(c *cid.Cid) error {
	key := dshelp.CidToDsKey(c)
	if err := f.ds.Delete(key); err != nil {
		if err == ds.ErrNotFound {
			return blockstore.ErrNotFound
		}
		return err
	}
	return nil
}

func (f *FileManager) Get(c *cid.Cid) (blocks.Block, error) {
	o, err := f.ds.Get(dshelp.CidToDsKey(c))
	switch err {
	case ds.ErrNotFound:
		return nil, blockstore.ErrNotFound
	default:
		return nil, err
	case nil:
		//
	}

	data, ok := o.([]byte)
	if !ok {
		return nil, fmt.Errorf("stored filestore dataobj was not a []byte")
	}

	var dobj pb.DataObj
	if err := proto.Unmarshal(data, &dobj); err != nil {
		return nil, err
	}

	out, err := f.readDataObj(&dobj)
	if err != nil {
		return nil, err
	}

Jeromy's avatar
Jeromy committed
112 113 114 115 116 117 118 119 120
	outcid, err := c.Prefix().Sum(out)
	if err != nil {
		return nil, err
	}

	if !c.Equals(outcid) {
		return nil, &CorruptReferenceError{fmt.Errorf("data in file did not match. %s offset %d", dobj.GetFilePath(), dobj.GetOffset())}
	}

121 122 123 124
	return blocks.NewBlockWithCid(out, c)
}

func (f *FileManager) readDataObj(d *pb.DataObj) ([]byte, error) {
Jeromy's avatar
Jeromy committed
125 126
	p := filepath.FromSlash(d.GetFilePath())
	abspath := filepath.Join(f.root, p)
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154

	fi, err := os.Open(abspath)
	if err != nil {
		return nil, &CorruptReferenceError{err}
	}
	defer fi.Close()

	_, err = fi.Seek(int64(d.GetOffset()), os.SEEK_SET)
	if err != nil {
		return nil, &CorruptReferenceError{err}
	}

	outbuf := make([]byte, d.GetSize_())
	_, err = io.ReadFull(fi, outbuf)
	if err != nil {
		return nil, &CorruptReferenceError{err}
	}

	return outbuf, nil
}

// Has reports whether a reference for c is stored in the filestore.
//
// NOTE: Has does not validate the referenced data, so it can report true for
// a reference whose data on disk is missing or no longer valid.
func (f *FileManager) Has(c *cid.Cid) (bool, error) {
	return f.ds.Has(dshelp.CidToDsKey(c))
}

155 156 157 158
type putter interface {
	Put(ds.Key, interface{}) error
}

159
func (f *FileManager) Put(b *posinfo.FilestoreNode) error {
160 161 162 163
	return f.putTo(b, f.ds)
}

func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error {
164 165 166 167 168 169 170 171 172 173 174
	var dobj pb.DataObj

	if !filepath.HasPrefix(b.PosInfo.FullPath, f.root) {
		return fmt.Errorf("cannot add filestore references outside ipfs root")
	}

	p, err := filepath.Rel(f.root, b.PosInfo.FullPath)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
175
	dobj.FilePath = proto.String(filepath.ToSlash(p))
176 177 178 179 180 181 182 183
	dobj.Offset = proto.Uint64(b.PosInfo.Offset)
	dobj.Size_ = proto.Uint64(uint64(len(b.RawData())))

	data, err := proto.Marshal(&dobj)
	if err != nil {
		return err
	}

184
	return to.Put(dshelp.CidToDsKey(b.Cid()), data)
185 186 187
}

func (f *FileManager) PutMany(bs []*posinfo.FilestoreNode) error {
188 189 190 191 192
	batch, err := f.ds.Batch()
	if err != nil {
		return err
	}

193
	for _, b := range bs {
194
		if err := f.putTo(b, batch); err != nil {
195 196 197
			return err
		}
	}
198 199

	return batch.Commit()
200
}