provider.go 3.59 KB
Newer Older
Łukasz Magiera's avatar
Łukasz Magiera committed
1 2 3
package node

import (
4
	"context"
5
	"fmt"
Łukasz Magiera's avatar
Łukasz Magiera committed
6 7
	"time"

tavit ohanian's avatar
tavit ohanian committed
8 9 10 11 12 13
	pin "gitlab.dms3.io/dms3/go-dms3-pinner"
	provider "gitlab.dms3.io/dms3/go-dms3-provider"
	q "gitlab.dms3.io/dms3/go-dms3-provider/queue"
	"gitlab.dms3.io/dms3/go-dms3-provider/simple"
	ld "gitlab.dms3.io/dms3/go-ld-format"
	"gitlab.dms3.io/p2p/go-p2p-core/routing"
Raúl Kripalani's avatar
Raúl Kripalani committed
14
	"go.uber.org/fx"
15

tavit ohanian's avatar
tavit ohanian committed
16 17
	"gitlab.dms3.io/dms3/go-dms3/core/node/helpers"
	"gitlab.dms3.io/dms3/go-dms3/repo"
Łukasz Magiera's avatar
Łukasz Magiera committed
18 19 20 21
)

// kReprovideFrequency is the reprovide interval that SimpleProviders falls
// back to when it is given an empty reprovideInterval string.
const kReprovideFrequency = time.Hour * 12

22 23
// SIMPLE

24
// ProviderQueue creates new datastore backed provider queue
25 26
func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*q.Queue, error) {
	return q.NewQueue(helpers.LifecycleCtx(mctx, lc), "provider-v1", repo.Datastore())
Łukasz Magiera's avatar
Łukasz Magiera committed
27 28
}

Michael Avila's avatar
Michael Avila committed
29
// SimpleProvider creates new record provider
Raúl Kripalani's avatar
Raúl Kripalani committed
30
func SimpleProvider(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *q.Queue, rt routing.Routing) provider.Provider {
31 32 33
	return simple.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt)
}

Michael Avila's avatar
Michael Avila committed
34
// SimpleReprovider creates new reprovider
35
func SimpleReprovider(reproviderInterval time.Duration) interface{} {
Raúl Kripalani's avatar
Raúl Kripalani committed
36
	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt routing.Routing, keyProvider simple.KeyChanFunc) (provider.Reprovider, error) {
37 38 39
		return simple.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
	}
}
40

Michael Avila's avatar
Michael Avila committed
41
// SimpleProviderSys creates new provider system
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
func SimpleProviderSys(isOnline bool) interface{} {
	return func(lc fx.Lifecycle, p provider.Provider, r provider.Reprovider) provider.System {
		sys := provider.NewSystem(p, r)

		if isOnline {
			lc.Append(fx.Hook{
				OnStart: func(ctx context.Context) error {
					sys.Run()
					return nil
				},
				OnStop: func(ctx context.Context) error {
					return sys.Close()
				},
			})
		}
57

58 59
		return sys
	}
60 61 62 63 64
}

// ONLINE/OFFLINE

// OnlineProviders groups units managing provider routing records online
Michael Avila's avatar
Michael Avila committed
65 66
func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option {
	if useStrategicProviding {
67
		return fx.Provide(provider.NewOfflineProvider)
68
	}
69

70
	return fx.Options(
Michael Avila's avatar
Michael Avila committed
71
		SimpleProviders(reprovideStrategy, reprovideInterval),
72
		fx.Provide(SimpleProviderSys(true)),
73
	)
Łukasz Magiera's avatar
Łukasz Magiera committed
74 75
}

76
// OfflineProviders groups units managing provider routing records offline
Michael Avila's avatar
Michael Avila committed
77 78
func OfflineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option {
	if useStrategicProviding {
79
		return fx.Provide(provider.NewOfflineProvider)
Łukasz Magiera's avatar
Łukasz Magiera committed
80
	}
81 82

	return fx.Options(
Michael Avila's avatar
Michael Avila committed
83
		SimpleProviders(reprovideStrategy, reprovideInterval),
84
		fx.Provide(SimpleProviderSys(false)),
85
	)
Łukasz Magiera's avatar
Łukasz Magiera committed
86 87
}

88
// SimpleProviders creates the simple provider/reprovider dependencies
Michael Avila's avatar
Michael Avila committed
89
func SimpleProviders(reprovideStrategy string, reprovideInterval string) fx.Option {
90
	reproviderInterval := kReprovideFrequency
Michael Avila's avatar
Michael Avila committed
91 92
	if reprovideInterval != "" {
		dur, err := time.ParseDuration(reprovideInterval)
93 94 95 96 97 98 99 100
		if err != nil {
			return fx.Error(err)
		}

		reproviderInterval = dur
	}

	var keyProvider fx.Option
Michael Avila's avatar
Michael Avila committed
101
	switch reprovideStrategy {
102 103 104 105 106
	case "all":
		fallthrough
	case "":
		keyProvider = fx.Provide(simple.NewBlockstoreProvider)
	case "roots":
107
		keyProvider = fx.Provide(pinnedProviderStrategy(true))
108
	case "pinned":
109
		keyProvider = fx.Provide(pinnedProviderStrategy(false))
110
	default:
Michael Avila's avatar
Michael Avila committed
111
		return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", reprovideStrategy))
112 113 114 115
	}

	return fx.Options(
		fx.Provide(ProviderQueue),
Michael Avila's avatar
Michael Avila committed
116
		fx.Provide(SimpleProvider),
117
		keyProvider,
Michael Avila's avatar
Michael Avila committed
118
		fx.Provide(SimpleReprovider(reproviderInterval)),
119
	)
Łukasz Magiera's avatar
Łukasz Magiera committed
120
}
121 122

func pinnedProviderStrategy(onlyRoots bool) interface{} {
tavit ohanian's avatar
tavit ohanian committed
123
	return func(pinner pin.Pinner, dag ld.DAGService) simple.KeyChanFunc {
124 125 126
		return simple.NewPinnedProvider(onlyRoots, pinner, dag)
	}
}