flatfs.go 7.51 KB
Newer Older
Tommi Virtanen's avatar
Tommi Virtanen committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Package flatfs is a Datastore implementation that stores all
// objects in a two-level directory structure in the local file
// system, regardless of the hierarchy of the keys.
package flatfs

import (
	"encoding/hex"
	"errors"
	"io/ioutil"
	"os"
	"path"
	"strings"

	"github.com/jbenet/go-datastore"
15
	"github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/jbenet/go-os-rename"
Tommi Virtanen's avatar
Tommi Virtanen committed
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
	"github.com/jbenet/go-datastore/query"
)

// File-naming parameters.
//
// extension is the suffix given to every data file; decode rejects
// names that do not carry it.
//
// maxPrefixLen is the largest prefix length New accepts, expressed in
// raw key bytes (before hex encoding).
const (
	extension    = ".data"
	maxPrefixLen = 16
)

var (
	// ErrBadPrefixLen is returned by New when the requested prefix
	// length is zero, negative, or greater than maxPrefixLen.
	ErrBadPrefixLen = errors.New("bad prefix length")
)

// Datastore keeps every object in its own file. Files are grouped into
// prefix directories that sit directly beneath the root directory; the
// directory name is taken from the start of the hex-encoded key.
type Datastore struct {
	// root directory that contains the prefix directories
	path string
	// length of the dir splay prefix, in bytes of hex digits
	hexPrefixLen int
}

// Compile-time check that *Datastore satisfies datastore.Datastore.
var _ datastore.Datastore = (*Datastore)(nil)

// New builds a Datastore rooted at path. prefixLen is the number of raw
// key bytes used to name the prefix directories; it must lie between 1
// and maxPrefixLen inclusive, otherwise ErrBadPrefixLen is returned.
// New itself performs no filesystem access.
func New(path string, prefixLen int) (*Datastore, error) {
	if prefixLen < 1 || maxPrefixLen < prefixLen {
		return nil, ErrBadPrefixLen
	}
	// Directory names are cut from the hex form of the key, so store the
	// prefix length in hex characters rather than in raw bytes.
	return &Datastore{
		path:         path,
		hexPrefixLen: hex.EncodedLen(prefixLen),
	}, nil
}

// padding is appended to a hex-encoded key before the directory prefix
// is sliced off, so that a key shorter than the prefix still produces a
// full-length directory name. It is as long as the largest allowed
// prefix in hex characters.
var padding = strings.Repeat("_", maxPrefixLen*hex.EncodedLen(1))

func (fs *Datastore) encode(key datastore.Key) (dir, file string) {
51
	safe := hex.EncodeToString(key.Bytes()[1:])
Tommi Virtanen's avatar
Tommi Virtanen committed
52 53 54 55 56 57
	prefix := (safe + padding)[:fs.hexPrefixLen]
	dir = path.Join(fs.path, prefix)
	file = path.Join(dir, safe+extension)
	return dir, file
}

58 59 60 61 62 63 64 65 66 67 68 69
// decode turns the name of a data file back into the key it stores.
// ok is false when the name lacks the data-file extension or when the
// part before the extension is not valid hex.
func (fs *Datastore) decode(file string) (key datastore.Key, ok bool) {
	if path.Ext(file) == extension {
		hexName := strings.TrimSuffix(file, extension)
		if raw, err := hex.DecodeString(hexName); err == nil {
			return datastore.NewKey(string(raw)), true
		}
	}
	return datastore.Key{}, false
}

70
func (fs *Datastore) makePrefixDir(dir string) error {
Jeromy's avatar
Jeromy committed
71 72
	if err := fs.makePrefixDirNoSync(dir); err != nil {
		return err
73 74 75 76 77 78
	}

	// In theory, if we create a new prefix dir and add a file to
	// it, the creation of the prefix dir itself might not be
	// durable yet. Sync the root dir after a successful mkdir of
	// a prefix dir, just to be paranoid.
79
	if err := syncDir(fs.path); err != nil {
80 81 82 83 84
		return err
	}
	return nil
}

Jeromy's avatar
Jeromy committed
85 86 87 88 89 90 91 92 93 94 95
// makePrefixDirNoSync creates the prefix directory dir without syncing
// its parent. A directory that already exists is not an error.
func (fs *Datastore) makePrefixDirNoSync(dir string) error {
	err := os.Mkdir(dir, 0777)
	// EEXIST only means the prefix directory was already there.
	if err == nil || os.IsExist(err) {
		return nil
	}
	return err
}

Tommi Virtanen's avatar
Tommi Virtanen committed
96 97 98 99 100 101 102
func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
	val, ok := value.([]byte)
	if !ok {
		return datastore.ErrInvalidType
	}

	dir, path := fs.encode(key)
103 104
	if err := fs.makePrefixDir(dir); err != nil {
		return err
Tommi Virtanen's avatar
Tommi Virtanen committed
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
	}

	tmp, err := ioutil.TempFile(dir, "put-")
	if err != nil {
		return err
	}
	closed := false
	removed := false
	defer func() {
		if !closed {
			// silence errcheck
			_ = tmp.Close()
		}
		if !removed {
			// silence errcheck
			_ = os.Remove(tmp.Name())
		}
	}()

	if _, err := tmp.Write(val); err != nil {
		return err
	}
127 128 129
	if err := tmp.Sync(); err != nil {
		return err
	}
Tommi Virtanen's avatar
Tommi Virtanen committed
130 131 132 133 134
	if err := tmp.Close(); err != nil {
		return err
	}
	closed = true

135
	err = osrename.Rename(tmp.Name(), path)
Tommi Virtanen's avatar
Tommi Virtanen committed
136 137 138 139 140
	if err != nil {
		return err
	}
	removed = true

141
	if err := syncDir(dir); err != nil {
142 143
		return err
	}
Tommi Virtanen's avatar
Tommi Virtanen committed
144 145 146
	return nil
}

Jeromy's avatar
Jeromy committed
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
// putMany writes every entry of data to disk, batching the syncs: all
// temp files are written first, then synced and closed, then renamed
// into place, and only then are the touched prefix directories and the
// root directory synced.
//
// Every value must be a []byte, otherwise datastore.ErrInvalidType is
// returned. On any error, temp files that have not yet been renamed are
// closed and removed; files that were already renamed stay in place.
// All temp files remain open until the sync phase.
func (fs *Datastore) putMany(data map[datastore.Key]interface{}) error {
	// pendingPut tracks one temp file on its way to its final name.
	type pendingPut struct {
		tmp     *os.File
		target  string
		closed  bool
		renamed bool
	}

	writes := make([]*pendingPut, 0, len(data))
	// Set of prefix directories to sync; many keys share a directory
	// and each one only needs syncing once.
	dirs := make(map[string]struct{})

	// Registered before the first temp file is created so that every
	// early return below cleans up what has been created so far.
	defer func() {
		for _, p := range writes {
			if p.renamed {
				continue
			}
			if !p.closed {
				// best-effort cleanup; the primary error is already being returned
				_ = p.tmp.Close()
			}
			// best-effort cleanup; the primary error is already being returned
			_ = os.Remove(p.tmp.Name())
		}
	}()

	for key, value := range data {
		val, ok := value.([]byte)
		if !ok {
			return datastore.ErrInvalidType
		}
		dir, target := fs.encode(key)
		if err := fs.makePrefixDirNoSync(dir); err != nil {
			return err
		}
		dirs[dir] = struct{}{}

		tmp, err := ioutil.TempFile(dir, "put-")
		if err != nil {
			return err
		}
		// Track the file before writing so a failed write is cleaned
		// up as well.
		writes = append(writes, &pendingPut{tmp: tmp, target: target})

		if _, err := tmp.Write(val); err != nil {
			return err
		}
	}

	// Sync and close the temp files.
	for _, p := range writes {
		if err := p.tmp.Sync(); err != nil {
			return err
		}
		if err := p.tmp.Close(); err != nil {
			return err
		}
		p.closed = true
	}

	// Move the files to their proper places.
	for _, p := range writes {
		if err := osrename.Rename(p.tmp.Name(), p.target); err != nil {
			return err
		}
		p.renamed = true
	}

	// Sync the directories that received files.
	for dir := range dirs {
		if err := syncDir(dir); err != nil {
			return err
		}
	}

	// Sync the top flatfs dir, covering any newly created prefix dirs.
	return syncDir(fs.path)
}

Tommi Virtanen's avatar
Tommi Virtanen committed
229 230 231 232 233 234 235 236 237 238 239 240 241 242
// Get reads the whole data file for key and returns its contents as a
// []byte. A missing file is reported as datastore.ErrNotFound; any
// other read error is passed through unchanged.
func (fs *Datastore) Get(key datastore.Key) (value interface{}, err error) {
	_, file := fs.encode(key)
	data, err := ioutil.ReadFile(file)
	switch {
	case err == nil:
		return data, nil
	case os.IsNotExist(err):
		return nil, datastore.ErrNotFound
	default:
		// no specific error to map to, so hand it back as-is
		return nil, err
	}
}

func (fs *Datastore) Has(key datastore.Key) (exists bool, err error) {
Tommi Virtanen's avatar
Tommi Virtanen committed
243 244 245 246 247 248 249 250 251
	_, path := fs.encode(key)
	switch _, err := os.Stat(path); {
	case err == nil:
		return true, nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, err
	}
Tommi Virtanen's avatar
Tommi Virtanen committed
252 253 254
}

func (fs *Datastore) Delete(key datastore.Key) error {
Tommi Virtanen's avatar
Tommi Virtanen committed
255 256 257 258 259 260 261 262 263
	_, path := fs.encode(key)
	switch err := os.Remove(path); {
	case err == nil:
		return nil
	case os.IsNotExist(err):
		return datastore.ErrNotFound
	default:
		return err
	}
Tommi Virtanen's avatar
Tommi Virtanen committed
264 265 266
}

func (fs *Datastore) Query(q query.Query) (query.Results, error) {
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
	if (q.Prefix != "" && q.Prefix != "/") ||
		len(q.Filters) > 0 ||
		len(q.Orders) > 0 ||
		q.Limit > 0 ||
		q.Offset > 0 ||
		!q.KeysOnly {
		// TODO this is overly simplistic, but the only caller is
		// `ipfs refs local` for now, and this gets us moving.
		return nil, errors.New("flatfs only supports listing all keys in random order")
	}

	// TODO this dumb implementation gathers all keys into a single slice.
	root, err := os.Open(fs.path)
	if err != nil {
		return nil, err
	}
	defer root.Close()

	var res []query.Entry
	prefixes, err := root.Readdir(0)
	if err != nil {
		return nil, err
	}
	for _, fi := range prefixes {
291 292
		var err error
		res, err = fs.enumerateKeys(fi, res)
293 294 295
		if err != nil {
			return nil, err
		}
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
	}
	return query.ResultsWithEntries(q, res), nil
}

func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.Entry, error) {
	if !fi.IsDir() || fi.Name()[0] == '.' {
		return res, nil
	}
	child, err := os.Open(path.Join(fs.path, fi.Name()))
	if err != nil {
		return nil, err
	}
	defer child.Close()
	objs, err := child.Readdir(0)
	if err != nil {
		return nil, err
	}
	for _, fi := range objs {
		if !fi.Mode().IsRegular() || fi.Name()[0] == '.' {
			return res, nil
316
		}
317 318 319
		key, ok := fs.decode(fi.Name())
		if !ok {
			return res, nil
320
		}
321
		res = append(res, query.Entry{Key: key.String()})
322
	}
323
	return res, nil
Tommi Virtanen's avatar
Tommi Virtanen committed
324 325
}

Jeromy's avatar
Jeromy committed
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
// flatfsTransaction queues Put and Delete operations in memory until
// Commit applies them to the underlying Datastore. puts holds the
// values queued by Put and deletes is the set of keys queued by Delete.
type flatfsTransaction struct {
	puts    map[datastore.Key]interface{}
	deletes map[datastore.Key]struct{}

	// datastore the queued operations are applied to by Commit
	ds *Datastore
}

// StartBatchOp returns a new, empty transaction bound to fs. Nothing is
// written to disk until the transaction's Commit method is called.
func (fs *Datastore) StartBatchOp() datastore.Transaction {
	txn := new(flatfsTransaction)
	txn.ds = fs
	txn.puts = map[datastore.Key]interface{}{}
	txn.deletes = map[datastore.Key]struct{}{}
	return txn
}

// Put queues val to be stored under key when the transaction is
// committed. A later Put for the same key replaces the queued value.
// It always returns nil; the value's type is only checked at Commit.
func (bt *flatfsTransaction) Put(key datastore.Key, val interface{}) error {
	// Commit applies every put before every delete, so a Delete queued
	// earlier for this key would otherwise erase the value queued here.
	// Cancel it so that the most recent operation wins.
	delete(bt.deletes, key)
	bt.puts[key] = val
	return nil
}

// Delete queues key for removal when the transaction is committed.
// It always returns nil.
func (bt *flatfsTransaction) Delete(key datastore.Key) error {
	// deletes is used as a set, so the stored value carries no data.
	var present struct{}
	bt.deletes[key] = present
	return nil
}

// Commit applies the queued operations to the datastore: first all puts
// in one putMany call, then each queued delete in turn. It stops at the
// first error and does not roll back operations that were already
// applied.
func (bt *flatfsTransaction) Commit() error {
	err := bt.ds.putMany(bt.puts)
	if err != nil {
		return err
	}

	for key := range bt.deletes {
		if err = bt.ds.Delete(key); err != nil {
			return err
		}
	}

	return nil
}

Tommi Virtanen's avatar
Tommi Virtanen committed
365 366 367
// Compile-time check that *Datastore satisfies
// datastore.ThreadSafeDatastore.
var _ datastore.ThreadSafeDatastore = (*Datastore)(nil)

// IsThreadSafe does nothing. Presumably it is the marker method that
// datastore.ThreadSafeDatastore requires — confirm against that
// interface's definition.
func (*Datastore) IsThreadSafe() {}