blockhooks.go 2.22 KB
Newer Older
Hannah Howard's avatar
Hannah Howard committed
1
package hooks
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20

import (
	"errors"
	"sync"

	"github.com/ipfs/go-graphsync"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// ErrPaused indicates that a request should stop processing, but only because
// it has been paused rather than because it failed.
var ErrPaused = errors.New("request has been paused")

// blockHook pairs a registered outgoing block hook with the key that
// identifies it for later removal.
type blockHook struct {
	// key is the value of OutgoingBlockHooks.nextKey at registration time.
	key  uint64
	// hook is the callback supplied to Register.
	hook graphsync.OnOutgoingBlockHook
}

// OutgoingBlockHooks is a set of outgoing block hooks that can be processed
type OutgoingBlockHooks struct {
Hannah Howard's avatar
Hannah Howard committed
21 22 23
	hooksLk sync.RWMutex
	nextKey uint64
	hooks   []blockHook
24 25
}

Hannah Howard's avatar
Hannah Howard committed
26 27
// NewBlockHooks returns a new list of outgoing block hooks
func NewBlockHooks() *OutgoingBlockHooks {
28 29 30 31 32
	return &OutgoingBlockHooks{}
}

// Register registers an hook to process outgoing blocks in a response
func (obh *OutgoingBlockHooks) Register(hook graphsync.OnOutgoingBlockHook) graphsync.UnregisterHookFunc {
Hannah Howard's avatar
Hannah Howard committed
33 34 35 36 37
	obh.hooksLk.Lock()
	bh := blockHook{obh.nextKey, hook}
	obh.nextKey++
	obh.hooks = append(obh.hooks, bh)
	obh.hooksLk.Unlock()
38
	return func() {
Hannah Howard's avatar
Hannah Howard committed
39 40 41
		obh.hooksLk.Lock()
		defer obh.hooksLk.Unlock()
		for i, matchHook := range obh.hooks {
42
			if bh.key == matchHook.key {
Hannah Howard's avatar
Hannah Howard committed
43
				obh.hooks = append(obh.hooks[:i], obh.hooks[i+1:]...)
44 45 46 47 48 49
				return
			}
		}
	}
}

Hannah Howard's avatar
Hannah Howard committed
50 51
// BlockResult is the result of processing block hooks
type BlockResult struct {
52 53 54 55 56
	Err        error
	Extensions []graphsync.ExtensionData
}

// ProcessBlockHooks runs block hooks against a request and block data
Hannah Howard's avatar
Hannah Howard committed
57 58 59
func (obh *OutgoingBlockHooks) ProcessBlockHooks(p peer.ID, request graphsync.RequestData, blockData graphsync.BlockData) BlockResult {
	obh.hooksLk.RLock()
	defer obh.hooksLk.RUnlock()
60
	bha := &blockHookActions{}
Hannah Howard's avatar
Hannah Howard committed
61
	for _, bh := range obh.hooks {
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
		bh.hook(p, request, blockData, bha)
		if bha.hasError() {
			break
		}
	}
	return bha.result()
}

// blockHookActions collects what hooks request during a single
// ProcessBlockHooks call. A pointer to it is passed to every hook as the
// actions argument.
type blockHookActions struct {
	// err is the most recently recorded error, or nil if none was recorded.
	err        error
	// extensions is the extension data recorded so far, in call order.
	extensions []graphsync.ExtensionData
}

// hasError reports whether a hook has recorded an error (including ErrPaused).
func (bha *blockHookActions) hasError() bool {
	return bha.err != nil
}

Hannah Howard's avatar
Hannah Howard committed
79 80
func (bha *blockHookActions) result() BlockResult {
	return BlockResult{bha.err, bha.extensions}
81 82 83 84 85 86 87 88 89 90 91 92 93
}

// SendExtensionData records extension data to be included in the result,
// after any data recorded by earlier calls.
func (bha *blockHookActions) SendExtensionData(data graphsync.ExtensionData) {
	bha.extensions = append(bha.extensions, data)
}

// TerminateWithError records err as the result error, replacing any error
// recorded earlier. A non-nil err makes ProcessBlockHooks stop after the
// current hook returns.
func (bha *blockHookActions) TerminateWithError(err error) {
	bha.err = err
}

// PauseResponse marks the response as paused by recording ErrPaused as the
// result error, replacing any error recorded earlier.
func (bha *blockHookActions) PauseResponse() {
	bha.TerminateWithError(ErrPaused)
}