queryexecutor.go 8.43 KB
Newer Older
1 2 3 4 5
package responsemanager

import (
	"context"
	"errors"
6
	"strings"
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
	"time"

	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/cidset"
	"github.com/ipfs/go-graphsync/ipldutil"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/responsemanager/hooks"
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
	"github.com/ipfs/go-graphsync/responsemanager/runtraversal"
	ipld "github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
	"github.com/libp2p/go-libp2p-core/peer"
)

22 23
var errCancelledByCommand = errors.New("response cancelled by responder")

24 25
// TODO: Move this into a seperate module and fully seperate from the ResponseManager
type queryExecutor struct {
26 27 28
	requestHooks       RequestHooks
	blockHooks         BlockHooks
	updateHooks        UpdateHooks
29
	completedHooks     CompletedHooks
30 31 32 33 34 35 36 37
	cancelledListeners CancelledListeners
	peerManager        PeerManager
	loader             ipld.Loader
	queryQueue         QueryQueue
	messages           chan responseManagerMessage
	ctx                context.Context
	workSignal         chan struct{}
	ticker             *time.Ticker
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
}

func (qe *queryExecutor) processQueriesWorker() {
	const targetWork = 1
	taskDataChan := make(chan responseTaskData)
	var taskData responseTaskData
	for {
		pid, tasks, _ := qe.queryQueue.PopTasks(targetWork)
		for len(tasks) == 0 {
			select {
			case <-qe.ctx.Done():
				return
			case <-qe.workSignal:
				pid, tasks, _ = qe.queryQueue.PopTasks(targetWork)
			case <-qe.ticker.C:
				qe.queryQueue.ThawRound()
				pid, tasks, _ = qe.queryQueue.PopTasks(targetWork)
			}
		}
		for _, task := range tasks {
			key := task.Topic.(responseKey)
			select {
			case qe.messages <- &responseDataRequest{key, taskDataChan}:
			case <-qe.ctx.Done():
				return
			}
			select {
			case taskData = <-taskDataChan:
			case <-qe.ctx.Done():
				return
			}
			if taskData.empty {
				log.Info("Empty task on peer request stack")
				continue
			}
			status, err := qe.executeTask(key, taskData)
			select {
			case qe.messages <- &finishTaskRequest{key, status, err}:
			case <-qe.ctx.Done():
			}
		}
		qe.queryQueue.TasksDone(pid, tasks...)

	}

}

func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) (graphsync.ResponseStatusCode, error) {
	var err error
	loader := taskData.loader
	traverser := taskData.traverser
	if loader == nil || traverser == nil {
90 91
		var isPaused bool
		loader, traverser, isPaused, err = qe.prepareQuery(taskData.ctx, key.p, taskData.request)
92 93 94 95 96 97 98 99
		if err != nil {
			return graphsync.RequestFailedUnknown, err
		}
		select {
		case <-qe.ctx.Done():
			return graphsync.RequestFailedUnknown, errors.New("context cancelled")
		case qe.messages <- &setResponseDataRequest{key, loader, traverser}:
		}
100 101 102
		if isPaused {
			return graphsync.RequestPaused, hooks.ErrPaused{}
		}
103
	}
104
	return qe.executeQuery(key.p, taskData.request, loader, traverser, taskData.signals)
105 106 107 108
}

func (qe *queryExecutor) prepareQuery(ctx context.Context,
	p peer.ID,
109
	request gsmsg.GraphSyncRequest) (ipld.Loader, ipldutil.Traverser, bool, error) {
110 111
	result := qe.requestHooks.ProcessRequestHooks(p, request)
	peerResponseSender := qe.peerManager.SenderForPeer(p)
112 113
	var transactionError error
	var isPaused bool
114 115 116 117 118 119
	err := peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
		for _, extension := range result.Extensions {
			transaction.SendExtensionData(extension)
		}
		if result.Err != nil || !result.IsValidated {
			transaction.FinishWithError(graphsync.RequestFailedUnknown)
120 121 122 123
			transactionError = errors.New("request not valid")
		} else if result.IsPaused {
			transaction.PauseRequest()
			isPaused = true
124 125 126 127
		}
		return nil
	})
	if err != nil {
128
		return nil, nil, false, err
129
	}
130 131
	if transactionError != nil {
		return nil, nil, false, transactionError
132 133
	}
	if err := qe.processDoNoSendCids(request, peerResponseSender); err != nil {
134
		return nil, nil, false, err
135 136 137 138 139 140 141 142 143 144 145
	}
	rootLink := cidlink.Link{Cid: request.Root()}
	traverser := ipldutil.TraversalBuilder{
		Root:     rootLink,
		Selector: request.Selector(),
		Chooser:  result.CustomChooser,
	}.Start(ctx)
	loader := result.CustomLoader
	if loader == nil {
		loader = qe.loader
	}
146
	return loader, traverser, isPaused, nil
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
}

func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, peerResponseSender peerresponsemanager.PeerResponseSender) error {
	doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
	if !has {
		return nil
	}
	cidSet, err := cidset.DecodeCidSet(doNotSendCidsData)
	if err != nil {
		peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
		return err
	}
	links := make([]ipld.Link, 0, cidSet.Len())
	err = cidSet.ForEach(func(c cid.Cid) error {
		links = append(links, cidlink.Link{Cid: c})
		return nil
	})
	if err != nil {
		return err
	}
	peerResponseSender.IgnoreBlocks(request.ID(), links)
	return nil
}

func (qe *queryExecutor) executeQuery(
	p peer.ID,
	request gsmsg.GraphSyncRequest,
	loader ipld.Loader,
	traverser ipldutil.Traverser,
176
	signals signals) (graphsync.ResponseStatusCode, error) {
177 178 179 180 181
	updateChan := make(chan []gsmsg.GraphSyncRequest)
	peerResponseSender := qe.peerManager.SenderForPeer(p)
	err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error {
		var err error
		_ = peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
182 183
			err = qe.checkForUpdates(p, request, signals, updateChan, transaction)
			if _, ok := err.(hooks.ErrPaused); !ok && err != nil {
184 185 186 187 188 189 190 191
				return nil
			}
			blockData := transaction.SendResponse(link, data)
			if blockData.BlockSize() > 0 {
				result := qe.blockHooks.ProcessBlockHooks(p, request, blockData)
				for _, extension := range result.Extensions {
					transaction.SendExtensionData(extension)
				}
192
				if _, ok := result.Err.(hooks.ErrPaused); ok {
193 194
					transaction.PauseRequest()
				}
195 196 197
				if result.Err != nil {
					err = result.Err
				}
198 199 200 201 202
			}
			return nil
		})
		return err
	})
203 204 205 206

	var status graphsync.ResponseStatusCode
	_ = peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
		status = qe.closeRequest(transaction, err)
207
		if isContextErr(err) {
208 209 210 211 212 213
			qe.cancelledListeners.NotifyCancelledListeners(p, request)
		} else if status != graphsync.RequestPaused {
			result := qe.completedHooks.ProcessCompleteHooks(p, request, status)
			for _, extension := range result.Extensions {
				transaction.SendExtensionData(extension)
			}
214
		}
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
		return nil
	})
	return status, err
}

func (qe *queryExecutor) closeRequest(peerResponseSender peerresponsemanager.PeerResponseTransactionSender, err error) graphsync.ResponseStatusCode {
	_, isPaused := err.(hooks.ErrPaused)
	if isPaused {
		return graphsync.RequestPaused
	}
	if isContextErr(err) {
		peerResponseSender.FinishWithCancel()
		return graphsync.RequestCancelled
	}
	if err == errCancelledByCommand {
		peerResponseSender.FinishWithError(graphsync.RequestCancelled)
		return graphsync.RequestCancelled
	}
	if err != nil {
		peerResponseSender.FinishWithError(graphsync.RequestFailedUnknown)
		return graphsync.RequestFailedUnknown
236
	}
237
	return peerResponseSender.FinishRequest()
238 239 240 241 242
}

func (qe *queryExecutor) checkForUpdates(
	p peer.ID,
	request gsmsg.GraphSyncRequest,
243
	signals signals,
244 245 246 247
	updateChan chan []gsmsg.GraphSyncRequest,
	peerResponseSender peerresponsemanager.PeerResponseTransactionSender) error {
	for {
		select {
248 249 250 251 252 253 254 255 256
		case selfCancelled := <-signals.stopSignal:
			if selfCancelled {
				return errCancelledByCommand
			}
			return ipldutil.ContextCancelError{}
		case <-signals.pauseSignal:
			peerResponseSender.PauseRequest()
			return hooks.ErrPaused{}
		case <-signals.updateSignal:
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
			select {
			case qe.messages <- &responseUpdateRequest{responseKey{p, request.ID()}, updateChan}:
			case <-qe.ctx.Done():
			}
			select {
			case updates := <-updateChan:
				for _, update := range updates {
					result := qe.updateHooks.ProcessUpdateHooks(p, request, update)
					for _, extension := range result.Extensions {
						peerResponseSender.SendExtensionData(extension)
					}
					if result.Err != nil {
						return result.Err
					}
				}
			case <-qe.ctx.Done():
			}
		default:
			return nil
		}
	}
}
279 280 281

func isContextErr(err error) bool {
	// TODO: Match with errors.Is when https://github.com/ipld/go-ipld-prime/issues/58 is resolved
282
	return err != nil && strings.Contains(err.Error(), ipldutil.ContextCancelError{}.Error())
283
}