loader.go 8.18 KB
Newer Older
1 2 3 4
package loader

import (
	"fmt"
5
	"io"
6
	"os"
7
	"path/filepath"
8
	"runtime"
9 10
	"strings"

tavit ohanian's avatar
tavit ohanian committed
11 12
	config "gitlab.dms3.io/dms3/go-dms3-config"
	cserialize "gitlab.dms3.io/dms3/go-dms3-config/serialize"
13

tavit ohanian's avatar
tavit ohanian committed
14 15 16 17 18
	"gitlab.dms3.io/dms3/go-dms3/core"
	"gitlab.dms3.io/dms3/go-dms3/core/coreapi"
	coredag "gitlab.dms3.io/dms3/go-dms3/core/coredag"
	plugin "gitlab.dms3.io/dms3/go-dms3/plugin"
	fsrepo "gitlab.dms3.io/dms3/go-dms3/repo/fsrepo"
19

Jakub Sztandera's avatar
Jakub Sztandera committed
20
	opentracing "github.com/opentracing/opentracing-go"
tavit ohanian's avatar
tavit ohanian committed
21 22
	ld "gitlab.dms3.io/dms3/go-ld-format"
	logging "gitlab.dms3.io/dms3/go-log"
23 24
)

25 26 27 28 29 30 31
var preloadPlugins []plugin.Plugin

// Preload adds one or more plugins to the preload list. This should _only_ be called during init.
func Preload(plugins ...plugin.Plugin) {
	preloadPlugins = append(preloadPlugins, plugins...)
}

32 33
var log = logging.Logger("plugin/loader")

34 35
var loadPluginFunc = func(string) ([]plugin.Plugin, error) {
	return nil, fmt.Errorf("unsupported platform %s", runtime.GOOS)
36 37
}

38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
type loaderState int

const (
	loaderLoading loaderState = iota
	loaderInitializing
	loaderInitialized
	loaderInjecting
	loaderInjected
	loaderStarting
	loaderStarted
	loaderClosing
	loaderClosed
	loaderFailed
)

func (ls loaderState) String() string {
	switch ls {
	case loaderLoading:
		return "Loading"
	case loaderInitializing:
		return "Initializing"
	case loaderInitialized:
		return "Initialized"
	case loaderInjecting:
		return "Injecting"
	case loaderInjected:
		return "Injected"
	case loaderStarting:
		return "Starting"
	case loaderStarted:
		return "Started"
	case loaderClosing:
		return "Closing"
	case loaderClosed:
		return "Closed"
	case loaderFailed:
		return "Failed"
	default:
		return "Unknown"
	}
}

80 81 82 83 84 85 86 87 88
// PluginLoader keeps track of loaded plugins.
//
// To use:
// 1. Load any desired plugins with Load and LoadDirectory. Preloaded plugins
//    will automatically be loaded.
// 2. Call Initialize to run all initialization logic.
// 3. Call Inject to register the plugins.
// 4. Optionally call Start to start plugins.
// 5. Call Close to close all plugins.
89
type PluginLoader struct {
90 91 92
	state   loaderState
	plugins map[string]plugin.Plugin
	started []plugin.Plugin
93 94
	config  config.Plugins
	repo    string
95 96 97
}

// NewPluginLoader creates new plugin loader
98 99 100 101 102 103 104 105 106 107 108 109
func NewPluginLoader(repo string) (*PluginLoader, error) {
	loader := &PluginLoader{plugins: make(map[string]plugin.Plugin, len(preloadPlugins)), repo: repo}
	if repo != "" {
		cfg, err := cserialize.Load(filepath.Join(repo, config.DefaultConfigFile))
		switch err {
		case cserialize.ErrNotInitialized:
		case nil:
			loader.config = cfg.Plugins
		default:
			return nil, err
		}
	}
110
	for _, v := range preloadPlugins {
111
		if err := loader.Load(v); err != nil {
112 113
			return nil, err
		}
114
	}
115 116 117 118

	if err := loader.LoadDirectory(filepath.Join(repo, "plugins")); err != nil {
		return nil, err
	}
119 120
	return loader, nil
}
121

122 123 124
func (loader *PluginLoader) assertState(state loaderState) error {
	if loader.state != state {
		return fmt.Errorf("loader state must be %s, was %s", state, loader.state)
125
	}
126 127
	return nil
}
128

129 130 131 132 133 134 135
func (loader *PluginLoader) transition(from, to loaderState) error {
	if err := loader.assertState(from); err != nil {
		return err
	}
	loader.state = to
	return nil
}
136

137
// Load loads a plugin into the plugin loader.
138 139 140
func (loader *PluginLoader) Load(pl plugin.Plugin) error {
	if err := loader.assertState(loaderLoading); err != nil {
		return err
141 142
	}

143 144 145 146 147 148 149 150
	name := pl.Name()
	if ppl, ok := loader.plugins[name]; ok {
		// plugin is already loaded
		return fmt.Errorf(
			"plugin: %s, is duplicated in version: %s, "+
				"while trying to load dynamically: %s",
			name, ppl.Version(), pl.Version())
	}
151 152 153 154
	if loader.config.Plugins[name].Disabled {
		log.Infof("not loading disabled plugin %s", name)
		return nil
	}
155 156 157 158
	loader.plugins[name] = pl
	return nil
}

159
// LoadDirectory loads a directory of plugins into the plugin loader.
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
func (loader *PluginLoader) LoadDirectory(pluginDir string) error {
	if err := loader.assertState(loaderLoading); err != nil {
		return err
	}
	newPls, err := loadDynamicPlugins(pluginDir)
	if err != nil {
		return err
	}

	for _, pl := range newPls {
		if err := loader.Load(pl); err != nil {
			return err
		}
	}
	return nil
175 176 177 178 179 180 181 182 183 184 185
}

func loadDynamicPlugins(pluginDir string) ([]plugin.Plugin, error) {
	_, err := os.Stat(pluginDir)
	if os.IsNotExist(err) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
	var plugins []plugin.Plugin

	err = filepath.Walk(pluginDir, func(fi string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() {
			if fi != pluginDir {
				log.Warnf("found directory inside plugins directory: %s", fi)
			}
			return nil
		}

		if info.Mode().Perm()&0111 == 0 {
			// file is not executable let's not load it
			// this is to prevent loading plugins from for example non-executable
			// mounts, some /tmp mounts are marked as such for security
			log.Errorf("non-executable file in plugins directory: %s", fi)
			return nil
		}

		if newPlugins, err := loadPluginFunc(fi); err == nil {
			plugins = append(plugins, newPlugins...)
		} else {
			return fmt.Errorf("loading plugin %s: %s", fi, err)
		}
		return nil
	})

	return plugins, err
216 217
}

218
// Initialize initializes all loaded plugins
219
func (loader *PluginLoader) Initialize() error {
220 221 222
	if err := loader.transition(loaderLoading, loaderInitializing); err != nil {
		return err
	}
223 224 225 226 227
	for name, p := range loader.plugins {
		err := p.Init(&plugin.Environment{
			Repo:   loader.repo,
			Config: loader.config.Plugins[name].Config,
		})
228
		if err != nil {
229
			loader.state = loaderFailed
230 231 232 233
			return err
		}
	}

234
	return loader.transition(loaderInitializing, loaderInitialized)
235 236
}

237 238
// Inject hooks all the plugins into the appropriate subsystems.
func (loader *PluginLoader) Inject() error {
239 240 241 242
	if err := loader.transition(loaderInitialized, loaderInjecting); err != nil {
		return err
	}

243
	for _, pl := range loader.plugins {
tavit ohanian's avatar
tavit ohanian committed
244 245
		if pl, ok := pl.(plugin.PluginLD); ok {
			err := injectLDPlugin(pl)
246
			if err != nil {
247
				loader.state = loaderFailed
248 249
				return err
			}
250 251 252
		}
		if pl, ok := pl.(plugin.PluginTracer); ok {
			err := injectTracerPlugin(pl)
253
			if err != nil {
254
				loader.state = loaderFailed
255 256
				return err
			}
257 258 259
		}
		if pl, ok := pl.(plugin.PluginDatastore); ok {
			err := injectDatastorePlugin(pl)
260
			if err != nil {
261
				loader.state = loaderFailed
262 263 264 265
				return err
			}
		}
	}
266 267

	return loader.transition(loaderInjecting, loaderInjected)
268 269
}

270
// Start starts all long-running plugins.
tavit ohanian's avatar
tavit ohanian committed
271
func (loader *PluginLoader) Start(node *core.Dms3Node) error {
272 273 274
	if err := loader.transition(loaderInjected, loaderStarting); err != nil {
		return err
	}
275 276 277 278
	iface, err := coreapi.NewCoreAPI(node)
	if err != nil {
		return err
	}
279
	for _, pl := range loader.plugins {
280 281 282
		if pl, ok := pl.(plugin.PluginDaemon); ok {
			err := pl.Start(iface)
			if err != nil {
283
				_ = loader.Close()
284 285
				return err
			}
286
			loader.started = append(loader.started, pl)
287
		}
288 289 290 291 292 293 294 295
		if pl, ok := pl.(plugin.PluginDaemonInternal); ok {
			err := pl.Start(node)
			if err != nil {
				_ = loader.Close()
				return err
			}
			loader.started = append(loader.started, pl)
		}
296
	}
297 298

	return loader.transition(loaderStarting, loaderStarted)
299 300 301 302
}

// StopDaemon stops all long-running plugins.
func (loader *PluginLoader) Close() error {
303 304 305 306 307 308
	switch loader.state {
	case loaderClosing, loaderFailed, loaderClosed:
		// nothing to do.
		return nil
	}
	loader.state = loaderClosing
309 310

	var errs []string
311 312 313
	started := loader.started
	loader.started = nil
	for _, pl := range started {
314 315
		if closer, ok := pl.(io.Closer); ok {
			err := closer.Close()
316 317 318 319 320 321 322 323 324 325
			if err != nil {
				errs = append(errs, fmt.Sprintf(
					"error closing plugin %s: %s",
					pl.Name(),
					err.Error(),
				))
			}
		}
	}
	if errs != nil {
326
		loader.state = loaderFailed
327 328
		return fmt.Errorf(strings.Join(errs, "\n"))
	}
329
	loader.state = loaderClosed
330 331 332
	return nil
}

333 334 335 336
func injectDatastorePlugin(pl plugin.PluginDatastore) error {
	return fsrepo.AddDatastoreConfigHandler(pl.DatastoreTypeName(), pl.DatastoreConfigParser())
}

tavit ohanian's avatar
tavit ohanian committed
337 338
func injectLDPlugin(pl plugin.PluginLD) error {
	err := pl.RegisterBlockDecoders(ld.DefaultBlockDecoder)
339 340 341 342 343 344
	if err != nil {
		return err
	}
	return pl.RegisterInputEncParsers(coredag.DefaultInputEncParsers)
}

345
func injectTracerPlugin(pl plugin.PluginTracer) error {
346 347 348 349 350 351 352
	tracer, err := pl.InitTracer()
	if err != nil {
		return err
	}
	opentracing.SetGlobalTracer(tracer)
	return nil
}