peerresponsesender.go 12.1 KB
Newer Older
1 2 3 4 5 6
package peerresponsemanager

import (
	"context"
	"sync"

Hannah Howard's avatar
Hannah Howard committed
7
	blocks "github.com/ipfs/go-block-format"
8 9
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
Hannah Howard's avatar
Hannah Howard committed
10 11
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
	"github.com/libp2p/go-libp2p-core/peer"
12

Hannah Howard's avatar
Hannah Howard committed
13
	"github.com/ipfs/go-graphsync"
14
	"github.com/ipfs/go-graphsync/linktracker"
15
	gsmsg "github.com/ipfs/go-graphsync/message"
Hannah Howard's avatar
Hannah Howard committed
16
	"github.com/ipfs/go-graphsync/peermanager"
17 18 19
	"github.com/ipfs/go-graphsync/responsemanager/responsebuilder"
)

20 21
// maxBlockSize is the upper bound, in bytes, on the block data batched into
// a single response payload (512 KiB).
const maxBlockSize uint64 = 512 << 10

25 26 27 28 29 30 31 32
// log is the package-level logger, created under the "graphsync" name.
var log = logging.Logger("graphsync")

// PeerMessageHandler is implemented by anything able to deliver a batch of
// responses and blocks to a peer over the network.
type PeerMessageHandler interface {
	// SendResponse hands the responses and blocks destined for peer p to the
	// network layer. The peer response sender waits on the returned channel
	// (or on its own context) before sending its next message.
	SendResponse(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) <-chan struct{}
}

Hannah Howard's avatar
Hannah Howard committed
33 34 35
// Transaction is a series of operations that should be sent together in a
// single response. It receives a sender scoped to one request and returns an
// error to abort the batch.
type Transaction func(sender PeerResponseTransactionSender) error

36 37 38 39 40 41 42
type peerResponseSender struct {
	p            peer.ID
	ctx          context.Context
	cancel       context.CancelFunc
	peerHandler  PeerMessageHandler
	outgoingWork chan struct{}

43 44
	linkTrackerLk      sync.RWMutex
	linkTracker        *linktracker.LinkTracker
Hannah Howard's avatar
Hannah Howard committed
45 46
	altTrackers        map[string]*linktracker.LinkTracker
	dedupKeys          map[graphsync.RequestID]string
47 48
	responseBuildersLk sync.RWMutex
	responseBuilders   []*responsebuilder.ResponseBuilder
49 50 51 52 53 54
}

// PeerResponseSender handles batching, deduping, and sending responses for
// a given peer across multiple requests.
type PeerResponseSender interface {
	peermanager.PeerProcess
Hannah Howard's avatar
Hannah Howard committed
55
	DedupKey(requestID graphsync.RequestID, key string)
56
	IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link)
57
	SendResponse(
58
		requestID graphsync.RequestID,
59 60
		link ipld.Link,
		data []byte,
61
	) graphsync.BlockData
62
	SendExtensionData(graphsync.RequestID, graphsync.ExtensionData)
63
	FinishWithCancel(requestID graphsync.RequestID)
64
	FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode
65
	FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode)
Hannah Howard's avatar
Hannah Howard committed
66 67 68
	// Transaction calls multiple operations at once so they end up in a single response
	// Note: if the transaction function errors, the results will not execute
	Transaction(requestID graphsync.RequestID, transaction Transaction) error
Hannah Howard's avatar
Hannah Howard committed
69
	PauseRequest(requestID graphsync.RequestID)
70 71
}

Hannah Howard's avatar
Hannah Howard committed
72 73 74 75 76 77 78
// PeerResponseTransactionSender is a limited interface for sending responses inside a transaction
type PeerResponseTransactionSender interface {
	SendResponse(
		link ipld.Link,
		data []byte,
	) graphsync.BlockData
	SendExtensionData(graphsync.ExtensionData)
79
	FinishWithCancel()
Hannah Howard's avatar
Hannah Howard committed
80 81 82 83 84
	FinishRequest() graphsync.ResponseStatusCode
	FinishWithError(status graphsync.ResponseStatusCode)
	PauseRequest()
}

85
// NewResponseSender generates a new PeerResponseSender for the given context, peer ID,
86
// using the given peer message handler.
87
func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHandler) PeerResponseSender {
88 89 90 91 92 93 94 95
	ctx, cancel := context.WithCancel(ctx)
	return &peerResponseSender{
		p:            p,
		ctx:          ctx,
		cancel:       cancel,
		peerHandler:  peerHandler,
		outgoingWork: make(chan struct{}, 1),
		linkTracker:  linktracker.New(),
Hannah Howard's avatar
Hannah Howard committed
96 97
		dedupKeys:    make(map[graphsync.RequestID]string),
		altTrackers:  make(map[string]*linktracker.LinkTracker),
98 99 100 101
	}
}

// Startup initiates message sending for a peer
Hannah Howard's avatar
Hannah Howard committed
102 103
func (prs *peerResponseSender) Startup() {
	go prs.run()
104 105
}

Hannah Howard's avatar
Hannah Howard committed
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
// getLinkTracker returns the tracker registered under the request's dedup
// key, or the default tracker when the request has no dedup key.
// It does no locking itself; callers in this file hold linkTrackerLk.
func (prs *peerResponseSender) getLinkTracker(requestID graphsync.RequestID) *linktracker.LinkTracker {
	if dedupKey, hasKey := prs.dedupKeys[requestID]; hasKey {
		return prs.altTrackers[dedupKey]
	}
	return prs.linkTracker
}

// DedupKey associates requestID with key and makes sure a link tracker
// exists for that key, creating one on first use.
func (prs *peerResponseSender) DedupKey(requestID graphsync.RequestID, key string) {
	prs.linkTrackerLk.Lock()
	defer prs.linkTrackerLk.Unlock()

	prs.dedupKeys[requestID] = key
	if _, exists := prs.altTrackers[key]; exists {
		return
	}
	prs.altTrackers[key] = linktracker.New()
}

124 125
func (prs *peerResponseSender) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) {
	prs.linkTrackerLk.Lock()
Hannah Howard's avatar
Hannah Howard committed
126
	linkTracker := prs.getLinkTracker(requestID)
127
	for _, link := range links {
Hannah Howard's avatar
Hannah Howard committed
128
		linkTracker.RecordLinkTraversal(requestID, link, true)
129 130 131 132
	}
	prs.linkTrackerLk.Unlock()
}

Hannah Howard's avatar
Hannah Howard committed
133 134 135
type responseOperation interface {
	build(responseBuilder *responsebuilder.ResponseBuilder)
	size() uint64
136 137
}

Hannah Howard's avatar
Hannah Howard committed
138 139 140 141 142 143 144 145 146
func (prs *peerResponseSender) execute(operations []responseOperation) {
	size := uint64(0)
	for _, op := range operations {
		size += op.size()
	}
	if prs.buildResponse(size, func(responseBuilder *responsebuilder.ResponseBuilder) {
		for _, op := range operations {
			op.build(responseBuilder)
		}
147
	}) {
Hannah Howard's avatar
Hannah Howard committed
148
		prs.signalWork()
149 150 151
	}
}

Hannah Howard's avatar
Hannah Howard committed
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
// Shutdown stops sending messages for a peer by cancelling the sender's
// context, which makes the run loop return.
func (prs *peerResponseSender) Shutdown() {
	prs.cancel()
}

// extensionOperation is a responseOperation that attaches extension data to
// the response for a request.
type extensionOperation struct {
	requestID graphsync.RequestID
	extension graphsync.ExtensionData
}

// build adds the extension data for the request to responseBuilder.
func (eo extensionOperation) build(responseBuilder *responsebuilder.ResponseBuilder) {
	responseBuilder.AddExtensionData(eo.requestID, eo.extension)
}

// size returns the length in bytes of the extension's Data field.
func (eo extensionOperation) size() uint64 {
	return uint64(len(eo.extension.Data))
}

// SendExtensionData queues extension data for requestID as a standalone
// operation.
func (prs *peerResponseSender) SendExtensionData(requestID graphsync.RequestID, extension graphsync.ExtensionData) {
	op := extensionOperation{requestID: requestID, extension: extension}
	prs.execute([]responseOperation{op})
}

// peerResponseTransactionSender collects the operations issued during a
// transaction for one request so they can be executed together afterwards.
type peerResponseTransactionSender struct {
	requestID  graphsync.RequestID
	operations []responseOperation
	prs        *peerResponseSender
}

// SendResponse records the link traversal with the parent sender right away
// and appends the resulting block operation to the transaction.
func (prts *peerResponseTransactionSender) SendResponse(link ipld.Link, data []byte) graphsync.BlockData {
	blockOp := prts.prs.setupBlockOperation(prts.requestID, link, data)
	prts.operations = append(prts.operations, blockOp)
	return blockOp
}

// SendExtensionData appends an extension operation for the transaction's
// request.
func (prts *peerResponseTransactionSender) SendExtensionData(extension graphsync.ExtensionData) {
	extOp := extensionOperation{requestID: prts.requestID, extension: extension}
	prts.operations = append(prts.operations, extOp)
}

// FinishRequest stops tracking the request with the parent sender right away,
// appends the resulting completion status to the transaction, and returns
// that status.
func (prts *peerResponseTransactionSender) FinishRequest() graphsync.ResponseStatusCode {
	finishOp := prts.prs.setupFinishOperation(prts.requestID)
	prts.operations = append(prts.operations, finishOp)
	return finishOp.status
}

// FinishWithError stops tracking the request with the parent sender right
// away and appends the given status to the transaction.
func (prts *peerResponseTransactionSender) FinishWithError(status graphsync.ResponseStatusCode) {
	errOp := prts.prs.setupFinishWithErrOperation(prts.requestID, status)
	prts.operations = append(prts.operations, errOp)
}

// PauseRequest appends a paused status for the transaction's request.
func (prts *peerResponseTransactionSender) PauseRequest() {
	pauseOp := statusOperation{requestID: prts.requestID, status: graphsync.RequestPaused}
	prts.operations = append(prts.operations, pauseOp)
}

204 205 206 207
// FinishWithCancel stops tracking the request with the parent sender. No
// operation is appended to the transaction, so no status is sent.
func (prts *peerResponseTransactionSender) FinishWithCancel() {
	_ = prts.prs.finishTracking(prts.requestID)
}

Hannah Howard's avatar
Hannah Howard committed
208 209 210 211 212 213 214 215 216 217 218 219 220 221
// Transaction runs transaction against a sender scoped to requestID and, if
// it returns nil, executes every collected operation as one batch. When the
// function returns an error the collected operations are discarded and the
// error is returned. Link-tracker updates made while the function ran are
// applied as its methods are called and are not undone on error.
func (prs *peerResponseSender) Transaction(requestID graphsync.RequestID, transaction Transaction) error {
	txSender := &peerResponseTransactionSender{
		requestID: requestID,
		prs:       prs,
	}
	if err := transaction(txSender); err != nil {
		return err
	}
	prs.execute(txSender.operations)
	return nil
}

type blockOperation struct {
	data      []byte
222
	sendBlock bool
Hannah Howard's avatar
Hannah Howard committed
223 224
	link      ipld.Link
	requestID graphsync.RequestID
225 226
}

Hannah Howard's avatar
Hannah Howard committed
227 228 229 230 231 232 233 234 235 236
func (bo blockOperation) build(responseBuilder *responsebuilder.ResponseBuilder) {
	if bo.sendBlock {
		cidLink := bo.link.(cidlink.Link)
		block, err := blocks.NewBlockWithCid(bo.data, cidLink.Cid)
		if err != nil {
			log.Errorf("Data did not match cid when sending link for %s", cidLink.String())
		}
		responseBuilder.AddBlock(block)
	}
	responseBuilder.AddLink(bo.requestID, bo.link, bo.data != nil)
237 238
}

Hannah Howard's avatar
Hannah Howard committed
239 240
func (bo blockOperation) Link() ipld.Link {
	return bo.link
241 242
}

Hannah Howard's avatar
Hannah Howard committed
243 244 245 246 247 248
// BlockSize returns the length in bytes of the block data, which is zero
// when no data was supplied.
func (bo blockOperation) BlockSize() uint64 {
	return uint64(len(bo.data))
}

func (bo blockOperation) BlockSizeOnWire() uint64 {
	if !bo.sendBlock {
249 250
		return 0
	}
Hannah Howard's avatar
Hannah Howard committed
251 252 253 254 255 256 257 258 259 260 261
	return bo.BlockSize()
}

// size implements responseOperation by reporting the on-wire block size.
func (bo blockOperation) size() uint64 {
	return bo.BlockSizeOnWire()
}

func (prs *peerResponseSender) setupBlockOperation(requestID graphsync.RequestID,
	link ipld.Link, data []byte) blockOperation {
	hasBlock := data != nil
	prs.linkTrackerLk.Lock()
Hannah Howard's avatar
Hannah Howard committed
262 263 264
	linkTracker := prs.getLinkTracker(requestID)
	sendBlock := hasBlock && linkTracker.BlockRefCount(link) == 0
	linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
Hannah Howard's avatar
Hannah Howard committed
265 266 267 268
	prs.linkTrackerLk.Unlock()
	return blockOperation{
		data, sendBlock, link, requestID,
	}
269 270
}

271 272 273
// SendResponse sends a given link for a given
// requestID across the wire, as well as its corresponding
// block if the block is present and has not already been sent
274
// it returns the number of block bytes sent
Hannah Howard's avatar
Hannah Howard committed
275
func (prs *peerResponseSender) SendResponse(
276
	requestID graphsync.RequestID,
277 278
	link ipld.Link,
	data []byte,
279
) graphsync.BlockData {
Hannah Howard's avatar
Hannah Howard committed
280 281 282
	op := prs.setupBlockOperation(requestID, link, data)
	prs.execute([]responseOperation{op})
	return op
283 284
}

Hannah Howard's avatar
Hannah Howard committed
285 286 287 288 289 290 291 292 293 294 295 296 297
// statusOperation is a responseOperation that sets the response status code
// for a request.
type statusOperation struct {
	requestID graphsync.RequestID
	status    graphsync.ResponseStatusCode
}

// build adds the status code for the request to responseBuilder.
func (fo statusOperation) build(responseBuilder *responsebuilder.ResponseBuilder) {
	responseBuilder.AddResponseCode(fo.requestID, fo.status)
}

// size returns zero: a status code contributes no block bytes.
func (fo statusOperation) size() uint64 {
	return 0
}

298
func (prs *peerResponseSender) finishTracking(requestID graphsync.RequestID) bool {
Hannah Howard's avatar
Hannah Howard committed
299
	prs.linkTrackerLk.Lock()
300
	defer prs.linkTrackerLk.Unlock()
Hannah Howard's avatar
Hannah Howard committed
301 302 303 304 305 306 307 308 309 310
	linkTracker := prs.getLinkTracker(requestID)
	allBlocks := linkTracker.FinishRequest(requestID)
	key, ok := prs.dedupKeys[requestID]
	if ok {
		delete(prs.dedupKeys, requestID)
		if linkTracker.Empty() {
			delete(prs.altTrackers, key)
		}
	}
	return allBlocks
311 312 313 314
}

func (prs *peerResponseSender) setupFinishOperation(requestID graphsync.RequestID) statusOperation {
	isComplete := prs.finishTracking(requestID)
315
	var status graphsync.ResponseStatusCode
316
	if isComplete {
317
		status = graphsync.RequestCompletedFull
318
	} else {
319
		status = graphsync.RequestCompletedPartial
320
	}
Hannah Howard's avatar
Hannah Howard committed
321
	return statusOperation{requestID, status}
322 323
}

Hannah Howard's avatar
Hannah Howard committed
324 325 326 327 328 329
// FinishRequest marks the given requestID as having sent all responses,
// queues the resulting completion status, and returns that status.
func (prs *peerResponseSender) FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode {
	finishOp := prs.setupFinishOperation(requestID)
	prs.execute([]responseOperation{finishOp})
	return finishOp.status
}
330

Hannah Howard's avatar
Hannah Howard committed
331
func (prs *peerResponseSender) setupFinishWithErrOperation(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) statusOperation {
Hannah Howard's avatar
Hannah Howard committed
332
	prs.finishTracking(requestID)
Hannah Howard's avatar
Hannah Howard committed
333
	return statusOperation{requestID, status}
334
}
335

Hannah Howard's avatar
Hannah Howard committed
336 337 338 339
// FinishWithError marks the given requestID as having terminated with an error
func (prs *peerResponseSender) FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) {
	op := prs.setupFinishWithErrOperation(requestID, status)
	prs.execute([]responseOperation{op})
Hannah Howard's avatar
Hannah Howard committed
340 341
}

Hannah Howard's avatar
Hannah Howard committed
342 343
// PauseRequest queues a RequestPaused status for requestID. Link tracking
// for the request is left untouched.
func (prs *peerResponseSender) PauseRequest(requestID graphsync.RequestID) {
	pauseOp := statusOperation{requestID: requestID, status: graphsync.RequestPaused}
	prs.execute([]responseOperation{pauseOp})
}
Hannah Howard's avatar
Hannah Howard committed
345

346 347 348 349
// FinishWithCancel stops tracking requestID without queuing any status for
// it; the completeness result of tracking is discarded.
func (prs *peerResponseSender) FinishWithCancel(requestID graphsync.RequestID) {
	_ = prs.finishTracking(requestID)
}

Hannah Howard's avatar
Hannah Howard committed
350 351 352 353 354
func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn func(*responsebuilder.ResponseBuilder)) bool {
	prs.responseBuildersLk.Lock()
	defer prs.responseBuildersLk.Unlock()
	if shouldBeginNewResponse(prs.responseBuilders, blkSize) {
		prs.responseBuilders = append(prs.responseBuilders, responsebuilder.New())
355
	}
Hannah Howard's avatar
Hannah Howard committed
356
	responseBuilder := prs.responseBuilders[len(prs.responseBuilders)-1]
357 358 359 360
	buildResponseFn(responseBuilder)
	return !responseBuilder.Empty()
}

361
func shouldBeginNewResponse(responseBuilders []*responsebuilder.ResponseBuilder, blkSize uint64) bool {
362 363
	if len(responseBuilders) == 0 {
		return true
364
	}
365 366 367 368
	if blkSize == 0 {
		return false
	}
	return responseBuilders[len(responseBuilders)-1].BlockSize()+blkSize > maxBlockSize
369 370
}

Hannah Howard's avatar
Hannah Howard committed
371
func (prs *peerResponseSender) signalWork() {
372
	select {
Hannah Howard's avatar
Hannah Howard committed
373
	case prs.outgoingWork <- struct{}{}:
374 375 376 377
	default:
	}
}

Hannah Howard's avatar
Hannah Howard committed
378
func (prs *peerResponseSender) run() {
379 380
	for {
		select {
Hannah Howard's avatar
Hannah Howard committed
381
		case <-prs.ctx.Done():
382
			return
Hannah Howard's avatar
Hannah Howard committed
383 384
		case <-prs.outgoingWork:
			prs.sendResponseMessages()
385 386 387 388
		}
	}
}

Hannah Howard's avatar
Hannah Howard committed
389 390 391 392 393
func (prs *peerResponseSender) sendResponseMessages() {
	prs.responseBuildersLk.Lock()
	builders := prs.responseBuilders
	prs.responseBuilders = nil
	prs.responseBuildersLk.Unlock()
394

395 396 397 398
	for _, builder := range builders {
		if builder.Empty() {
			continue
		}
399
		responses, blks, err := builder.Build()
400 401 402
		if err != nil {
			log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
		}
403

Hannah Howard's avatar
Hannah Howard committed
404
		done := prs.peerHandler.SendResponse(prs.p, responses, blks)
405

406 407 408
		// wait for message to be processed
		select {
		case <-done:
Hannah Howard's avatar
Hannah Howard committed
409
		case <-prs.ctx.Done():
410
		}
411
	}
412

413
}