groups.go 8.68 KB
Newer Older
Łukasz Magiera's avatar
Łukasz Magiera committed
1 2 3
package node

import (
4
	"context"
5 6 7 8
	"errors"
	"fmt"
	"time"

tavit ohanian's avatar
tavit ohanian committed
9 10 11 12 13 14 15 16 17 18 19 20 21 22
	blockstore "gitlab.dms3.io/dms3/go-dms3-blockstore"
	config "gitlab.dms3.io/dms3/go-dms3-config"
	util "gitlab.dms3.io/dms3/go-dms3-util"
	"gitlab.dms3.io/dms3/go-dms3/core/node/dms3p2p"
	log "gitlab.dms3.io/dms3/go-log"
	peer "gitlab.dms3.io/p2p/go-p2p-core/peer"
	pubsub "gitlab.dms3.io/p2p/go-p2p-pubsub"

	"gitlab.dms3.io/p2p/go-p2p"

	offline "gitlab.dms3.io/dms3/go-dms3-exchange-offline"
	offroute "gitlab.dms3.io/dms3/go-dms3-routing/offline"
	"gitlab.dms3.io/dms3/go-path/resolver"
	uio "gitlab.dms3.io/dms3/go-unixfs/io"
Łukasz Magiera's avatar
Łukasz Magiera committed
23 24 25
	"go.uber.org/fx"
)

26 27
// logger is the package-level logger, registered under the "core:constructor"
// subsystem name.
var logger = log.Logger("core:constructor")

tavit ohanian's avatar
tavit ohanian committed
28 29 30 31 32
var BaseP2p = fx.Options(
	fx.Provide(dms3p2p.UserAgent),
	fx.Provide(dms3p2p.PNet),
	fx.Provide(dms3p2p.ConnectionManager),
	fx.Provide(dms3p2p.Host),
Łukasz Magiera's avatar
Łukasz Magiera committed
33

tavit ohanian's avatar
tavit ohanian committed
34
	fx.Provide(dms3p2p.DiscoveryHandler),
Łukasz Magiera's avatar
Łukasz Magiera committed
35

tavit ohanian's avatar
tavit ohanian committed
36
	fx.Invoke(dms3p2p.PNetChecker),
Łukasz Magiera's avatar
Łukasz Magiera committed
37 38
)

tavit ohanian's avatar
tavit ohanian committed
39
func P2p(bcfg *BuildCfg, cfg *config.Config) fx.Option {
40 41 42
	// parse ConnMgr config

	grace := config.DefaultConnMgrGracePeriod
43
	low := config.DefaultConnMgrLowWater
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
	high := config.DefaultConnMgrHighWater

	connmgr := fx.Options()

	if cfg.Swarm.ConnMgr.Type != "none" {
		switch cfg.Swarm.ConnMgr.Type {
		case "":
			// 'default' value is the basic connection manager
			break
		case "basic":
			var err error
			grace, err = time.ParseDuration(cfg.Swarm.ConnMgr.GracePeriod)
			if err != nil {
				return fx.Error(fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err))
			}

			low = cfg.Swarm.ConnMgr.LowWater
			high = cfg.Swarm.ConnMgr.HighWater
		default:
			return fx.Error(fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Swarm.ConnMgr.Type))
		}

tavit ohanian's avatar
tavit ohanian committed
66
		connmgr = fx.Provide(dms3p2p.ConnectionManager(low, high, grace))
67 68 69 70
	}

	// parse PubSub config

71
	ps, disc := fx.Options(), fx.Options()
tavit ohanian's avatar
tavit ohanian committed
72 73
	if bcfg.getOpt("pubsub") || bcfg.getOpt("dms3nsps") {
		disc = fx.Provide(dms3p2p.TopicDiscovery())
74

75
		var pubsubOptions []pubsub.Option
76 77 78 79
		pubsubOptions = append(
			pubsubOptions,
			pubsub.WithMessageSigning(!cfg.Pubsub.DisableSigning),
		)
80 81 82 83 84

		switch cfg.Pubsub.Router {
		case "":
			fallthrough
		case "gossipsub":
tavit ohanian's avatar
tavit ohanian committed
85
			ps = fx.Provide(dms3p2p.GossipSub(pubsubOptions...))
86
		case "floodsub":
tavit ohanian's avatar
tavit ohanian committed
87
			ps = fx.Provide(dms3p2p.FloodSub(pubsubOptions...))
88 89 90 91 92
		default:
			return fx.Error(fmt.Errorf("unknown pubsub router %s", cfg.Pubsub.Router))
		}
	}

93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
	autonat := fx.Options()

	switch cfg.AutoNAT.ServiceMode {
	default:
		panic("BUG: unhandled autonat service mode")
	case config.AutoNATServiceDisabled:
	case config.AutoNATServiceUnset:
		// TODO
		//
		// We're enabling the AutoNAT service by default on _all_ nodes
		// for the moment.
		//
		// We should consider disabling it by default if the dht is set
		// to dhtclient.
		fallthrough
	case config.AutoNATServiceEnabled:
tavit ohanian's avatar
tavit ohanian committed
109
		autonat = fx.Provide(dms3p2p.AutoNATService(cfg.AutoNAT.Throttle))
110 111
	}

112 113 114 115 116 117 118 119 120 121 122 123 124
	// If `cfg.Swarm.DisableRelay` is set and `Network.Relay` isn't, use the former.
	enableRelay := cfg.Swarm.Transports.Network.Relay.WithDefault(!cfg.Swarm.DisableRelay) //nolint

	// Warn about a deprecated option.
	//nolint
	if cfg.Swarm.DisableRelay {
		logger.Error("The `Swarm.DisableRelay' config field is deprecated.")
		if enableRelay {
			logger.Error("`Swarm.DisableRelay' has been overridden by `Swarm.Transports.Network.Relay'")
		} else {
			logger.Error("Use the `Swarm.Transports.Network.Relay' config field instead")
		}
	}
125

126
	// Gather all the options
127
	opts := fx.Options(
tavit ohanian's avatar
tavit ohanian committed
128
		BaseP2p,
129

tavit ohanian's avatar
tavit ohanian committed
130 131 132 133 134 135 136
		fx.Provide(dms3p2p.AddrFilters(cfg.Swarm.AddrFilters)),
		fx.Provide(dms3p2p.AddrsFactory(cfg.Addresses.Announce, cfg.Addresses.NoAnnounce)),
		fx.Provide(dms3p2p.SmuxTransport(cfg.Swarm.Transports)),
		fx.Provide(dms3p2p.Relay(enableRelay, cfg.Swarm.EnableRelayHop)),
		fx.Provide(dms3p2p.Transports(cfg.Swarm.Transports)),
		fx.Invoke(dms3p2p.StartListening(cfg.Addresses.Swarm)),
		fx.Invoke(dms3p2p.SetupDiscovery(cfg.Discovery.MDNS.Enabled, cfg.Discovery.MDNS.Interval)),
137

tavit ohanian's avatar
tavit ohanian committed
138
		fx.Provide(dms3p2p.Security(!bcfg.DisableEncryptedConnections, cfg.Swarm.Transports)),
139

tavit ohanian's avatar
tavit ohanian committed
140 141 142
		fx.Provide(dms3p2p.Routing),
		fx.Provide(dms3p2p.BaseRouting),
		maybeProvide(dms3p2p.PubsubRouter, bcfg.getOpt("dms3nsps")),
143

tavit ohanian's avatar
tavit ohanian committed
144 145 146
		maybeProvide(dms3p2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),
		maybeProvide(dms3p2p.NatPortMap, !cfg.Swarm.DisableNatPortMap),
		maybeProvide(dms3p2p.AutoRelay, cfg.Swarm.EnableAutoRelay),
147
		autonat,
148 149
		connmgr,
		ps,
150
		disc,
151
	)
152 153

	return opts
154 155
}

156
// Storage groups units which setup datastore based persistence and blockstore layers
157 158 159 160 161 162 163 164 165 166 167 168
func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
	cacheOpts := blockstore.DefaultCacheOpts()
	cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
	if !bcfg.Permanent {
		cacheOpts.HasBloomFilterSize = 0
	}

	finalBstore := fx.Provide(GcBlockstoreCtor)
	if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled {
		finalBstore = fx.Provide(FilestoreBlockstoreCtor)
	}

169 170
	return fx.Options(
		fx.Provide(RepoConfig),
171
		fx.Provide(Datastore),
172 173
		fx.Provide(BaseBlockstoreCtor(cacheOpts, bcfg.NilRepo, cfg.Datastore.HashOnRead)),
		finalBstore,
174 175
	)
}
Łukasz Magiera's avatar
Łukasz Magiera committed
176

177
// Identity groups units providing cryptographic identity
178 179 180 181 182
func Identity(cfg *config.Config) fx.Option {
	// PeerID

	cid := cfg.Identity.PeerID
	if cid == "" {
tavit ohanian's avatar
tavit ohanian committed
183
		return fx.Error(errors.New("identity was not set in config (was 'dms3 init' run?)"))
184 185
	}
	if len(cid) == 0 {
tavit ohanian's avatar
tavit ohanian committed
186
		return fx.Error(errors.New("no peer ID in config! (was 'dms3 init' run?)"))
187 188
	}

189
	id, err := peer.Decode(cid)
190 191 192 193 194 195 196 197 198
	if err != nil {
		return fx.Error(fmt.Errorf("peer ID invalid: %s", err))
	}

	// Private Key

	if cfg.Identity.PrivKey == "" {
		return fx.Options( // No PK (usually in tests)
			fx.Provide(PeerID(id)),
tavit ohanian's avatar
tavit ohanian committed
199
			fx.Provide(dms3p2p.Peerstore),
200 201 202 203 204 205 206 207 208 209 210
		)
	}

	sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!")
	if err != nil {
		return fx.Error(err)
	}

	return fx.Options( // Full identity
		fx.Provide(PeerID(id)),
		fx.Provide(PrivateKey(sk)),
tavit ohanian's avatar
tavit ohanian committed
211
		fx.Provide(dms3p2p.Peerstore),
212

tavit ohanian's avatar
tavit ohanian committed
213
		fx.Invoke(dms3p2p.PstoreAddSelfKeys),
214 215
	)
}
Łukasz Magiera's avatar
Łukasz Magiera committed
216

tavit ohanian's avatar
tavit ohanian committed
217 218
// DMS3NS groups namesys related units
var DMS3NS = fx.Options(
Łukasz Magiera's avatar
Łukasz Magiera committed
219 220 221
	fx.Provide(RecordValidator),
)

222
// Online groups online-only units
223 224 225 226
func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {

	// Namesys params

tavit ohanian's avatar
tavit ohanian committed
227 228 229
	dms3nsCacheSize := cfg.Dms3Ns.ResolveCacheSize
	if dms3nsCacheSize == 0 {
		dms3nsCacheSize = DefaultDms3NsCacheSize
230
	}
tavit ohanian's avatar
tavit ohanian committed
231
	if dms3nsCacheSize < 0 {
232 233 234 235 236 237 238
		return fx.Error(fmt.Errorf("cannot specify negative resolve cache size"))
	}

	// Republisher params

	var repubPeriod, recordLifetime time.Duration

tavit ohanian's avatar
tavit ohanian committed
239 240
	if cfg.Dms3Ns.RepublishPeriod != "" {
		d, err := time.ParseDuration(cfg.Dms3Ns.RepublishPeriod)
241
		if err != nil {
tavit ohanian's avatar
tavit ohanian committed
242
			return fx.Error(fmt.Errorf("failure to parse config setting DMS3NS.RepublishPeriod: %s", err))
243 244 245
		}

		if !util.Debug && (d < time.Minute || d > (time.Hour*24)) {
tavit ohanian's avatar
tavit ohanian committed
246
			return fx.Error(fmt.Errorf("config setting DMS3NS.RepublishPeriod is not between 1min and 1day: %s", d))
247 248 249 250 251
		}

		repubPeriod = d
	}

tavit ohanian's avatar
tavit ohanian committed
252 253
	if cfg.Dms3Ns.RecordLifetime != "" {
		d, err := time.ParseDuration(cfg.Dms3Ns.RecordLifetime)
254
		if err != nil {
tavit ohanian's avatar
tavit ohanian committed
255
			return fx.Error(fmt.Errorf("failure to parse config setting DMS3NS.RecordLifetime: %s", err))
256 257 258 259 260
		}

		recordLifetime = d
	}

261 262 263
	/* don't provide from bitswap when the strategic provider service is active */
	shouldBitswapProvide := !cfg.Experimental.StrategicProviding

264
	return fx.Options(
265
		fx.Provide(OnlineExchange(shouldBitswapProvide)),
266
		maybeProvide(Graphsync, cfg.Experimental.GraphsyncEnabled),
tavit ohanian's avatar
tavit ohanian committed
267
		fx.Provide(Namesys(dms3nsCacheSize)),
Steven Allen's avatar
Steven Allen committed
268 269
		fx.Provide(Peering),
		PeerWith(cfg.Peering.Peers...),
Łukasz Magiera's avatar
Łukasz Magiera committed
270

tavit ohanian's avatar
tavit ohanian committed
271
		fx.Invoke(Dms3NsRepublisher(repubPeriod, recordLifetime)),
Łukasz Magiera's avatar
Łukasz Magiera committed
272

273
		fx.Provide(p2p.New),
Łukasz Magiera's avatar
Łukasz Magiera committed
274

tavit ohanian's avatar
tavit ohanian committed
275
		P2p(bcfg, cfg),
Michael Avila's avatar
Michael Avila committed
276
		OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
277 278
	)
}
279

280
// Offline groups offline alternatives to Online units
281 282 283 284 285
func Offline(cfg *config.Config) fx.Option {
	return fx.Options(
		fx.Provide(offline.Exchange),
		fx.Provide(Namesys(0)),
		fx.Provide(offroute.NewOfflineRouter),
Michael Avila's avatar
Michael Avila committed
286
		OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
287 288
	)
}
Łukasz Magiera's avatar
Łukasz Magiera committed
289

tavit ohanian's avatar
tavit ohanian committed
290
// Core groups basic DMS3 services
291
var Core = fx.Options(
292 293
	fx.Provide(BlockService),
	fx.Provide(Dag),
294 295 296 297 298
	fx.Provide(resolver.NewBasicResolver),
	fx.Provide(Pinning),
	fx.Provide(Files),
)

299 300 301
func Networked(bcfg *BuildCfg, cfg *config.Config) fx.Option {
	if bcfg.Online {
		return Online(bcfg, cfg)
Łukasz Magiera's avatar
Łukasz Magiera committed
302
	}
303
	return Offline(cfg)
Łukasz Magiera's avatar
Łukasz Magiera committed
304
}
305

tavit ohanian's avatar
tavit ohanian committed
306 307
// DMS3 builds a group of fx Options based on the passed BuildCfg
func DMS3(ctx context.Context, bcfg *BuildCfg) fx.Option {
308 309 310 311 312
	if bcfg == nil {
		bcfg = new(BuildCfg)
	}

	bcfgOpts, cfg := bcfg.options(ctx)
313
	if cfg == nil {
314
		return bcfgOpts // error
315 316
	}

317 318 319
	// TEMP: setting global sharding switch here
	uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled

320
	return fx.Options(
321
		bcfgOpts,
322

323 324
		fx.Provide(baseProcess),

325 326
		Storage(bcfg, cfg),
		Identity(cfg),
tavit ohanian's avatar
tavit ohanian committed
327
		DMS3NS,
328
		Networked(bcfg, cfg),
329 330 331

		Core,
	)
332
}