requestupdatehooks.go 2.15 KB
Newer Older
Hannah Howard's avatar
Hannah Howard committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
package hooks

import (
	"sync"

	"github.com/ipfs/go-graphsync"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

type requestUpdatedHook struct {
	key  uint64
	hook graphsync.OnRequestUpdatedHook
}

// RequestUpdatedHooks manages and runs hooks for request updates
type RequestUpdatedHooks struct {
	nextKey uint64
	hooksLk sync.RWMutex
	hooks   []requestUpdatedHook
}

// NewUpdateHooks returns a new list of request updated hooks
func NewUpdateHooks() *RequestUpdatedHooks {
	return &RequestUpdatedHooks{}
}

// Register registers an hook to process updates to requests
func (ruh *RequestUpdatedHooks) Register(hook graphsync.OnRequestUpdatedHook) graphsync.UnregisterHookFunc {
	ruh.hooksLk.Lock()
	rh := requestUpdatedHook{ruh.nextKey, hook}
	ruh.nextKey++
	ruh.hooks = append(ruh.hooks, rh)
	ruh.hooksLk.Unlock()
	return func() {
		ruh.hooksLk.Lock()
		defer ruh.hooksLk.Unlock()
		for i, matchHook := range ruh.hooks {
			if rh.key == matchHook.key {
				ruh.hooks = append(ruh.hooks[:i], ruh.hooks[i+1:]...)
				return
			}
		}
	}
}

// UpdateResult is the result of running update hooks
type UpdateResult struct {
	Err        error
	Unpause    bool
	Extensions []graphsync.ExtensionData
}

// ProcessUpdateHooks runs request hooks against an incoming request
func (ruh *RequestUpdatedHooks) ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) UpdateResult {
	ruh.hooksLk.RLock()
	defer ruh.hooksLk.RUnlock()
	ha := &updateHookActions{}
	for _, updateHook := range ruh.hooks {
		updateHook.hook(p, request, update, ha)
		if ha.hasError() {
			break
		}
	}
	return ha.result()
}

type updateHookActions struct {
	err        error
	unpause    bool
	extensions []graphsync.ExtensionData
}

func (uha *updateHookActions) hasError() bool {
	return uha.err != nil
}

func (uha *updateHookActions) result() UpdateResult {
	return UpdateResult{uha.err, uha.unpause, uha.extensions}
}

func (uha *updateHookActions) SendExtensionData(data graphsync.ExtensionData) {
	uha.extensions = append(uha.extensions, data)
}

func (uha *updateHookActions) TerminateWithError(err error) {
	uha.err = err
}

func (uha *updateHookActions) UnpauseResponse() {
	uha.unpause = true
}