routing.go 2.21 KB
Newer Older
tavit ohanian's avatar
tavit ohanian committed
1
package dms3p2p
2 3 4 5

import (
	"context"
	"sort"
6
	"time"
7

tavit ohanian's avatar
tavit ohanian committed
8
	"gitlab.dms3.io/dms3/go-dms3/core/node/helpers"
Steven Allen's avatar
Steven Allen committed
9

tavit ohanian's avatar
tavit ohanian committed
10 11 12 13 14 15 16
	host "gitlab.dms3.io/p2p/go-p2p-core/host"
	routing "gitlab.dms3.io/p2p/go-p2p-core/routing"
	ddht "gitlab.dms3.io/p2p/go-p2p-kad-dht/dual"
	pubsub "gitlab.dms3.io/p2p/go-p2p-pubsub"
	namesys "gitlab.dms3.io/p2p/go-p2p-pubsub-router"
	record "gitlab.dms3.io/p2p/go-p2p-record"
	routinghelpers "gitlab.dms3.io/p2p/go-p2p-routing-helpers"
17

Steven Allen's avatar
Steven Allen committed
18
	"go.uber.org/fx"
19 20
)

tavit ohanian's avatar
tavit ohanian committed
21
// BaseDms3Routing is a defined type whose underlying type is the
// routing.Routing interface. It is the type BaseRouting and PubsubRouter
// accept for the base router.
// NOTE(review): presumably a distinct type so the injector can tell the base
// router apart from the composed routing.Routing returned by Routing — confirm.
type BaseDms3Routing routing.Routing
22 23

type Router struct {
Raúl Kripalani's avatar
Raúl Kripalani committed
24
	routing.Routing
25 26 27 28 29 30 31 32 33 34

	Priority int // less = more important
}

// p2pRouterOut is the fx.Out result struct used by the router constructors in
// this file. Its single Router is contributed to the "routers" value group.
type p2pRouterOut struct {
	fx.Out

	Router Router `group:"routers"`
}

tavit ohanian's avatar
tavit ohanian committed
35
func BaseRouting(lc fx.Lifecycle, in BaseDms3Routing) (out p2pRouterOut, dr *ddht.DHT) {
Steven Allen's avatar
Steven Allen committed
36
	if dht, ok := in.(*ddht.DHT); ok {
37 38 39 40 41 42 43 44 45 46 47
		dr = dht

		lc.Append(fx.Hook{
			OnStop: func(ctx context.Context) error {
				return dr.Close()
			},
		})
	}

	return p2pRouterOut{
		Router: Router{
Raúl Kripalani's avatar
Raúl Kripalani committed
48 49
			Priority: 1000,
			Routing:  in,
50 51 52 53 54 55 56 57 58 59 60
		},
	}, dr
}

// p2pOnlineRoutingIn is the fx.In parameter struct consumed by Routing. It
// receives every Router contributed to the "routers" value group together
// with the record validator.
type p2pOnlineRoutingIn struct {
	fx.In

	Routers   []Router `group:"routers"`
	Validator record.Validator
}

Raúl Kripalani's avatar
Raúl Kripalani committed
61
func Routing(in p2pOnlineRoutingIn) routing.Routing {
62 63 64 65 66 67
	routers := in.Routers

	sort.SliceStable(routers, func(i, j int) bool {
		return routers[i].Priority < routers[j].Priority
	})

Raúl Kripalani's avatar
Raúl Kripalani committed
68
	irouters := make([]routing.Routing, len(routers))
69
	for i, v := range routers {
Raúl Kripalani's avatar
Raúl Kripalani committed
70
		irouters[i] = v.Routing
71 72 73 74 75 76 77 78 79 80 81
	}

	return routinghelpers.Tiered{
		Routers:   irouters,
		Validator: in.Validator,
	}
}

type p2pPSRoutingIn struct {
	fx.In

tavit ohanian's avatar
tavit ohanian committed
82
	BaseDms3Routing BaseDms3Routing
Raúl Kripalani's avatar
Raúl Kripalani committed
83 84 85
	Validator       record.Validator
	Host            host.Host
	PubSub          *pubsub.PubSub `optional:"true"`
86 87
}

88 89
func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore, error) {
	psRouter, err := namesys.NewPubsubValueStore(
90 91 92 93
		helpers.LifecycleCtx(mctx, lc),
		in.Host,
		in.PubSub,
		in.Validator,
94
		namesys.WithRebroadcastInterval(time.Minute),
95 96
	)

97 98 99 100
	if err != nil {
		return p2pRouterOut{}, nil, err
	}

101 102
	return p2pRouterOut{
		Router: Router{
Raúl Kripalani's avatar
Raúl Kripalani committed
103
			Routing: &routinghelpers.Compose{
104 105
				ValueStore: &routinghelpers.LimitedValueStore{
					ValueStore: psRouter,
tavit ohanian's avatar
tavit ohanian committed
106
					Namespaces: []string{"dms3ns"},
107 108 109 110
				},
			},
			Priority: 100,
		},
111
	}, psRouter, nil
112
}