blockhooks.go 2.16 KB
Newer Older
Hannah Howard's avatar
Hannah Howard committed
1
package hooks
2 3

import (
4
	"github.com/hannahhoward/go-pubsub"
5
	peer "github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
6 7

	"github.com/ipfs/go-graphsync"
8 9 10
)

// ErrPaused indicates a request should stop processing, but only because it's paused.
type ErrPaused struct{}

// Error implements the error interface; the message is always the same.
func (e ErrPaused) Error() string { return "request has been paused" }
14 15 16

// OutgoingBlockHooks is a set of outgoing block hooks that can be processed
type OutgoingBlockHooks struct {
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
	pubSub *pubsub.PubSub
}

// internalBlockHookEvent is the event published by ProcessBlockHooks.
// blockHookDispatcher passes its four fields, in declaration order, as the
// arguments of each hook call: the peer, the request, the block data, and the
// actions object on which the hook records its decisions.
type internalBlockHookEvent struct {
	p       peer.ID
	request graphsync.RequestData
	block   graphsync.BlockData
	bha     *blockHookActions
}

func blockHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
	ie := event.(internalBlockHookEvent)
	hook := subscriberFn.(graphsync.OnOutgoingBlockHook)
	hook(ie.p, ie.request, ie.block, ie.bha)
	return ie.bha.err
32 33
}

Hannah Howard's avatar
Hannah Howard committed
34 35
// NewBlockHooks returns a new list of outgoing block hooks
func NewBlockHooks() *OutgoingBlockHooks {
36
	return &OutgoingBlockHooks{pubSub: pubsub.New(blockHookDispatcher)}
37 38 39 40
}

// Register registers an hook to process outgoing blocks in a response
func (obh *OutgoingBlockHooks) Register(hook graphsync.OnOutgoingBlockHook) graphsync.UnregisterHookFunc {
41
	return graphsync.UnregisterHookFunc(obh.pubSub.Subscribe(hook))
42 43
}

Hannah Howard's avatar
Hannah Howard committed
44 45
// BlockResult is the result of processing block hooks
type BlockResult struct {
46 47 48 49 50
	Err        error
	Extensions []graphsync.ExtensionData
}

// ProcessBlockHooks runs block hooks against a request and block data
Hannah Howard's avatar
Hannah Howard committed
51
func (obh *OutgoingBlockHooks) ProcessBlockHooks(p peer.ID, request graphsync.RequestData, blockData graphsync.BlockData) BlockResult {
52
	bha := &blockHookActions{}
53
	_ = obh.pubSub.Publish(internalBlockHookEvent{p, request, blockData, bha})
54 55 56 57 58 59 60 61
	return bha.result()
}

// blockHookActions records what hooks ask for while one block is processed.
// blockHookDispatcher passes it as the last argument of each hook call.
// err is set by TerminateWithError and PauseResponse; extensions is appended
// to by SendExtensionData. Both are copied into a BlockResult by result.
type blockHookActions struct {
	err        error
	extensions []graphsync.ExtensionData
}

Hannah Howard's avatar
Hannah Howard committed
62 63
func (bha *blockHookActions) result() BlockResult {
	return BlockResult{bha.err, bha.extensions}
64 65 66 67 68 69 70 71 72 73 74
}

// SendExtensionData appends data to the extensions that will be reported in
// the BlockResult; entries are kept in the order they were added.
func (bha *blockHookActions) SendExtensionData(data graphsync.ExtensionData) {
	bha.extensions = append(bha.extensions, data)
}

// TerminateWithError records err as the error to report in the BlockResult,
// overwriting any error recorded earlier (including a pause).
func (bha *blockHookActions) TerminateWithError(err error) {
	bha.err = err
}

// PauseResponse records ErrPaused as the error to report in the BlockResult,
// overwriting any error recorded earlier.
func (bha *blockHookActions) PauseResponse() {
	bha.err = ErrPaused{}
}