package hooks import ( "errors" "github.com/hannahhoward/go-pubsub" ld "gitlab.dms3.io/ld/go-ld-prime" "gitlab.dms3.io/ld/go-ld-prime/traversal" peer "gitlab.dms3.io/p2p/go-p2p-core/peer" "gitlab.dms3.io/dms3/go-graphsync" ) // PersistenceOptions is an interface for getting loaders by name type PersistenceOptions interface { GetLinkSystem(name string) (ld.LinkSystem, bool) } // IncomingRequestHooks is a set of incoming request hooks that can be processed type IncomingRequestHooks struct { persistenceOptions PersistenceOptions pubSub *pubsub.PubSub } type internalRequestHookEvent struct { p peer.ID request graphsync.RequestData rha *requestHookActions } func requestHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error { ie := event.(internalRequestHookEvent) hook := subscriberFn.(graphsync.OnIncomingRequestHook) hook(ie.p, ie.request, ie.rha) return ie.rha.err } // NewRequestHooks returns a new list of incoming request hooks func NewRequestHooks(persistenceOptions PersistenceOptions) *IncomingRequestHooks { return &IncomingRequestHooks{ persistenceOptions: persistenceOptions, pubSub: pubsub.New(requestHookDispatcher), } } // Register registers an extension to process new incoming requests func (irh *IncomingRequestHooks) Register(hook graphsync.OnIncomingRequestHook) graphsync.UnregisterHookFunc { return graphsync.UnregisterHookFunc(irh.pubSub.Subscribe(hook)) } // RequestResult is the outcome of running requesthooks type RequestResult struct { IsValidated bool IsPaused bool CustomLinkSystem ld.LinkSystem CustomChooser traversal.LinkTargetNodePrototypeChooser Err error Extensions []graphsync.ExtensionData } // ProcessRequestHooks runs request hooks against an incoming request func (irh *IncomingRequestHooks) ProcessRequestHooks(p peer.ID, request graphsync.RequestData) RequestResult { ha := &requestHookActions{ persistenceOptions: irh.persistenceOptions, } _ = irh.pubSub.Publish(internalRequestHookEvent{p, request, ha}) return ha.result() } type requestHookActions struct { persistenceOptions PersistenceOptions isValidated bool isPaused bool err error linkSystem ld.LinkSystem chooser traversal.LinkTargetNodePrototypeChooser extensions []graphsync.ExtensionData } func (ha *requestHookActions) result() RequestResult { return RequestResult{ IsValidated: ha.isValidated, IsPaused: ha.isPaused, CustomLinkSystem: ha.linkSystem, CustomChooser: ha.chooser, Err: ha.err, Extensions: ha.extensions, } } func (ha *requestHookActions) SendExtensionData(ext graphsync.ExtensionData) { ha.extensions = append(ha.extensions, ext) } func (ha *requestHookActions) TerminateWithError(err error) { ha.err = err } func (ha *requestHookActions) ValidateRequest() { ha.isValidated = true } func (ha *requestHookActions) UsePersistenceOption(name string) { linkSystem, ok := ha.persistenceOptions.GetLinkSystem(name) if !ok { ha.TerminateWithError(errors.New("unknown loader option")) return } ha.linkSystem = linkSystem } func (ha *requestHookActions) UseLinkTargetNodePrototypeChooser(chooser traversal.LinkTargetNodePrototypeChooser) { ha.chooser = chooser } func (ha *requestHookActions) PauseResponse() { ha.isPaused = true }