responsehooks.go 1.98 KB
Newer Older
Hannah Howard's avatar
Hannah Howard committed
1 2 3
package hooks

import (
4
	"github.com/hannahhoward/go-pubsub"
Hannah Howard's avatar
Hannah Howard committed
5 6 7 8 9 10 11
	"github.com/libp2p/go-libp2p-core/peer"

	"github.com/ipfs/go-graphsync"
)

// IncomingResponseHooks is a set of incoming response hooks that can be processed
type IncomingResponseHooks struct {
12 13 14 15 16 17 18 19 20 21 22 23 24 25
	pubSub *pubsub.PubSub
}

// internalResponseHookEvent is the event published to the pubsub for one
// response. It bundles the three arguments that responseHookDispatcher passes
// to each hook.
type internalResponseHookEvent struct {
	// p is the peer ID handed to every hook (presumably the peer the response
	// came from — confirm against callers of ProcessResponseHooks).
	p        peer.ID
	// response is the response data handed to every hook.
	response graphsync.ResponseData
	// rha collects the actions hooks request; the same pointer is given to each
	// hook, so their effects accumulate on it.
	rha      *responseHookActions
}

func responseHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
	ie := event.(internalResponseHookEvent)
	hook := subscriberFn.(graphsync.OnIncomingResponseHook)
	hook(ie.p, ie.response, ie.rha)
	return ie.rha.err
Hannah Howard's avatar
Hannah Howard committed
26 27 28 29
}

// NewResponseHooks returns a new list of incoming request hooks
func NewResponseHooks() *IncomingResponseHooks {
30
	return &IncomingResponseHooks{pubSub: pubsub.New(responseHookDispatcher)}
Hannah Howard's avatar
Hannah Howard committed
31 32 33 34
}

// Register registers an extension to process incoming responses
func (irh *IncomingResponseHooks) Register(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
35
	return graphsync.UnregisterHookFunc(irh.pubSub.Subscribe(hook))
Hannah Howard's avatar
Hannah Howard committed
36 37 38 39 40 41 42 43 44 45 46
}

// ResponseResult is the outcome of running response hooks
// against a single response.
type ResponseResult struct {
	// Err is the error a hook recorded via TerminateWithError, or nil if none did.
	Err        error
	// Extensions is the extension data hooks added via
	// UpdateRequestWithExtensions, in the order it was added.
	Extensions []graphsync.ExtensionData
}

// ProcessResponseHooks runs response hooks against an incoming response
func (irh *IncomingResponseHooks) ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) ResponseResult {
	rha := &responseHookActions{}
47
	_ = irh.pubSub.Publish(internalResponseHookEvent{p, response, rha})
Hannah Howard's avatar
Hannah Howard committed
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
	return rha.result()
}

// responseHookActions records what hooks ask for while one response is being
// processed. A pointer to it is passed as the third argument to every hook.
type responseHookActions struct {
	// err is the error set by TerminateWithError; nil until a hook sets it.
	err        error
	// extensions accumulates the data passed to UpdateRequestWithExtensions.
	extensions []graphsync.ExtensionData
}

// result packages the recorded error and extensions into the exported
// ResponseResult form.
func (rha *responseHookActions) result() ResponseResult {
	var res ResponseResult
	res.Err = rha.err
	res.Extensions = rha.extensions
	return res
}

// TerminateWithError records err as the outcome of hook processing. A later
// call replaces the previously recorded error. The recorded value is what
// result reports as ResponseResult.Err.
func (rha *responseHookActions) TerminateWithError(err error) {
	rha.err = err
}

// UpdateRequestWithExtensions appends the given extension data to the
// collector. Data from successive calls accumulates in call order and is
// reported by result as ResponseResult.Extensions.
func (rha *responseHookActions) UpdateRequestWithExtensions(extensions ...graphsync.ExtensionData) {
	if len(extensions) == 0 {
		// Nothing to add; the recorded slice stays as it is.
		return
	}
	rha.extensions = append(rha.extensions, extensions...)
}