datastores.go 9.67 KB
Newer Older
1 2 3
package fsrepo

import (
Kevin Atkinson's avatar
Kevin Atkinson committed
4
	"bytes"
5
	"encoding/json"
6
	"fmt"
7
	"os"
8
	"path/filepath"
Kevin Atkinson's avatar
Kevin Atkinson committed
9
	"sort"
10 11 12 13 14

	repo "github.com/ipfs/go-ipfs/repo"

	measure "gx/ipfs/QmSb95iHExSSb47zpmyn5CyY5PZidVWSjyKyDqgYQrnKor/go-ds-measure"
	flatfs "gx/ipfs/QmUTshC2PP4ZDqkrFfDU4JGJFMWjYnunxPgkQ6ZCA2hGqh/go-ds-flatfs"
Łukasz Magiera's avatar
Łukasz Magiera committed
15

16 17
	ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
	mount "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/syncmount"
Łukasz Magiera's avatar
Łukasz Magiera committed
18 19

	levelds "gx/ipfs/QmPdvXuXWAR6gtxxqZw42RtSADMwz4ijVmYHGS542b6cMz/go-ds-leveldb"
20
	badgerds "gx/ipfs/QmUamAGkvPp1w84dfc2YMy9ic6iyBvaRoaTiaat8Crtawq/go-ds-badger"
21 22 23
	ldbopts "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt"
)

24 25
// ConfigFromMap creates a new datastore config from a map
type ConfigFromMap func(map[string]interface{}) (DatastoreConfig, error)
26

Kevin Atkinson's avatar
Kevin Atkinson committed
27 28 29
// DatastoreConfig is an abstraction of a datastore config.  A "spec"
// is first converted to a DatastoreConfig and then Create() is called
// to instantiate a new datastore
30
type DatastoreConfig interface {
31 32 33 34
	// DiskSpec returns a minimal configuration of the datastore
	// represting what is stored on disk.  Run time values are
	// excluded.
	DiskSpec() DiskSpec
35

36 37 38
	// Create instantiate a new datastore from this config
	Create(path string) (repo.Datastore, error)
}
39

Kevin Atkinson's avatar
Kevin Atkinson committed
40 41 42 43
// DiskSpec is the type returned by the DatastoreConfig's DiskSpec method
type DiskSpec map[string]interface{}

// Bytes returns a minimal JSON encoding of the DiskSpec
44
func (spec DiskSpec) Bytes() []byte {
45 46 47 48 49
	b, err := json.Marshal(spec)
	if err != nil {
		// should not happen
		panic(err)
	}
50 51 52
	return bytes.TrimSpace(b)
}

Kevin Atkinson's avatar
Kevin Atkinson committed
53
// String returns a minimal JSON encoding of the DiskSpec
54 55
func (spec DiskSpec) String() string {
	return string(spec.Bytes())
56 57
}

58
var datastores map[string]ConfigFromMap
59

60 61
func init() {
	datastores = map[string]ConfigFromMap{
62 63 64 65 66 67 68
		"mount":    MountDatastoreConfig,
		"flatfs":   FlatfsDatastoreConfig,
		"levelds":  LeveldsDatastoreConfig,
		"badgerds": BadgerdsDatastoreConfig,
		"mem":      MemDatastoreConfig,
		"log":      LogDatastoreConfig,
		"measure":  MeasureDatastoreConfig,
69 70 71
	}
}

Kevin Atkinson's avatar
Kevin Atkinson committed
72 73
// AnyDatastoreConfig returns a DatastoreConfig from a spec based on
// the "type" parameter
74 75 76 77 78 79 80 81
func AnyDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
	which, ok := params["type"].(string)
	if !ok {
		return nil, fmt.Errorf("'type' field missing or not a string")
	}
	fun, ok := datastores[which]
	if !ok {
		return nil, fmt.Errorf("unknown datastore type: %s", which)
82
	}
83
	return fun(params)
84 85
}

86 87 88 89 90 91 92 93 94
type mountDatastoreConfig struct {
	mounts []premount
}

type premount struct {
	ds     DatastoreConfig
	prefix ds.Key
}

Kevin Atkinson's avatar
Kevin Atkinson committed
95
// MountDatastoreConfig returns a mount DatastoreConfig from a spec
96 97 98 99 100 101 102 103 104 105 106
func MountDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
	var res mountDatastoreConfig
	mounts, ok := params["mounts"].([]interface{})
	if !ok {
		return nil, fmt.Errorf("'mounts' field is missing or not an array")
	}
	for _, iface := range mounts {
		cfg, ok := iface.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("expected map for mountpoint")
		}
107

108
		child, err := AnyDatastoreConfig(cfg)
109 110 111 112 113 114 115 116 117
		if err != nil {
			return nil, err
		}

		prefix, found := cfg["mountpoint"]
		if !found {
			return nil, fmt.Errorf("no 'mountpoint' on mount")
		}

118 119 120
		res.mounts = append(res.mounts, premount{
			ds:     child,
			prefix: ds.NewKey(prefix.(string)),
121 122
		})
	}
Kevin Atkinson's avatar
Kevin Atkinson committed
123 124 125 126
	sort.Slice(res.mounts,
		func(i, j int) bool {
			return res.mounts[i].prefix.String() > res.mounts[j].prefix.String()
		})
127

128 129 130
	return &res, nil
}

131 132 133 134 135 136 137 138
func (c *mountDatastoreConfig) DiskSpec() DiskSpec {
	cfg := map[string]interface{}{"type": "mount"}
	mounts := make([]interface{}, len(c.mounts))
	for i, m := range c.mounts {
		c := m.ds.DiskSpec()
		if c == nil {
			c = make(map[string]interface{})
		}
Kevin Atkinson's avatar
Kevin Atkinson committed
139
		c["mountpoint"] = m.prefix.String()
140
		mounts[i] = c
Kevin Atkinson's avatar
Kevin Atkinson committed
141
	}
142 143
	cfg["mounts"] = mounts
	return cfg
Kevin Atkinson's avatar
Kevin Atkinson committed
144 145
}

146 147 148 149 150 151 152 153 154 155
func (c *mountDatastoreConfig) Create(path string) (repo.Datastore, error) {
	mounts := make([]mount.Mount, len(c.mounts))
	for i, m := range c.mounts {
		ds, err := m.ds.Create(path)
		if err != nil {
			return nil, err
		}
		mounts[i].Datastore = ds
		mounts[i].Prefix = m.prefix
	}
156 157 158
	return mount.New(mounts), nil
}

159 160 161 162 163 164
type flatfsDatastoreConfig struct {
	path      string
	shardFun  *flatfs.ShardIdV1
	syncField bool
}

Kevin Atkinson's avatar
Kevin Atkinson committed
165
// FlatfsDatastoreConfig returns a flatfs DatastoreConfig from a spec
166 167 168 169 170 171
func FlatfsDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
	var c flatfsDatastoreConfig
	var ok bool
	var err error

	c.path, ok = params["path"].(string)
Kevin Atkinson's avatar
Kevin Atkinson committed
172 173 174
	if !ok {
		return nil, fmt.Errorf("'path' field is missing or not boolean")
	}
175

Kevin Atkinson's avatar
Kevin Atkinson committed
176 177 178 179
	sshardFun, ok := params["shardFunc"].(string)
	if !ok {
		return nil, fmt.Errorf("'shardFunc' field is missing or not a string")
	}
180
	c.shardFun, err = flatfs.ParseShardFunc(sshardFun)
181 182 183 184
	if err != nil {
		return nil, err
	}

185
	c.syncField, ok = params["sync"].(bool)
Kevin Atkinson's avatar
Kevin Atkinson committed
186 187 188
	if !ok {
		return nil, fmt.Errorf("'sync' field is missing or not boolean")
	}
189 190 191
	return &c, nil
}

192 193 194 195 196 197
func (c *flatfsDatastoreConfig) DiskSpec() DiskSpec {
	return map[string]interface{}{
		"type":      "flatfs",
		"path":      c.path,
		"shardFunc": c.shardFun.String(),
	}
Kevin Atkinson's avatar
Kevin Atkinson committed
198 199
}

200 201 202 203 204 205 206
func (c *flatfsDatastoreConfig) Create(path string) (repo.Datastore, error) {
	p := c.path
	if !filepath.IsAbs(p) {
		p = filepath.Join(path, p)
	}

	return flatfs.CreateOrOpen(p, c.shardFun, c.syncField)
207 208
}

209 210 211 212 213
type leveldsDatastoreConfig struct {
	path        string
	compression ldbopts.Compression
}

Kevin Atkinson's avatar
Kevin Atkinson committed
214
// LeveldsDatastoreConfig returns a levelds DatastoreConfig from a spec
215 216 217 218 219
func LeveldsDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
	var c leveldsDatastoreConfig
	var ok bool

	c.path, ok = params["path"].(string)
Kevin Atkinson's avatar
Kevin Atkinson committed
220 221 222
	if !ok {
		return nil, fmt.Errorf("'path' field is missing or not string")
	}
223

224
	switch cm := params["compression"].(string); cm {
225
	case "none":
226
		c.compression = ldbopts.NoCompression
227
	case "snappy":
228
		c.compression = ldbopts.SnappyCompression
229
	case "":
230
		c.compression = ldbopts.DefaultCompression
231 232
	default:
		return nil, fmt.Errorf("unrecognized value for compression: %s", cm)
233 234 235 236 237
	}

	return &c, nil
}

238 239 240 241 242
func (c *leveldsDatastoreConfig) DiskSpec() DiskSpec {
	return map[string]interface{}{
		"type": "levelds",
		"path": c.path,
	}
Kevin Atkinson's avatar
Kevin Atkinson committed
243 244
}

245 246 247 248
func (c *leveldsDatastoreConfig) Create(path string) (repo.Datastore, error) {
	p := c.path
	if !filepath.IsAbs(p) {
		p = filepath.Join(path, p)
249
	}
250

251
	return levelds.NewDatastore(p, &levelds.Options{
252
		Compression: c.compression,
253 254 255
	})
}

256 257 258 259
type memDatastoreConfig struct {
	cfg map[string]interface{}
}

Kevin Atkinson's avatar
Kevin Atkinson committed
260
// MemDatastoreConfig returns a memory DatastoreConfig from a spec
261 262 263 264
func MemDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
	return &memDatastoreConfig{params}, nil
}

265 266
func (c *memDatastoreConfig) DiskSpec() DiskSpec {
	return nil
Kevin Atkinson's avatar
Kevin Atkinson committed
267 268
}

269 270 271 272 273 274 275 276 277
func (c *memDatastoreConfig) Create(string) (repo.Datastore, error) {
	return ds.NewMapDatastore(), nil
}

type logDatastoreConfig struct {
	child DatastoreConfig
	name  string
}

Kevin Atkinson's avatar
Kevin Atkinson committed
278
// LogDatastoreConfig returns a log DatastoreConfig from a spec
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303
func LogDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
	childField, ok := params["child"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("'child' field is missing or not a map")
	}
	child, err := AnyDatastoreConfig(childField)
	if err != nil {
		return nil, err
	}
	name, ok := params["name"].(string)
	if !ok {
		return nil, fmt.Errorf("'name' field was missing or not a string")
	}
	return &logDatastoreConfig{child, name}, nil

}

func (c *logDatastoreConfig) Create(path string) (repo.Datastore, error) {
	child, err := c.child.Create(path)
	if err != nil {
		return nil, err
	}
	return ds.NewLogDatastore(child, c.name), nil
}

304 305
func (c *logDatastoreConfig) DiskSpec() DiskSpec {
	return c.child.DiskSpec()
Kevin Atkinson's avatar
Kevin Atkinson committed
306 307
}

308 309 310 311 312
type measureDatastoreConfig struct {
	child  DatastoreConfig
	prefix string
}

Kevin Atkinson's avatar
Kevin Atkinson committed
313
// MeasureDatastoreConfig returns a measure DatastoreConfig from a spec
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
func MeasureDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
	childField, ok := params["child"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("'child' field is missing or not a map")
	}
	child, err := AnyDatastoreConfig(childField)
	if err != nil {
		return nil, err
	}
	prefix, ok := params["prefix"].(string)
	if !ok {
		return nil, fmt.Errorf("'prefix' field was missing or not a string")
	}
	return &measureDatastoreConfig{child, prefix}, nil
}

330 331
func (c *measureDatastoreConfig) DiskSpec() DiskSpec {
	return c.child.DiskSpec()
Kevin Atkinson's avatar
Kevin Atkinson committed
332 333
}

334 335 336 337 338 339
func (c measureDatastoreConfig) Create(path string) (repo.Datastore, error) {
	child, err := c.child.Create(path)
	if err != nil {
		return nil, err
	}
	return measure.New(c.prefix, child), nil
340
}
Łukasz Magiera's avatar
Łukasz Magiera committed
341

342
type badgerdsDatastoreConfig struct {
343 344
	path       string
	syncWrites bool
345 346
}

Jeromy's avatar
Jeromy committed
347 348
// BadgerdsDatastoreConfig returns a configuration stub for a badger datastore
// from the given parameters
349 350 351 352 353
func BadgerdsDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
	var c badgerdsDatastoreConfig
	var ok bool

	c.path, ok = params["path"].(string)
Łukasz Magiera's avatar
Łukasz Magiera committed
354 355 356
	if !ok {
		return nil, fmt.Errorf("'path' field is missing or not string")
	}
357

358 359 360 361 362 363 364 365 366 367 368
	sw, ok := params["syncWrites"]
	if !ok {
		c.syncWrites = true
	} else {
		if swb, ok := sw.(bool); ok {
			c.syncWrites = swb
		} else {
			return nil, fmt.Errorf("'syncWrites' field was not a boolean")
		}
	}

369 370 371
	return &c, nil
}

Łukasz Magiera's avatar
Łukasz Magiera committed
372 373 374 375 376
func (c *badgerdsDatastoreConfig) DiskSpec() DiskSpec {
	return map[string]interface{}{
		"type": "badgerds",
		"path": c.path,
	}
377 378 379 380
}

func (c *badgerdsDatastoreConfig) Create(path string) (repo.Datastore, error) {
	p := c.path
Łukasz Magiera's avatar
Łukasz Magiera committed
381
	if !filepath.IsAbs(p) {
382
		p = filepath.Join(path, p)
Łukasz Magiera's avatar
Łukasz Magiera committed
383 384 385 386 387 388 389
	}

	err := os.MkdirAll(p, 0755)
	if err != nil {
		return nil, err
	}

Steven Allen's avatar
Steven Allen committed
390
	defopts := badgerds.DefaultOptions
391 392 393
	defopts.SyncWrites = c.syncWrites

	return badgerds.NewDatastore(p, &defopts)
Łukasz Magiera's avatar
Łukasz Magiera committed
394
}