peerresponsesender.go 14.9 KB
Newer Older
1 2 3 4
package peerresponsemanager

import (
	"context"
5
	"fmt"
6 7
	"sync"

Hannah Howard's avatar
Hannah Howard committed
8
	blocks "github.com/ipfs/go-block-format"
9 10
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
Hannah Howard's avatar
Hannah Howard committed
11 12
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
	"github.com/libp2p/go-libp2p-core/peer"
13

Hannah Howard's avatar
Hannah Howard committed
14
	"github.com/ipfs/go-graphsync"
15
	"github.com/ipfs/go-graphsync/linktracker"
16
	gsmsg "github.com/ipfs/go-graphsync/message"
17 18
	"github.com/ipfs/go-graphsync/messagequeue"
	"github.com/ipfs/go-graphsync/notifications"
Hannah Howard's avatar
Hannah Howard committed
19
	"github.com/ipfs/go-graphsync/peermanager"
20 21 22
	"github.com/ipfs/go-graphsync/responsemanager/responsebuilder"
)

23 24
const (
	// maxBlockSize is the maximum total size, in bytes, of the blocks batched
	// into a single response payload (512 KiB). See shouldBeginNewResponse.
	maxBlockSize uint64 = 512 * 1024
)

28 29
// log is the package-level logger, created under the "graphsync" name.
var log = logging.Logger("graphsync")

30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
// EventName identifies the kind of event that is published by the peer
// response sender.
type EventName uint64

// Event names published by the peer response sender.
const (
	// Sent indicates the item was sent over the wire.
	Sent EventName = iota
	// Error indicates an error occurred while sending an item.
	Error
)

// Event is an event that is published by the peer response sender.
// Name says what happened; Err is populated for Error events (it wraps the
// message queue's error) and is left nil for Sent events.
type Event struct {
	Name EventName
	Err  error
}

46 47 48
// PeerMessageHandler is an interface that can send a response for a given peer across
// the network.
type PeerMessageHandler interface {
49
	SendResponse(peer.ID, []gsmsg.GraphSyncResponse, []blocks.Block, ...notifications.Notifee)
50 51
}

Hannah Howard's avatar
Hannah Howard committed
52 53 54
// Transaction is a series of operations that should be sent together in a single response.
// It receives a PeerResponseTransactionSender on which to queue those operations;
// returning a non-nil error prevents the queued operations from being executed.
type Transaction func(PeerResponseTransactionSender) error

55 56 57 58 59 60 61
type peerResponseSender struct {
	p            peer.ID
	ctx          context.Context
	cancel       context.CancelFunc
	peerHandler  PeerMessageHandler
	outgoingWork chan struct{}

62 63
	linkTrackerLk      sync.RWMutex
	linkTracker        *linktracker.LinkTracker
Hannah Howard's avatar
Hannah Howard committed
64 65
	altTrackers        map[string]*linktracker.LinkTracker
	dedupKeys          map[graphsync.RequestID]string
66 67
	responseBuildersLk sync.RWMutex
	responseBuilders   []*responsebuilder.ResponseBuilder
68 69 70 71
	nextBuilderTopic   responsebuilder.Topic
	queuedMessages     chan responsebuilder.Topic
	subscriber         notifications.MappableSubscriber
	publisher          notifications.Publisher
72 73 74 75 76 77
}

// PeerResponseSender handles batching, deduping, and sending responses for
// a given peer across multiple requests.
type PeerResponseSender interface {
	peermanager.PeerProcess
Hannah Howard's avatar
Hannah Howard committed
78
	DedupKey(requestID graphsync.RequestID, key string)
79
	IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link)
80
	SendResponse(
81
		requestID graphsync.RequestID,
82 83
		link ipld.Link,
		data []byte,
84
		notifees ...notifications.Notifee,
85
	) graphsync.BlockData
86
	SendExtensionData(graphsync.RequestID, graphsync.ExtensionData, ...notifications.Notifee)
87
	FinishWithCancel(requestID graphsync.RequestID)
88 89
	FinishRequest(requestID graphsync.RequestID, notifees ...notifications.Notifee) graphsync.ResponseStatusCode
	FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode, notifees ...notifications.Notifee)
Hannah Howard's avatar
Hannah Howard committed
90 91 92
	// Transaction calls multiple operations at once so they end up in a single response
	// Note: if the transaction function errors, the results will not execute
	Transaction(requestID graphsync.RequestID, transaction Transaction) error
93
	PauseRequest(requestID graphsync.RequestID, notifees ...notifications.Notifee)
94 95
}

Hannah Howard's avatar
Hannah Howard committed
96 97 98 99 100 101 102
// PeerResponseTransactionSender is a limited interface for sending responses inside a transaction
type PeerResponseTransactionSender interface {
	SendResponse(
		link ipld.Link,
		data []byte,
	) graphsync.BlockData
	SendExtensionData(graphsync.ExtensionData)
103
	FinishWithCancel()
Hannah Howard's avatar
Hannah Howard committed
104 105 106
	FinishRequest() graphsync.ResponseStatusCode
	FinishWithError(status graphsync.ResponseStatusCode)
	PauseRequest()
107
	AddNotifee(notifications.Notifee)
Hannah Howard's avatar
Hannah Howard committed
108 109
}

110
// NewResponseSender generates a new PeerResponseSender for the given context, peer ID,
111
// using the given peer message handler.
112
func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHandler) PeerResponseSender {
113
	ctx, cancel := context.WithCancel(ctx)
114 115 116 117 118 119 120 121 122 123 124
	prs := &peerResponseSender{
		p:              p,
		ctx:            ctx,
		cancel:         cancel,
		peerHandler:    peerHandler,
		outgoingWork:   make(chan struct{}, 1),
		linkTracker:    linktracker.New(),
		dedupKeys:      make(map[graphsync.RequestID]string),
		altTrackers:    make(map[string]*linktracker.LinkTracker),
		queuedMessages: make(chan responsebuilder.Topic, 1),
		publisher:      notifications.NewPublisher(),
125
	}
126 127
	prs.subscriber = notifications.NewMappableSubscriber(&subscriber{prs}, notifications.IdentityTransform)
	return prs
128 129 130
}

// Startup initiates message sending for a peer
Hannah Howard's avatar
Hannah Howard committed
131 132
func (prs *peerResponseSender) Startup() {
	go prs.run()
133 134
}

Hannah Howard's avatar
Hannah Howard committed
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
// getLinkTracker picks the link tracker used for requestID: the alternate
// tracker stored under the request's dedup key when one has been set through
// DedupKey, and the shared default tracker otherwise.
// It takes no lock itself; callers in this file hold linkTrackerLk.
func (prs *peerResponseSender) getLinkTracker(requestID graphsync.RequestID) *linktracker.LinkTracker {
	if dedupKey, hasKey := prs.dedupKeys[requestID]; hasKey {
		return prs.altTrackers[dedupKey]
	}
	return prs.linkTracker
}

// DedupKey associates requestID with the given dedup key and makes sure an
// alternate link tracker exists for that key, creating one on first use.
func (prs *peerResponseSender) DedupKey(requestID graphsync.RequestID, key string) {
	prs.linkTrackerLk.Lock()
	defer prs.linkTrackerLk.Unlock()

	prs.dedupKeys[requestID] = key
	if _, exists := prs.altTrackers[key]; exists {
		return
	}
	prs.altTrackers[key] = linktracker.New()
}

153 154
func (prs *peerResponseSender) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) {
	prs.linkTrackerLk.Lock()
Hannah Howard's avatar
Hannah Howard committed
155
	linkTracker := prs.getLinkTracker(requestID)
156
	for _, link := range links {
Hannah Howard's avatar
Hannah Howard committed
157
		linkTracker.RecordLinkTraversal(requestID, link, true)
158 159 160 161
	}
	prs.linkTrackerLk.Unlock()
}

Hannah Howard's avatar
Hannah Howard committed
162 163 164
type responseOperation interface {
	build(responseBuilder *responsebuilder.ResponseBuilder)
	size() uint64
165 166
}

167
func (prs *peerResponseSender) execute(operations []responseOperation, notifees []notifications.Notifee) {
Hannah Howard's avatar
Hannah Howard committed
168 169 170 171 172 173 174 175
	size := uint64(0)
	for _, op := range operations {
		size += op.size()
	}
	if prs.buildResponse(size, func(responseBuilder *responsebuilder.ResponseBuilder) {
		for _, op := range operations {
			op.build(responseBuilder)
		}
176
	}, notifees) {
Hannah Howard's avatar
Hannah Howard committed
177
		prs.signalWork()
178 179 180
	}
}

Hannah Howard's avatar
Hannah Howard committed
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
// Shutdown stops sending messages for a peer by cancelling the sender's
// context, which ends the run loop and unblocks any wait on queued messages.
func (prs *peerResponseSender) Shutdown() {
	prs.cancel()
}

// extensionOperation is a responseOperation that attaches extension data to
// the response for a request.
type extensionOperation struct {
	requestID graphsync.RequestID
	extension graphsync.ExtensionData
}

// build adds the extension data for the request to the response builder.
func (eo extensionOperation) build(responseBuilder *responsebuilder.ResponseBuilder) {
	responseBuilder.AddExtensionData(eo.requestID, eo.extension)
}

// size returns the length in bytes of the extension's Data payload.
func (eo extensionOperation) size() uint64 {
	return uint64(len(eo.extension.Data))
}

199 200
func (prs *peerResponseSender) SendExtensionData(requestID graphsync.RequestID, extension graphsync.ExtensionData, notifees ...notifications.Notifee) {
	prs.execute([]responseOperation{extensionOperation{requestID, extension}}, notifees)
Hannah Howard's avatar
Hannah Howard committed
201 202 203 204 205
}

type peerResponseTransactionSender struct {
	requestID  graphsync.RequestID
	operations []responseOperation
206
	notifees   []notifications.Notifee
Hannah Howard's avatar
Hannah Howard committed
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
	prs        *peerResponseSender
}

// SendResponse prepares a block operation for the link (updating the link
// tracker right away), queues it on the transaction, and returns it as the
// block's description.
func (prts *peerResponseTransactionSender) SendResponse(link ipld.Link, data []byte) graphsync.BlockData {
	blockOp := prts.prs.setupBlockOperation(prts.requestID, link, data)
	prts.operations = append(prts.operations, blockOp)
	return blockOp
}

// SendExtensionData queues extension data for the transaction's request.
func (prts *peerResponseTransactionSender) SendExtensionData(extension graphsync.ExtensionData) {
	extOp := extensionOperation{requestID: prts.requestID, extension: extension}
	prts.operations = append(prts.operations, extOp)
}

// FinishRequest stops tracking the transaction's request, queues the
// resulting completion status, and returns that status.
func (prts *peerResponseTransactionSender) FinishRequest() graphsync.ResponseStatusCode {
	finishOp := prts.prs.setupFinishOperation(prts.requestID)
	prts.operations = append(prts.operations, finishOp)
	return finishOp.status
}

// FinishWithError stops tracking the transaction's request and queues the
// given status for it.
func (prts *peerResponseTransactionSender) FinishWithError(status graphsync.ResponseStatusCode) {
	errOp := prts.prs.setupFinishWithErrOperation(prts.requestID, status)
	prts.operations = append(prts.operations, errOp)
}

// PauseRequest queues a RequestPaused status for the transaction's request.
func (prts *peerResponseTransactionSender) PauseRequest() {
	pauseOp := statusOperation{requestID: prts.requestID, status: graphsync.RequestPaused}
	prts.operations = append(prts.operations, pauseOp)
}

234 235 236 237
// FinishWithCancel stops tracking the transaction's request. Unlike the other
// transaction methods it queues no operation: tracking ends immediately and
// the completeness result is intentionally discarded.
func (prts *peerResponseTransactionSender) FinishWithCancel() {
	prts.prs.finishTracking(prts.requestID)
}

238 239 240 241
// AddNotifee registers a notifee to be subscribed to the response message
// built from this transaction when it is executed.
func (prts *peerResponseTransactionSender) AddNotifee(notifee notifications.Notifee) {
	prts.notifees = append(prts.notifees, notifee)
}

Hannah Howard's avatar
Hannah Howard committed
242 243 244 245 246 247 248
func (prs *peerResponseSender) Transaction(requestID graphsync.RequestID, transaction Transaction) error {
	prts := &peerResponseTransactionSender{
		requestID: requestID,
		prs:       prs,
	}
	err := transaction(prts)
	if err == nil {
249
		prs.execute(prts.operations, prts.notifees)
Hannah Howard's avatar
Hannah Howard committed
250 251 252 253 254 255
	}
	return err
}

type blockOperation struct {
	data      []byte
256
	sendBlock bool
Hannah Howard's avatar
Hannah Howard committed
257 258
	link      ipld.Link
	requestID graphsync.RequestID
259 260
}

Hannah Howard's avatar
Hannah Howard committed
261 262 263 264 265 266 267 268 269 270
func (bo blockOperation) build(responseBuilder *responsebuilder.ResponseBuilder) {
	if bo.sendBlock {
		cidLink := bo.link.(cidlink.Link)
		block, err := blocks.NewBlockWithCid(bo.data, cidLink.Cid)
		if err != nil {
			log.Errorf("Data did not match cid when sending link for %s", cidLink.String())
		}
		responseBuilder.AddBlock(block)
	}
	responseBuilder.AddLink(bo.requestID, bo.link, bo.data != nil)
271 272
}

Hannah Howard's avatar
Hannah Howard committed
273 274
func (bo blockOperation) Link() ipld.Link {
	return bo.link
275 276
}

Hannah Howard's avatar
Hannah Howard committed
277 278 279 280 281 282
// BlockSize returns the length in bytes of the block's data (0 when data is nil).
func (bo blockOperation) BlockSize() uint64 {
	return uint64(len(bo.data))
}

func (bo blockOperation) BlockSizeOnWire() uint64 {
	if !bo.sendBlock {
283 284
		return 0
	}
Hannah Howard's avatar
Hannah Howard committed
285 286 287 288 289 290 291 292 293 294 295
	return bo.BlockSize()
}

// size implements responseOperation; only bytes that actually go on the wire
// count toward the batching limit.
func (bo blockOperation) size() uint64 {
	return bo.BlockSizeOnWire()
}

func (prs *peerResponseSender) setupBlockOperation(requestID graphsync.RequestID,
	link ipld.Link, data []byte) blockOperation {
	hasBlock := data != nil
	prs.linkTrackerLk.Lock()
Hannah Howard's avatar
Hannah Howard committed
296 297 298
	linkTracker := prs.getLinkTracker(requestID)
	sendBlock := hasBlock && linkTracker.BlockRefCount(link) == 0
	linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
Hannah Howard's avatar
Hannah Howard committed
299 300 301 302
	prs.linkTrackerLk.Unlock()
	return blockOperation{
		data, sendBlock, link, requestID,
	}
303 304
}

305 306 307
// SendResponse sends a given link for a given
// requestID across the wire, as well as its corresponding
// block if the block is present and has not already been sent
308
// it returns the number of block bytes sent
Hannah Howard's avatar
Hannah Howard committed
309
func (prs *peerResponseSender) SendResponse(
310
	requestID graphsync.RequestID,
311 312
	link ipld.Link,
	data []byte,
313
	notifees ...notifications.Notifee,
314
) graphsync.BlockData {
Hannah Howard's avatar
Hannah Howard committed
315
	op := prs.setupBlockOperation(requestID, link, data)
316
	prs.execute([]responseOperation{op}, notifees)
Hannah Howard's avatar
Hannah Howard committed
317
	return op
318 319
}

Hannah Howard's avatar
Hannah Howard committed
320 321 322 323 324 325 326 327 328 329 330 331 332
// statusOperation is a responseOperation that sets the response status code
// for a request.
type statusOperation struct {
	requestID graphsync.RequestID
	status    graphsync.ResponseStatusCode
}

// build adds the status code for the request to the response builder.
func (fo statusOperation) build(responseBuilder *responsebuilder.ResponseBuilder) {
	responseBuilder.AddResponseCode(fo.requestID, fo.status)
}

// size is always 0: a status contributes nothing to the batching limit.
func (fo statusOperation) size() uint64 {
	return 0
}

333
func (prs *peerResponseSender) finishTracking(requestID graphsync.RequestID) bool {
Hannah Howard's avatar
Hannah Howard committed
334
	prs.linkTrackerLk.Lock()
335
	defer prs.linkTrackerLk.Unlock()
Hannah Howard's avatar
Hannah Howard committed
336 337 338 339 340 341 342 343 344 345
	linkTracker := prs.getLinkTracker(requestID)
	allBlocks := linkTracker.FinishRequest(requestID)
	key, ok := prs.dedupKeys[requestID]
	if ok {
		delete(prs.dedupKeys, requestID)
		if linkTracker.Empty() {
			delete(prs.altTrackers, key)
		}
	}
	return allBlocks
346 347 348 349
}

func (prs *peerResponseSender) setupFinishOperation(requestID graphsync.RequestID) statusOperation {
	isComplete := prs.finishTracking(requestID)
350
	var status graphsync.ResponseStatusCode
351
	if isComplete {
352
		status = graphsync.RequestCompletedFull
353
	} else {
354
		status = graphsync.RequestCompletedPartial
355
	}
Hannah Howard's avatar
Hannah Howard committed
356
	return statusOperation{requestID, status}
357 358
}

Hannah Howard's avatar
Hannah Howard committed
359
// FinishRequest marks the given requestID as having sent all responses
360
func (prs *peerResponseSender) FinishRequest(requestID graphsync.RequestID, notifees ...notifications.Notifee) graphsync.ResponseStatusCode {
Hannah Howard's avatar
Hannah Howard committed
361
	op := prs.setupFinishOperation(requestID)
362
	prs.execute([]responseOperation{op}, notifees)
Hannah Howard's avatar
Hannah Howard committed
363 364
	return op.status
}
365

Hannah Howard's avatar
Hannah Howard committed
366
func (prs *peerResponseSender) setupFinishWithErrOperation(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) statusOperation {
Hannah Howard's avatar
Hannah Howard committed
367
	prs.finishTracking(requestID)
Hannah Howard's avatar
Hannah Howard committed
368
	return statusOperation{requestID, status}
369
}
370

Hannah Howard's avatar
Hannah Howard committed
371
// FinishWithError marks the given requestID as having terminated with an error
372
func (prs *peerResponseSender) FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode, notifees ...notifications.Notifee) {
Hannah Howard's avatar
Hannah Howard committed
373
	op := prs.setupFinishWithErrOperation(requestID, status)
374
	prs.execute([]responseOperation{op}, notifees)
Hannah Howard's avatar
Hannah Howard committed
375 376
}

377 378
// PauseRequest queues a RequestPaused status for requestID with the given
// notifees. Link tracking for the request is left untouched.
func (prs *peerResponseSender) PauseRequest(requestID graphsync.RequestID, notifees ...notifications.Notifee) {
	prs.execute([]responseOperation{statusOperation{requestID, graphsync.RequestPaused}}, notifees)
}
Hannah Howard's avatar
Hannah Howard committed
380

381 382 383 384
// FinishWithCancel stops tracking requestID without queueing any status for
// it; the completeness result is intentionally discarded.
func (prs *peerResponseSender) FinishWithCancel(requestID graphsync.RequestID) {
	prs.finishTracking(requestID)
}

385
func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn func(*responsebuilder.ResponseBuilder), notifees []notifications.Notifee) bool {
Hannah Howard's avatar
Hannah Howard committed
386 387 388
	prs.responseBuildersLk.Lock()
	defer prs.responseBuildersLk.Unlock()
	if shouldBeginNewResponse(prs.responseBuilders, blkSize) {
389 390 391
		topic := prs.nextBuilderTopic
		prs.nextBuilderTopic++
		prs.responseBuilders = append(prs.responseBuilders, responsebuilder.New(topic))
392
	}
Hannah Howard's avatar
Hannah Howard committed
393
	responseBuilder := prs.responseBuilders[len(prs.responseBuilders)-1]
394
	buildResponseFn(responseBuilder)
395 396 397
	for _, notifee := range notifees {
		notifications.SubscribeOn(prs.publisher, responseBuilder.Topic(), notifee)
	}
398 399 400
	return !responseBuilder.Empty()
}

401
func shouldBeginNewResponse(responseBuilders []*responsebuilder.ResponseBuilder, blkSize uint64) bool {
402 403
	if len(responseBuilders) == 0 {
		return true
404
	}
405 406 407 408
	if blkSize == 0 {
		return false
	}
	return responseBuilders[len(responseBuilders)-1].BlockSize()+blkSize > maxBlockSize
409 410
}

Hannah Howard's avatar
Hannah Howard committed
411
func (prs *peerResponseSender) signalWork() {
412
	select {
Hannah Howard's avatar
Hannah Howard committed
413
	case prs.outgoingWork <- struct{}{}:
414 415 416 417
	default:
	}
}

Hannah Howard's avatar
Hannah Howard committed
418
func (prs *peerResponseSender) run() {
419 420
	for {
		select {
Hannah Howard's avatar
Hannah Howard committed
421
		case <-prs.ctx.Done():
422
			return
Hannah Howard's avatar
Hannah Howard committed
423 424
		case <-prs.outgoingWork:
			prs.sendResponseMessages()
425 426 427 428
		}
	}
}

Hannah Howard's avatar
Hannah Howard committed
429 430 431 432 433
func (prs *peerResponseSender) sendResponseMessages() {
	prs.responseBuildersLk.Lock()
	builders := prs.responseBuilders
	prs.responseBuilders = nil
	prs.responseBuildersLk.Unlock()
434

435 436 437 438
	for _, builder := range builders {
		if builder.Empty() {
			continue
		}
439
		responses, blks, err := builder.Build()
440 441 442
		if err != nil {
			log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
		}
443

444 445 446 447
		prs.peerHandler.SendResponse(prs.p, responses, blks, notifications.Notifee{
			Topic:      builder.Topic(),
			Subscriber: prs.subscriber,
		})
448

449
		// wait for message to be processed
450 451 452 453 454 455
		prs.waitForMessageQueud(builder.Topic())
	}
}

func (prs *peerResponseSender) waitForMessageQueud(topic responsebuilder.Topic) {
	for {
456
		select {
Hannah Howard's avatar
Hannah Howard committed
457
		case <-prs.ctx.Done():
458 459 460 461 462
			return
		case queuedTopic := <-prs.queuedMessages:
			if topic == queuedTopic {
				return
			}
463
		}
464
	}
465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491
}

// subscriber receives message queue notifications on behalf of a
// peerResponseSender and relays them to the sender's publisher and
// queuedMessages channel.
type subscriber struct {
	prs *peerResponseSender
}

// OnNext translates a message queue event for a response builder topic:
//   - Sent is republished on the sender's publisher as a Sent event;
//   - Error is republished as an Error event wrapping the queue's error;
//   - Queued forwards the topic to queuedMessages, giving up if the sender's
//     context is cancelled first.
//
// Notifications whose topic or event are of any other type are ignored.
func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event) {
	builderTopic, isBuilderTopic := topic.(responsebuilder.Topic)
	msgEvent, isQueueEvent := event.(messagequeue.Event)
	if !isBuilderTopic || !isQueueEvent {
		return
	}

	sender := s.prs
	switch msgEvent.Name {
	case messagequeue.Sent:
		sender.publisher.Publish(builderTopic, Event{Name: Sent})
	case messagequeue.Error:
		wrapped := fmt.Errorf("error sending message: %w", msgEvent.Err)
		sender.publisher.Publish(builderTopic, Event{Name: Error, Err: wrapped})
	case messagequeue.Queued:
		select {
		case sender.queuedMessages <- builderTopic:
		case <-sender.ctx.Done():
		}
	}
}
492

493 494
// OnClose closes the given topic on the sender's publisher.
func (s *subscriber) OnClose(topic notifications.Topic) {
	s.prs.publisher.Close(topic)
}