peerresponsesender.go 10.8 KB
Newer Older
1 2 3 4 5 6
package peerresponsemanager

import (
	"context"
	"sync"

7
	"github.com/ipfs/go-graphsync"
8 9
	"github.com/ipfs/go-graphsync/peermanager"

10
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
11 12 13 14

	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"

15
	blocks "github.com/ipfs/go-block-format"
16
	"github.com/ipfs/go-graphsync/linktracker"
17 18
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/responsemanager/responsebuilder"
19
	"github.com/libp2p/go-libp2p-core/peer"
20 21
)

22 23
const (
	// max block size is the maximum size for batching blocks in a single payload
24
	maxBlockSize uint64 = 512 * 1024
25 26
)

27 28 29 30 31 32 33 34
var log = logging.Logger("graphsync")

// PeerMessageHandler is an interface that can send a response for a given peer across
// the network.
type PeerMessageHandler interface {
	SendResponse(peer.ID, []gsmsg.GraphSyncResponse, []blocks.Block) <-chan struct{}
}

Hannah Howard's avatar
Hannah Howard committed
35 36 37
// Transaction is a series of operations that should be send together in a single response
type Transaction func(PeerResponseTransactionSender) error

38 39 40 41 42 43 44
type peerResponseSender struct {
	p            peer.ID
	ctx          context.Context
	cancel       context.CancelFunc
	peerHandler  PeerMessageHandler
	outgoingWork chan struct{}

45 46 47 48
	linkTrackerLk      sync.RWMutex
	linkTracker        *linktracker.LinkTracker
	responseBuildersLk sync.RWMutex
	responseBuilders   []*responsebuilder.ResponseBuilder
49 50 51 52 53 54
}

// PeerResponseSender handles batching, deduping, and sending responses for
// a given peer across multiple requests.
type PeerResponseSender interface {
	peermanager.PeerProcess
55
	IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link)
56
	SendResponse(
57
		requestID graphsync.RequestID,
58 59
		link ipld.Link,
		data []byte,
60
	) graphsync.BlockData
61
	SendExtensionData(graphsync.RequestID, graphsync.ExtensionData)
62
	FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode
63
	FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode)
Hannah Howard's avatar
Hannah Howard committed
64 65 66
	// Transaction calls multiple operations at once so they end up in a single response
	// Note: if the transaction function errors, the results will not execute
	Transaction(requestID graphsync.RequestID, transaction Transaction) error
Hannah Howard's avatar
Hannah Howard committed
67
	PauseRequest(requestID graphsync.RequestID)
68 69
}

Hannah Howard's avatar
Hannah Howard committed
70 71 72 73 74 75 76 77 78 79 80 81
// PeerResponseTransactionSender is a limited interface for sending responses inside a transaction
type PeerResponseTransactionSender interface {
	SendResponse(
		link ipld.Link,
		data []byte,
	) graphsync.BlockData
	SendExtensionData(graphsync.ExtensionData)
	FinishRequest() graphsync.ResponseStatusCode
	FinishWithError(status graphsync.ResponseStatusCode)
	PauseRequest()
}

82
// NewResponseSender generates a new PeerResponseSender for the given context, peer ID,
83
// using the given peer message handler.
84
func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHandler) PeerResponseSender {
85 86 87 88 89 90 91 92 93 94 95 96
	ctx, cancel := context.WithCancel(ctx)
	return &peerResponseSender{
		p:            p,
		ctx:          ctx,
		cancel:       cancel,
		peerHandler:  peerHandler,
		outgoingWork: make(chan struct{}, 1),
		linkTracker:  linktracker.New(),
	}
}

// Startup initiates message sending for a peer
Hannah Howard's avatar
Hannah Howard committed
97 98
func (prs *peerResponseSender) Startup() {
	go prs.run()
99 100
}

101 102 103 104 105 106 107 108
func (prs *peerResponseSender) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) {
	prs.linkTrackerLk.Lock()
	for _, link := range links {
		prs.linkTracker.RecordLinkTraversal(requestID, link, true)
	}
	prs.linkTrackerLk.Unlock()
}

Hannah Howard's avatar
Hannah Howard committed
109 110 111
type responseOperation interface {
	build(responseBuilder *responsebuilder.ResponseBuilder)
	size() uint64
112 113
}

Hannah Howard's avatar
Hannah Howard committed
114 115 116 117 118 119 120 121 122
func (prs *peerResponseSender) execute(operations []responseOperation) {
	size := uint64(0)
	for _, op := range operations {
		size += op.size()
	}
	if prs.buildResponse(size, func(responseBuilder *responsebuilder.ResponseBuilder) {
		for _, op := range operations {
			op.build(responseBuilder)
		}
123
	}) {
Hannah Howard's avatar
Hannah Howard committed
124
		prs.signalWork()
125 126 127
	}
}

Hannah Howard's avatar
Hannah Howard committed
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
// Shutdown stops sending messages for a peer
func (prs *peerResponseSender) Shutdown() {
	prs.cancel()
}

type extensionOperation struct {
	requestID graphsync.RequestID
	extension graphsync.ExtensionData
}

func (eo extensionOperation) build(responseBuilder *responsebuilder.ResponseBuilder) {
	responseBuilder.AddExtensionData(eo.requestID, eo.extension)
}

func (eo extensionOperation) size() uint64 {
	return uint64(len(eo.extension.Data))
}

func (prs *peerResponseSender) SendExtensionData(requestID graphsync.RequestID, extension graphsync.ExtensionData) {
	prs.execute([]responseOperation{extensionOperation{requestID, extension}})
}

type peerResponseTransactionSender struct {
	requestID  graphsync.RequestID
	operations []responseOperation
	prs        *peerResponseSender
}

func (prts *peerResponseTransactionSender) SendResponse(link ipld.Link, data []byte) graphsync.BlockData {
	op := prts.prs.setupBlockOperation(prts.requestID, link, data)
	prts.operations = append(prts.operations, op)
	return op
}

func (prts *peerResponseTransactionSender) SendExtensionData(extension graphsync.ExtensionData) {
	prts.operations = append(prts.operations, extensionOperation{prts.requestID, extension})
}

func (prts *peerResponseTransactionSender) FinishRequest() graphsync.ResponseStatusCode {
	op := prts.prs.setupFinishOperation(prts.requestID)
	prts.operations = append(prts.operations, op)
	return op.status
}

func (prts *peerResponseTransactionSender) FinishWithError(status graphsync.ResponseStatusCode) {
	prts.operations = append(prts.operations, prts.prs.setupFinishWithErrOperation(prts.requestID, status))
}

func (prts *peerResponseTransactionSender) PauseRequest() {
	prts.operations = append(prts.operations, statusOperation{prts.requestID, graphsync.RequestPaused})
}

func (prs *peerResponseSender) Transaction(requestID graphsync.RequestID, transaction Transaction) error {
	prts := &peerResponseTransactionSender{
		requestID: requestID,
		prs:       prs,
	}
	err := transaction(prts)
	if err == nil {
		prs.execute(prts.operations)
	}
	return err
}

type blockOperation struct {
	data      []byte
194
	sendBlock bool
Hannah Howard's avatar
Hannah Howard committed
195 196
	link      ipld.Link
	requestID graphsync.RequestID
197 198
}

Hannah Howard's avatar
Hannah Howard committed
199 200 201 202 203 204 205 206 207 208
func (bo blockOperation) build(responseBuilder *responsebuilder.ResponseBuilder) {
	if bo.sendBlock {
		cidLink := bo.link.(cidlink.Link)
		block, err := blocks.NewBlockWithCid(bo.data, cidLink.Cid)
		if err != nil {
			log.Errorf("Data did not match cid when sending link for %s", cidLink.String())
		}
		responseBuilder.AddBlock(block)
	}
	responseBuilder.AddLink(bo.requestID, bo.link, bo.data != nil)
209 210
}

Hannah Howard's avatar
Hannah Howard committed
211 212
func (bo blockOperation) Link() ipld.Link {
	return bo.link
213 214
}

Hannah Howard's avatar
Hannah Howard committed
215 216 217 218 219 220
func (bo blockOperation) BlockSize() uint64 {
	return uint64(len(bo.data))
}

func (bo blockOperation) BlockSizeOnWire() uint64 {
	if !bo.sendBlock {
221 222
		return 0
	}
Hannah Howard's avatar
Hannah Howard committed
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
	return bo.BlockSize()
}

func (bo blockOperation) size() uint64 {
	return bo.BlockSizeOnWire()
}

func (prs *peerResponseSender) setupBlockOperation(requestID graphsync.RequestID,
	link ipld.Link, data []byte) blockOperation {
	hasBlock := data != nil
	prs.linkTrackerLk.Lock()
	sendBlock := hasBlock && prs.linkTracker.BlockRefCount(link) == 0
	prs.linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
	prs.linkTrackerLk.Unlock()
	return blockOperation{
		data, sendBlock, link, requestID,
	}
240 241
}

242 243 244
// SendResponse sends a given link for a given
// requestID across the wire, as well as its corresponding
// block if the block is present and has not already been sent
245
// it returns the number of block bytes sent
Hannah Howard's avatar
Hannah Howard committed
246
func (prs *peerResponseSender) SendResponse(
247
	requestID graphsync.RequestID,
248 249
	link ipld.Link,
	data []byte,
250
) graphsync.BlockData {
Hannah Howard's avatar
Hannah Howard committed
251 252 253
	op := prs.setupBlockOperation(requestID, link, data)
	prs.execute([]responseOperation{op})
	return op
254 255
}

Hannah Howard's avatar
Hannah Howard committed
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
type statusOperation struct {
	requestID graphsync.RequestID
	status    graphsync.ResponseStatusCode
}

func (fo statusOperation) build(responseBuilder *responsebuilder.ResponseBuilder) {
	responseBuilder.AddResponseCode(fo.requestID, fo.status)
}

func (fo statusOperation) size() uint64 {
	return 0
}

func (prs *peerResponseSender) setupFinishOperation(requestID graphsync.RequestID) statusOperation {
	prs.linkTrackerLk.Lock()
	isComplete := prs.linkTracker.FinishRequest(requestID)
	prs.linkTrackerLk.Unlock()
273
	var status graphsync.ResponseStatusCode
274
	if isComplete {
275
		status = graphsync.RequestCompletedFull
276
	} else {
277
		status = graphsync.RequestCompletedPartial
278
	}
Hannah Howard's avatar
Hannah Howard committed
279
	return statusOperation{requestID, status}
280 281
}

Hannah Howard's avatar
Hannah Howard committed
282 283 284 285 286 287
// FinishRequest marks the given requestID as having sent all responses
func (prs *peerResponseSender) FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode {
	op := prs.setupFinishOperation(requestID)
	prs.execute([]responseOperation{op})
	return op.status
}
288

Hannah Howard's avatar
Hannah Howard committed
289 290 291 292 293
func (prs *peerResponseSender) setupFinishWithErrOperation(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) statusOperation {
	prs.linkTrackerLk.Lock()
	prs.linkTracker.FinishRequest(requestID)
	prs.linkTrackerLk.Unlock()
	return statusOperation{requestID, status}
294
}
295

Hannah Howard's avatar
Hannah Howard committed
296 297 298 299
// FinishWithError marks the given requestID as having terminated with an error
func (prs *peerResponseSender) FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) {
	op := prs.setupFinishWithErrOperation(requestID, status)
	prs.execute([]responseOperation{op})
Hannah Howard's avatar
Hannah Howard committed
300 301
}

Hannah Howard's avatar
Hannah Howard committed
302 303
func (prs *peerResponseSender) PauseRequest(requestID graphsync.RequestID) {
	prs.execute([]responseOperation{statusOperation{requestID, graphsync.RequestPaused}})
304
}
Hannah Howard's avatar
Hannah Howard committed
305 306 307 308 309 310

func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn func(*responsebuilder.ResponseBuilder)) bool {
	prs.responseBuildersLk.Lock()
	defer prs.responseBuildersLk.Unlock()
	if shouldBeginNewResponse(prs.responseBuilders, blkSize) {
		prs.responseBuilders = append(prs.responseBuilders, responsebuilder.New())
311
	}
Hannah Howard's avatar
Hannah Howard committed
312
	responseBuilder := prs.responseBuilders[len(prs.responseBuilders)-1]
313 314 315 316
	buildResponseFn(responseBuilder)
	return !responseBuilder.Empty()
}

317
func shouldBeginNewResponse(responseBuilders []*responsebuilder.ResponseBuilder, blkSize uint64) bool {
318 319
	if len(responseBuilders) == 0 {
		return true
320
	}
321 322 323 324
	if blkSize == 0 {
		return false
	}
	return responseBuilders[len(responseBuilders)-1].BlockSize()+blkSize > maxBlockSize
325 326
}

Hannah Howard's avatar
Hannah Howard committed
327
func (prs *peerResponseSender) signalWork() {
328
	select {
Hannah Howard's avatar
Hannah Howard committed
329
	case prs.outgoingWork <- struct{}{}:
330 331 332 333
	default:
	}
}

Hannah Howard's avatar
Hannah Howard committed
334
func (prs *peerResponseSender) run() {
335 336
	for {
		select {
Hannah Howard's avatar
Hannah Howard committed
337
		case <-prs.ctx.Done():
338
			return
Hannah Howard's avatar
Hannah Howard committed
339 340
		case <-prs.outgoingWork:
			prs.sendResponseMessages()
341 342 343 344
		}
	}
}

Hannah Howard's avatar
Hannah Howard committed
345 346 347 348 349
func (prs *peerResponseSender) sendResponseMessages() {
	prs.responseBuildersLk.Lock()
	builders := prs.responseBuilders
	prs.responseBuilders = nil
	prs.responseBuildersLk.Unlock()
350

351 352 353 354
	for _, builder := range builders {
		if builder.Empty() {
			continue
		}
355
		responses, blks, err := builder.Build()
356 357 358
		if err != nil {
			log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
		}
359

Hannah Howard's avatar
Hannah Howard committed
360
		done := prs.peerHandler.SendResponse(prs.p, responses, blks)
361

362 363 364
		// wait for message to be processed
		select {
		case <-done:
Hannah Howard's avatar
Hannah Howard committed
365
		case <-prs.ctx.Done():
366
		}
367
	}
368

369
}