package node import ( "context" "fmt" "time" pin "gitlab.dms3.io/dms3/go-dms3-pinner" provider "gitlab.dms3.io/dms3/go-dms3-provider" q "gitlab.dms3.io/dms3/go-dms3-provider/queue" "gitlab.dms3.io/dms3/go-dms3-provider/simple" ld "gitlab.dms3.io/dms3/go-ld-format" "gitlab.dms3.io/p2p/go-p2p-core/routing" "go.uber.org/fx" "gitlab.dms3.io/dms3/go-dms3/core/node/helpers" "gitlab.dms3.io/dms3/go-dms3/repo" ) const kReprovideFrequency = time.Hour * 12 // SIMPLE // ProviderQueue creates new datastore backed provider queue func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*q.Queue, error) { return q.NewQueue(helpers.LifecycleCtx(mctx, lc), "provider-v1", repo.Datastore()) } // SimpleProvider creates new record provider func SimpleProvider(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *q.Queue, rt routing.Routing) provider.Provider { return simple.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt) } // SimpleReprovider creates new reprovider func SimpleReprovider(reproviderInterval time.Duration) interface{} { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt routing.Routing, keyProvider simple.KeyChanFunc) (provider.Reprovider, error) { return simple.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil } } // SimpleProviderSys creates new provider system func SimpleProviderSys(isOnline bool) interface{} { return func(lc fx.Lifecycle, p provider.Provider, r provider.Reprovider) provider.System { sys := provider.NewSystem(p, r) if isOnline { lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { sys.Run() return nil }, OnStop: func(ctx context.Context) error { return sys.Close() }, }) } return sys } } // ONLINE/OFFLINE // OnlineProviders groups units managing provider routing records online func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option { if useStrategicProviding { return fx.Provide(provider.NewOfflineProvider) } return fx.Options( SimpleProviders(reprovideStrategy, reprovideInterval), fx.Provide(SimpleProviderSys(true)), ) } // OfflineProviders groups units managing provider routing records offline func OfflineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option { if useStrategicProviding { return fx.Provide(provider.NewOfflineProvider) } return fx.Options( SimpleProviders(reprovideStrategy, reprovideInterval), fx.Provide(SimpleProviderSys(false)), ) } // SimpleProviders creates the simple provider/reprovider dependencies func SimpleProviders(reprovideStrategy string, reprovideInterval string) fx.Option { reproviderInterval := kReprovideFrequency if reprovideInterval != "" { dur, err := time.ParseDuration(reprovideInterval) if err != nil { return fx.Error(err) } reproviderInterval = dur } var keyProvider fx.Option switch reprovideStrategy { case "all": fallthrough case "": keyProvider = fx.Provide(simple.NewBlockstoreProvider) case "roots": keyProvider = fx.Provide(pinnedProviderStrategy(true)) case "pinned": keyProvider = fx.Provide(pinnedProviderStrategy(false)) default: return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", reprovideStrategy)) } return fx.Options( fx.Provide(ProviderQueue), fx.Provide(SimpleProvider), keyProvider, fx.Provide(SimpleReprovider(reproviderInterval)), ) } func pinnedProviderStrategy(onlyRoots bool) interface{} { return func(pinner pin.Pinner, dag ld.DAGService) simple.KeyChanFunc { return simple.NewPinnedProvider(onlyRoots, pinner, dag) } }