blockhooks.go 2.13 KB
Newer Older
Hannah Howard's avatar
Hannah Howard committed
1
package hooks
2 3 4 5

import (
	"errors"

6
	"github.com/hannahhoward/go-pubsub"
7 8 9 10 11 12 13 14 15
	"github.com/ipfs/go-graphsync"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// ErrPaused indicates a request should stop processing, but only cause it's paused
var ErrPaused = errors.New("request has been paused")

// OutgoingBlockHooks is a set of outgoing block hooks that can be processed
type OutgoingBlockHooks struct {
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
	pubSub *pubsub.PubSub
}

type internalBlockHookEvent struct {
	p       peer.ID
	request graphsync.RequestData
	block   graphsync.BlockData
	bha     *blockHookActions
}

func blockHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
	ie := event.(internalBlockHookEvent)
	hook := subscriberFn.(graphsync.OnOutgoingBlockHook)
	hook(ie.p, ie.request, ie.block, ie.bha)
	return ie.bha.err
31 32
}

Hannah Howard's avatar
Hannah Howard committed
33 34
// NewBlockHooks returns a new list of outgoing block hooks
func NewBlockHooks() *OutgoingBlockHooks {
35
	return &OutgoingBlockHooks{pubSub: pubsub.New(blockHookDispatcher)}
36 37 38 39
}

// Register registers an hook to process outgoing blocks in a response
func (obh *OutgoingBlockHooks) Register(hook graphsync.OnOutgoingBlockHook) graphsync.UnregisterHookFunc {
40
	return graphsync.UnregisterHookFunc(obh.pubSub.Subscribe(hook))
41 42
}

Hannah Howard's avatar
Hannah Howard committed
43 44
// BlockResult is the result of processing block hooks
type BlockResult struct {
45 46 47 48 49
	Err        error
	Extensions []graphsync.ExtensionData
}

// ProcessBlockHooks runs block hooks against a request and block data
Hannah Howard's avatar
Hannah Howard committed
50
func (obh *OutgoingBlockHooks) ProcessBlockHooks(p peer.ID, request graphsync.RequestData, blockData graphsync.BlockData) BlockResult {
51
	bha := &blockHookActions{}
52
	_ = obh.pubSub.Publish(internalBlockHookEvent{p, request, blockData, bha})
53 54 55 56 57 58 59 60
	return bha.result()
}

type blockHookActions struct {
	err        error
	extensions []graphsync.ExtensionData
}

Hannah Howard's avatar
Hannah Howard committed
61 62
func (bha *blockHookActions) result() BlockResult {
	return BlockResult{bha.err, bha.extensions}
63 64 65 66 67 68 69 70 71 72 73 74 75
}

func (bha *blockHookActions) SendExtensionData(data graphsync.ExtensionData) {
	bha.extensions = append(bha.extensions, data)
}

func (bha *blockHookActions) TerminateWithError(err error) {
	bha.err = err
}

func (bha *blockHookActions) PauseResponse() {
	bha.err = ErrPaused
}