requestupdatehooks.go 2.04 KB
Newer Older
Hannah Howard's avatar
Hannah Howard committed
1 2 3
package hooks

import (
4
	"github.com/hannahhoward/go-pubsub"
Hannah Howard's avatar
Hannah Howard committed
5 6 7 8 9 10
	"github.com/ipfs/go-graphsync"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// RequestUpdatedHooks manages and runs hooks for request updates
type RequestUpdatedHooks struct {
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
	pubSub *pubsub.PubSub
}

type internalRequestUpdateEvent struct {
	p       peer.ID
	request graphsync.RequestData
	update  graphsync.RequestData
	uha     *updateHookActions
}

func updateHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
	ie := event.(internalRequestUpdateEvent)
	hook := subscriberFn.(graphsync.OnRequestUpdatedHook)
	hook(ie.p, ie.request, ie.update, ie.uha)
	return ie.uha.err
Hannah Howard's avatar
Hannah Howard committed
26 27 28 29
}

// NewUpdateHooks returns a new list of request updated hooks
func NewUpdateHooks() *RequestUpdatedHooks {
30
	return &RequestUpdatedHooks{pubSub: pubsub.New(updateHookDispatcher)}
Hannah Howard's avatar
Hannah Howard committed
31 32 33 34
}

// Register registers an hook to process updates to requests
func (ruh *RequestUpdatedHooks) Register(hook graphsync.OnRequestUpdatedHook) graphsync.UnregisterHookFunc {
35
	return graphsync.UnregisterHookFunc(ruh.pubSub.Subscribe(hook))
Hannah Howard's avatar
Hannah Howard committed
36 37 38 39 40 41 42 43 44 45 46 47
}

// UpdateResult is the result of running update hooks
type UpdateResult struct {
	Err        error
	Unpause    bool
	Extensions []graphsync.ExtensionData
}

// ProcessUpdateHooks runs request hooks against an incoming request
func (ruh *RequestUpdatedHooks) ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) UpdateResult {
	ha := &updateHookActions{}
48
	_ = ruh.pubSub.Publish(internalRequestUpdateEvent{p, request, update, ha})
Hannah Howard's avatar
Hannah Howard committed
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
	return ha.result()
}

type updateHookActions struct {
	err        error
	unpause    bool
	extensions []graphsync.ExtensionData
}

func (uha *updateHookActions) result() UpdateResult {
	return UpdateResult{uha.err, uha.unpause, uha.extensions}
}

func (uha *updateHookActions) SendExtensionData(data graphsync.ExtensionData) {
	uha.extensions = append(uha.extensions, data)
}

func (uha *updateHookActions) TerminateWithError(err error) {
	uha.err = err
}

func (uha *updateHookActions) UnpauseResponse() {
	uha.unpause = true
}