queryexecutor.go 8.11 KB
Newer Older
1 2 3 4 5
package responsemanager

import (
	"context"
	"errors"
6
	"strings"
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
	"time"

	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/cidset"
	"github.com/ipfs/go-graphsync/ipldutil"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/responsemanager/hooks"
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
	"github.com/ipfs/go-graphsync/responsemanager/runtraversal"
	ipld "github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
	"github.com/libp2p/go-libp2p-core/peer"
)

22 23
// errCancelledByCommand is the sentinel error returned by checkForUpdates when
// the stop signal reports that the responder itself cancelled the response.
// executeQuery compares against it by identity to finish the request with a
// RequestCancelled status.
var errCancelledByCommand = errors.New("response cancelled by responder")

24 25
// TODO: Move this into a seperate module and fully seperate from the ResponseManager
type queryExecutor struct {
26 27 28 29 30 31 32 33 34 35 36 37
	requestHooks       RequestHooks
	blockHooks         BlockHooks
	updateHooks        UpdateHooks
	completedListeners CompletedListeners
	cancelledListeners CancelledListeners
	peerManager        PeerManager
	loader             ipld.Loader
	queryQueue         QueryQueue
	messages           chan responseManagerMessage
	ctx                context.Context
	workSignal         chan struct{}
	ticker             *time.Ticker
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
}

func (qe *queryExecutor) processQueriesWorker() {
	const targetWork = 1
	taskDataChan := make(chan responseTaskData)
	var taskData responseTaskData
	for {
		pid, tasks, _ := qe.queryQueue.PopTasks(targetWork)
		for len(tasks) == 0 {
			select {
			case <-qe.ctx.Done():
				return
			case <-qe.workSignal:
				pid, tasks, _ = qe.queryQueue.PopTasks(targetWork)
			case <-qe.ticker.C:
				qe.queryQueue.ThawRound()
				pid, tasks, _ = qe.queryQueue.PopTasks(targetWork)
			}
		}
		for _, task := range tasks {
			key := task.Topic.(responseKey)
			select {
			case qe.messages <- &responseDataRequest{key, taskDataChan}:
			case <-qe.ctx.Done():
				return
			}
			select {
			case taskData = <-taskDataChan:
			case <-qe.ctx.Done():
				return
			}
			if taskData.empty {
				log.Info("Empty task on peer request stack")
				continue
			}
			status, err := qe.executeTask(key, taskData)
74 75 76 77 78 79 80
			_, isPaused := err.(hooks.ErrPaused)
			isCancelled := err != nil && isContextErr(err)
			if isCancelled {
				qe.cancelledListeners.NotifyCancelledListeners(key.p, taskData.request)
			} else if !isPaused {
				qe.completedListeners.NotifyCompletedListeners(key.p, taskData.request, status)
			}
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
			select {
			case qe.messages <- &finishTaskRequest{key, status, err}:
			case <-qe.ctx.Done():
			}
		}
		qe.queryQueue.TasksDone(pid, tasks...)

	}

}

func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) (graphsync.ResponseStatusCode, error) {
	var err error
	loader := taskData.loader
	traverser := taskData.traverser
	if loader == nil || traverser == nil {
97 98
		var isPaused bool
		loader, traverser, isPaused, err = qe.prepareQuery(taskData.ctx, key.p, taskData.request)
99 100 101 102 103 104 105 106
		if err != nil {
			return graphsync.RequestFailedUnknown, err
		}
		select {
		case <-qe.ctx.Done():
			return graphsync.RequestFailedUnknown, errors.New("context cancelled")
		case qe.messages <- &setResponseDataRequest{key, loader, traverser}:
		}
107 108 109
		if isPaused {
			return graphsync.RequestPaused, hooks.ErrPaused{}
		}
110
	}
111
	return qe.executeQuery(key.p, taskData.request, loader, traverser, taskData.signals)
112 113 114 115
}

func (qe *queryExecutor) prepareQuery(ctx context.Context,
	p peer.ID,
116
	request gsmsg.GraphSyncRequest) (ipld.Loader, ipldutil.Traverser, bool, error) {
117 118
	result := qe.requestHooks.ProcessRequestHooks(p, request)
	peerResponseSender := qe.peerManager.SenderForPeer(p)
119 120
	var transactionError error
	var isPaused bool
121 122 123 124 125 126
	err := peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
		for _, extension := range result.Extensions {
			transaction.SendExtensionData(extension)
		}
		if result.Err != nil || !result.IsValidated {
			transaction.FinishWithError(graphsync.RequestFailedUnknown)
127 128 129 130
			transactionError = errors.New("request not valid")
		} else if result.IsPaused {
			transaction.PauseRequest()
			isPaused = true
131 132 133 134
		}
		return nil
	})
	if err != nil {
135
		return nil, nil, false, err
136
	}
137 138
	if transactionError != nil {
		return nil, nil, false, transactionError
139 140
	}
	if err := qe.processDoNoSendCids(request, peerResponseSender); err != nil {
141
		return nil, nil, false, err
142 143 144 145 146 147 148 149 150 151 152
	}
	rootLink := cidlink.Link{Cid: request.Root()}
	traverser := ipldutil.TraversalBuilder{
		Root:     rootLink,
		Selector: request.Selector(),
		Chooser:  result.CustomChooser,
	}.Start(ctx)
	loader := result.CustomLoader
	if loader == nil {
		loader = qe.loader
	}
153
	return loader, traverser, isPaused, nil
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
}

// processDoNoSendCids applies the do-not-send-CIDs extension of a request, if
// it has one. The extension data is decoded into a CID set and every CID in it
// is handed to the peer response sender as a link to ignore for this request.
//
// A request without the extension is a no-op. If the extension data cannot be
// decoded, the request is finished with graphsync.RequestFailedUnknown and the
// decode error is returned.
func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, peerResponseSender peerresponsemanager.PeerResponseSender) error {
	encoded, present := request.Extension(graphsync.ExtensionDoNotSendCIDs)
	if !present {
		return nil
	}

	excluded, err := cidset.DecodeCidSet(encoded)
	if err != nil {
		peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
		return err
	}

	links := make([]ipld.Link, 0, excluded.Len())
	collect := func(c cid.Cid) error {
		links = append(links, cidlink.Link{Cid: c})
		return nil
	}
	if err := excluded.ForEach(collect); err != nil {
		return err
	}

	peerResponseSender.IgnoreBlocks(request.ID(), links)
	return nil
}

func (qe *queryExecutor) executeQuery(
	p peer.ID,
	request gsmsg.GraphSyncRequest,
	loader ipld.Loader,
	traverser ipldutil.Traverser,
183
	signals signals) (graphsync.ResponseStatusCode, error) {
184 185 186 187 188
	updateChan := make(chan []gsmsg.GraphSyncRequest)
	peerResponseSender := qe.peerManager.SenderForPeer(p)
	err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error {
		var err error
		_ = peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
189 190
			err = qe.checkForUpdates(p, request, signals, updateChan, transaction)
			if _, ok := err.(hooks.ErrPaused); !ok && err != nil {
191 192 193 194 195 196 197 198
				return nil
			}
			blockData := transaction.SendResponse(link, data)
			if blockData.BlockSize() > 0 {
				result := qe.blockHooks.ProcessBlockHooks(p, request, blockData)
				for _, extension := range result.Extensions {
					transaction.SendExtensionData(extension)
				}
199
				if _, ok := result.Err.(hooks.ErrPaused); ok {
200 201
					transaction.PauseRequest()
				}
202 203 204
				if result.Err != nil {
					err = result.Err
				}
205 206 207 208 209 210
			}
			return nil
		})
		return err
	})
	if err != nil {
211 212 213
		_, isPaused := err.(hooks.ErrPaused)
		if isPaused {
			return graphsync.RequestPaused, err
214
		}
215 216 217 218 219 220 221 222 223 224
		if isContextErr(err) {
			peerResponseSender.FinishWithCancel(request.ID())
			return graphsync.RequestCancelled, err
		}
		if err == errCancelledByCommand {
			peerResponseSender.FinishWithError(request.ID(), graphsync.RequestCancelled)
			return graphsync.RequestCancelled, err
		}
		peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
		return graphsync.RequestFailedUnknown, err
225 226 227 228 229 230 231
	}
	return peerResponseSender.FinishRequest(request.ID()), nil
}

func (qe *queryExecutor) checkForUpdates(
	p peer.ID,
	request gsmsg.GraphSyncRequest,
232
	signals signals,
233 234 235 236
	updateChan chan []gsmsg.GraphSyncRequest,
	peerResponseSender peerresponsemanager.PeerResponseTransactionSender) error {
	for {
		select {
237 238 239 240 241 242 243 244 245
		case selfCancelled := <-signals.stopSignal:
			if selfCancelled {
				return errCancelledByCommand
			}
			return ipldutil.ContextCancelError{}
		case <-signals.pauseSignal:
			peerResponseSender.PauseRequest()
			return hooks.ErrPaused{}
		case <-signals.updateSignal:
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
			select {
			case qe.messages <- &responseUpdateRequest{responseKey{p, request.ID()}, updateChan}:
			case <-qe.ctx.Done():
			}
			select {
			case updates := <-updateChan:
				for _, update := range updates {
					result := qe.updateHooks.ProcessUpdateHooks(p, request, update)
					for _, extension := range result.Extensions {
						peerResponseSender.SendExtensionData(extension)
					}
					if result.Err != nil {
						return result.Err
					}
				}
			case <-qe.ctx.Done():
			}
		default:
			return nil
		}
	}
}
268 269 270 271 272

// isContextErr reports whether err represents a context cancellation, judged
// by whether its message contains the message of ipldutil.ContextCancelError.
// A nil err is never a context error.
func isContextErr(err error) bool {
	// Guard against nil so the helper is safe to call without a prior check;
	// invoking Error on a nil error value would panic.
	if err == nil {
		return false
	}
	// TODO: Match with errors.Is when https://github.com/ipld/go-ipld-prime/issues/58 is resolved
	return strings.Contains(err.Error(), ipldutil.ContextCancelError{}.Error())
}