queryexecutor.go 8.11 KB
Newer Older
1 2 3 4 5
package responsemanager

import (
	"context"
	"errors"
6
	"strings"
7 8 9
	"time"

	"github.com/ipfs/go-cid"
Hannah Howard's avatar
Hannah Howard committed
10 11 12 13
	ipld "github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
	"github.com/libp2p/go-libp2p-core/peer"

14 15 16 17 18 19 20 21 22
	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/cidset"
	"github.com/ipfs/go-graphsync/ipldutil"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/responsemanager/hooks"
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
	"github.com/ipfs/go-graphsync/responsemanager/runtraversal"
)

23 24
// errCancelledByCommand is the sentinel returned by checkForUpdates when the
// stop signal reports that the responder itself cancelled the response.
// executeQuery compares against it to finish the request with
// graphsync.RequestCancelled.
var errCancelledByCommand = errors.New("response cancelled by responder")

25 26
// TODO: Move this into a seperate module and fully seperate from the ResponseManager
type queryExecutor struct {
27 28 29
	requestHooks       RequestHooks
	blockHooks         BlockHooks
	updateHooks        UpdateHooks
30
	completedListeners CompletedListeners
31 32 33 34 35 36 37 38
	cancelledListeners CancelledListeners
	peerManager        PeerManager
	loader             ipld.Loader
	queryQueue         QueryQueue
	messages           chan responseManagerMessage
	ctx                context.Context
	workSignal         chan struct{}
	ticker             *time.Ticker
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
}

func (qe *queryExecutor) processQueriesWorker() {
	const targetWork = 1
	taskDataChan := make(chan responseTaskData)
	var taskData responseTaskData
	for {
		pid, tasks, _ := qe.queryQueue.PopTasks(targetWork)
		for len(tasks) == 0 {
			select {
			case <-qe.ctx.Done():
				return
			case <-qe.workSignal:
				pid, tasks, _ = qe.queryQueue.PopTasks(targetWork)
			case <-qe.ticker.C:
				qe.queryQueue.ThawRound()
				pid, tasks, _ = qe.queryQueue.PopTasks(targetWork)
			}
		}
		for _, task := range tasks {
			key := task.Topic.(responseKey)
			select {
			case qe.messages <- &responseDataRequest{key, taskDataChan}:
			case <-qe.ctx.Done():
				return
			}
			select {
			case taskData = <-taskDataChan:
			case <-qe.ctx.Done():
				return
			}
			if taskData.empty {
				log.Info("Empty task on peer request stack")
				continue
			}
			status, err := qe.executeTask(key, taskData)
75 76 77 78 79 80 81
			_, isPaused := err.(hooks.ErrPaused)
			isCancelled := err != nil && isContextErr(err)
			if isCancelled {
				qe.cancelledListeners.NotifyCancelledListeners(key.p, taskData.request)
			} else if !isPaused {
				qe.completedListeners.NotifyCompletedListeners(key.p, taskData.request, status)
			}
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
			select {
			case qe.messages <- &finishTaskRequest{key, status, err}:
			case <-qe.ctx.Done():
			}
		}
		qe.queryQueue.TasksDone(pid, tasks...)

	}

}

func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) (graphsync.ResponseStatusCode, error) {
	var err error
	loader := taskData.loader
	traverser := taskData.traverser
	if loader == nil || traverser == nil {
98 99
		var isPaused bool
		loader, traverser, isPaused, err = qe.prepareQuery(taskData.ctx, key.p, taskData.request)
100 101 102 103 104 105 106 107
		if err != nil {
			return graphsync.RequestFailedUnknown, err
		}
		select {
		case <-qe.ctx.Done():
			return graphsync.RequestFailedUnknown, errors.New("context cancelled")
		case qe.messages <- &setResponseDataRequest{key, loader, traverser}:
		}
108 109 110
		if isPaused {
			return graphsync.RequestPaused, hooks.ErrPaused{}
		}
111
	}
112
	return qe.executeQuery(key.p, taskData.request, loader, traverser, taskData.signals)
113 114 115 116
}

func (qe *queryExecutor) prepareQuery(ctx context.Context,
	p peer.ID,
117
	request gsmsg.GraphSyncRequest) (ipld.Loader, ipldutil.Traverser, bool, error) {
118 119
	result := qe.requestHooks.ProcessRequestHooks(p, request)
	peerResponseSender := qe.peerManager.SenderForPeer(p)
120 121
	var transactionError error
	var isPaused bool
122 123 124 125 126 127
	err := peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
		for _, extension := range result.Extensions {
			transaction.SendExtensionData(extension)
		}
		if result.Err != nil || !result.IsValidated {
			transaction.FinishWithError(graphsync.RequestFailedUnknown)
128 129 130 131
			transactionError = errors.New("request not valid")
		} else if result.IsPaused {
			transaction.PauseRequest()
			isPaused = true
132 133 134 135
		}
		return nil
	})
	if err != nil {
136
		return nil, nil, false, err
137
	}
138 139
	if transactionError != nil {
		return nil, nil, false, transactionError
140 141
	}
	if err := qe.processDoNoSendCids(request, peerResponseSender); err != nil {
142
		return nil, nil, false, err
143 144 145 146 147 148 149 150 151 152 153
	}
	rootLink := cidlink.Link{Cid: request.Root()}
	traverser := ipldutil.TraversalBuilder{
		Root:     rootLink,
		Selector: request.Selector(),
		Chooser:  result.CustomChooser,
	}.Start(ctx)
	loader := result.CustomLoader
	if loader == nil {
		loader = qe.loader
	}
154
	return loader, traverser, isPaused, nil
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
}

// processDoNoSendCids applies the do-not-send-CIDs extension of a request.
//
// If the request carries no such extension it does nothing. Otherwise the
// extension data is decoded into a CID set and every CID in it is registered
// with the peer's response sender as a block to ignore. If decoding fails, the
// request is finished with graphsync.RequestFailedUnknown and the decode error
// is returned.
func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, peerResponseSender peerresponsemanager.PeerResponseSender) error {
	payload, present := request.Extension(graphsync.ExtensionDoNotSendCIDs)
	if !present {
		// No extension on this request: nothing to ignore.
		return nil
	}

	excluded, decodeErr := cidset.DecodeCidSet(payload)
	if decodeErr != nil {
		peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
		return decodeErr
	}

	// Turn every CID of the set into a link the sender can match on.
	ignored := make([]ipld.Link, 0, excluded.Len())
	collect := func(c cid.Cid) error {
		ignored = append(ignored, cidlink.Link{Cid: c})
		return nil
	}
	if walkErr := excluded.ForEach(collect); walkErr != nil {
		return walkErr
	}

	peerResponseSender.IgnoreBlocks(request.ID(), ignored)
	return nil
}

func (qe *queryExecutor) executeQuery(
	p peer.ID,
	request gsmsg.GraphSyncRequest,
	loader ipld.Loader,
	traverser ipldutil.Traverser,
184
	signals signals) (graphsync.ResponseStatusCode, error) {
185 186 187 188 189
	updateChan := make(chan []gsmsg.GraphSyncRequest)
	peerResponseSender := qe.peerManager.SenderForPeer(p)
	err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error {
		var err error
		_ = peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
190 191
			err = qe.checkForUpdates(p, request, signals, updateChan, transaction)
			if _, ok := err.(hooks.ErrPaused); !ok && err != nil {
192 193 194 195 196 197 198 199
				return nil
			}
			blockData := transaction.SendResponse(link, data)
			if blockData.BlockSize() > 0 {
				result := qe.blockHooks.ProcessBlockHooks(p, request, blockData)
				for _, extension := range result.Extensions {
					transaction.SendExtensionData(extension)
				}
200
				if _, ok := result.Err.(hooks.ErrPaused); ok {
201 202
					transaction.PauseRequest()
				}
203 204 205
				if result.Err != nil {
					err = result.Err
				}
206 207 208 209 210
			}
			return nil
		})
		return err
	})
211 212 213 214 215
	if err != nil {
		_, isPaused := err.(hooks.ErrPaused)
		if isPaused {
			return graphsync.RequestPaused, err
		}
216
		if isContextErr(err) {
217 218
			peerResponseSender.FinishWithCancel(request.ID())
			return graphsync.RequestCancelled, err
219
		}
220 221 222 223 224 225
		if err == errCancelledByCommand {
			peerResponseSender.FinishWithError(request.ID(), graphsync.RequestCancelled)
			return graphsync.RequestCancelled, err
		}
		peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
		return graphsync.RequestFailedUnknown, err
226
	}
227
	return peerResponseSender.FinishRequest(request.ID()), nil
228 229 230 231 232
}

func (qe *queryExecutor) checkForUpdates(
	p peer.ID,
	request gsmsg.GraphSyncRequest,
233
	signals signals,
234 235 236 237
	updateChan chan []gsmsg.GraphSyncRequest,
	peerResponseSender peerresponsemanager.PeerResponseTransactionSender) error {
	for {
		select {
238 239 240 241 242 243 244 245 246
		case selfCancelled := <-signals.stopSignal:
			if selfCancelled {
				return errCancelledByCommand
			}
			return ipldutil.ContextCancelError{}
		case <-signals.pauseSignal:
			peerResponseSender.PauseRequest()
			return hooks.ErrPaused{}
		case <-signals.updateSignal:
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
			select {
			case qe.messages <- &responseUpdateRequest{responseKey{p, request.ID()}, updateChan}:
			case <-qe.ctx.Done():
			}
			select {
			case updates := <-updateChan:
				for _, update := range updates {
					result := qe.updateHooks.ProcessUpdateHooks(p, request, update)
					for _, extension := range result.Extensions {
						peerResponseSender.SendExtensionData(extension)
					}
					if result.Err != nil {
						return result.Err
					}
				}
			case <-qe.ctx.Done():
			}
		default:
			return nil
		}
	}
}
269 270 271

func isContextErr(err error) bool {
	// TODO: Match with errors.Is when https://github.com/ipld/go-ipld-prime/issues/58 is resolved
272
	return strings.Contains(err.Error(), ipldutil.ContextCancelError{}.Error())
273
}