loader.go 6.38 KB
Newer Older
1 2 3 4 5
package loader

import (
	"fmt"
	"os"
6 7 8 9 10
	"strings"

	coredag "github.com/ipfs/go-ipfs/core/coredag"
	plugin "github.com/ipfs/go-ipfs/plugin"
	fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
11

Jakub Sztandera's avatar
Jakub Sztandera committed
12 13 14 15
	ipld "github.com/ipfs/go-ipld-format"
	logging "github.com/ipfs/go-log"
	coreiface "github.com/ipfs/interface-go-ipfs-core"
	opentracing "github.com/opentracing/opentracing-go"
16 17
)

18 19 20 21 22 23 24
var preloadPlugins []plugin.Plugin

// Preload adds one or more plugins to the preload list. This should _only_ be called during init.
func Preload(plugins ...plugin.Plugin) {
	preloadPlugins = append(preloadPlugins, plugins...)
}

25 26 27 28 29 30
var log = logging.Logger("plugin/loader")

var loadPluginsFunc = func(string) ([]plugin.Plugin, error) {
	return nil, nil
}

31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
type loaderState int

const (
	loaderLoading loaderState = iota
	loaderInitializing
	loaderInitialized
	loaderInjecting
	loaderInjected
	loaderStarting
	loaderStarted
	loaderClosing
	loaderClosed
	loaderFailed
)

func (ls loaderState) String() string {
	switch ls {
	case loaderLoading:
		return "Loading"
	case loaderInitializing:
		return "Initializing"
	case loaderInitialized:
		return "Initialized"
	case loaderInjecting:
		return "Injecting"
	case loaderInjected:
		return "Injected"
	case loaderStarting:
		return "Starting"
	case loaderStarted:
		return "Started"
	case loaderClosing:
		return "Closing"
	case loaderClosed:
		return "Closed"
	case loaderFailed:
		return "Failed"
	default:
		return "Unknown"
	}
}

73 74 75 76 77 78 79 80 81
// PluginLoader keeps track of loaded plugins.
//
// To use:
// 1. Load any desired plugins with Load and LoadDirectory. Preloaded plugins
//    will automatically be loaded.
// 2. Call Initialize to run all initialization logic.
// 3. Call Inject to register the plugins.
// 4. Optionally call Start to start plugins.
// 5. Call Close to close all plugins.
82
type PluginLoader struct {
83 84 85
	state   loaderState
	plugins map[string]plugin.Plugin
	started []plugin.Plugin
86 87 88
}

// NewPluginLoader creates new plugin loader
89 90
func NewPluginLoader() (*PluginLoader, error) {
	loader := &PluginLoader{plugins: make(map[string]plugin.Plugin, len(preloadPlugins))}
91
	for _, v := range preloadPlugins {
92
		if err := loader.Load(v); err != nil {
93 94
			return nil, err
		}
95 96 97
	}
	return loader, nil
}
98

99 100 101
func (loader *PluginLoader) assertState(state loaderState) error {
	if loader.state != state {
		return fmt.Errorf("loader state must be %s, was %s", state, loader.state)
102
	}
103 104
	return nil
}
105

106 107 108 109 110 111 112
func (loader *PluginLoader) transition(from, to loaderState) error {
	if err := loader.assertState(from); err != nil {
		return err
	}
	loader.state = to
	return nil
}
113

114
// Load loads a plugin into the plugin loader.
115 116 117
func (loader *PluginLoader) Load(pl plugin.Plugin) error {
	if err := loader.assertState(loaderLoading); err != nil {
		return err
118 119
	}

120 121 122 123 124 125 126 127 128 129 130 131
	name := pl.Name()
	if ppl, ok := loader.plugins[name]; ok {
		// plugin is already loaded
		return fmt.Errorf(
			"plugin: %s, is duplicated in version: %s, "+
				"while trying to load dynamically: %s",
			name, ppl.Version(), pl.Version())
	}
	loader.plugins[name] = pl
	return nil
}

132
// LoadDirectory loads a directory of plugins into the plugin loader.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
func (loader *PluginLoader) LoadDirectory(pluginDir string) error {
	if err := loader.assertState(loaderLoading); err != nil {
		return err
	}
	newPls, err := loadDynamicPlugins(pluginDir)
	if err != nil {
		return err
	}

	for _, pl := range newPls {
		if err := loader.Load(pl); err != nil {
			return err
		}
	}
	return nil
148 149 150 151 152 153 154 155 156 157 158 159 160 161
}

func loadDynamicPlugins(pluginDir string) ([]plugin.Plugin, error) {
	_, err := os.Stat(pluginDir)
	if os.IsNotExist(err) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	return loadPluginsFunc(pluginDir)
}

162
// Initialize initializes all loaded plugins
163
func (loader *PluginLoader) Initialize() error {
164 165 166
	if err := loader.transition(loaderLoading, loaderInitializing); err != nil {
		return err
	}
167 168 169
	for _, p := range loader.plugins {
		err := p.Init()
		if err != nil {
170
			loader.state = loaderFailed
171 172 173 174
			return err
		}
	}

175
	return loader.transition(loaderInitializing, loaderInitialized)
176 177
}

178 179
// Inject hooks all the plugins into the appropriate subsystems.
func (loader *PluginLoader) Inject() error {
180 181 182 183
	if err := loader.transition(loaderInitialized, loaderInjecting); err != nil {
		return err
	}

184
	for _, pl := range loader.plugins {
185 186
		if pl, ok := pl.(plugin.PluginIPLD); ok {
			err := injectIPLDPlugin(pl)
187
			if err != nil {
188
				loader.state = loaderFailed
189 190
				return err
			}
191 192 193
		}
		if pl, ok := pl.(plugin.PluginTracer); ok {
			err := injectTracerPlugin(pl)
194
			if err != nil {
195
				loader.state = loaderFailed
196 197
				return err
			}
198 199 200
		}
		if pl, ok := pl.(plugin.PluginDatastore); ok {
			err := injectDatastorePlugin(pl)
201
			if err != nil {
202
				loader.state = loaderFailed
203 204 205 206
				return err
			}
		}
	}
207 208

	return loader.transition(loaderInjecting, loaderInjected)
209 210
}

211 212
// Start starts all long-running plugins.
func (loader *PluginLoader) Start(iface coreiface.CoreAPI) error {
213 214 215 216
	if err := loader.transition(loaderInjected, loaderStarting); err != nil {
		return err
	}
	for _, pl := range loader.plugins {
217 218 219
		if pl, ok := pl.(plugin.PluginDaemon); ok {
			err := pl.Start(iface)
			if err != nil {
220
				_ = loader.Close()
221 222
				return err
			}
223
			loader.started = append(loader.started, pl)
224 225
		}
	}
226 227

	return loader.transition(loaderStarting, loaderStarted)
228 229 230 231
}

// StopDaemon stops all long-running plugins.
func (loader *PluginLoader) Close() error {
232 233 234 235 236 237
	switch loader.state {
	case loaderClosing, loaderFailed, loaderClosed:
		// nothing to do.
		return nil
	}
	loader.state = loaderClosing
238 239

	var errs []string
240 241 242
	started := loader.started
	loader.started = nil
	for _, pl := range started {
243 244 245 246 247 248 249 250 251 252 253 254
		if pl, ok := pl.(plugin.PluginDaemon); ok {
			err := pl.Close()
			if err != nil {
				errs = append(errs, fmt.Sprintf(
					"error closing plugin %s: %s",
					pl.Name(),
					err.Error(),
				))
			}
		}
	}
	if errs != nil {
255
		loader.state = loaderFailed
256 257
		return fmt.Errorf(strings.Join(errs, "\n"))
	}
258
	loader.state = loaderClosed
259 260 261
	return nil
}

262 263 264 265 266
func injectDatastorePlugin(pl plugin.PluginDatastore) error {
	return fsrepo.AddDatastoreConfigHandler(pl.DatastoreTypeName(), pl.DatastoreConfigParser())
}

func injectIPLDPlugin(pl plugin.PluginIPLD) error {
267 268 269 270 271 272 273
	err := pl.RegisterBlockDecoders(ipld.DefaultBlockDecoder)
	if err != nil {
		return err
	}
	return pl.RegisterInputEncParsers(coredag.DefaultInputEncParsers)
}

274
func injectTracerPlugin(pl plugin.PluginTracer) error {
275 276 277 278 279 280 281
	tracer, err := pl.InitTracer()
	if err != nil {
		return err
	}
	opentracing.SetGlobalTracer(tracer)
	return nil
}