peerresponsesender.go 6.81 KB
Newer Older
1 2 3 4 5 6
package peerresponsemanager

import (
	"context"
	"sync"

7
	"github.com/ipfs/go-graphsync"
8 9
	"github.com/ipfs/go-graphsync/peermanager"

10
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
11 12 13 14

	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"

15
	blocks "github.com/ipfs/go-block-format"
16
	"github.com/ipfs/go-graphsync/linktracker"
17 18
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/responsemanager/responsebuilder"
19
	"github.com/libp2p/go-libp2p-core/peer"
20 21
)

22 23
const (
	// max block size is the maximum size for batching blocks in a single payload
24
	maxBlockSize uint64 = 512 * 1024
25 26
)

27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
var log = logging.Logger("graphsync")

// PeerMessageHandler is an interface that can send a response for a given peer across
// the network.
type PeerMessageHandler interface {
	SendResponse(peer.ID, []gsmsg.GraphSyncResponse, []blocks.Block) <-chan struct{}
}

type peerResponseSender struct {
	p            peer.ID
	ctx          context.Context
	cancel       context.CancelFunc
	peerHandler  PeerMessageHandler
	outgoingWork chan struct{}

42 43 44 45
	linkTrackerLk      sync.RWMutex
	linkTracker        *linktracker.LinkTracker
	responseBuildersLk sync.RWMutex
	responseBuilders   []*responsebuilder.ResponseBuilder
46 47 48 49 50 51 52
}

// PeerResponseSender handles batching, deduping, and sending responses for
// a given peer across multiple requests.
type PeerResponseSender interface {
	peermanager.PeerProcess
	SendResponse(
53
		requestID graphsync.RequestID,
54 55
		link ipld.Link,
		data []byte,
56
	) graphsync.BlockData
57
	SendExtensionData(graphsync.RequestID, graphsync.ExtensionData)
58
	FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode
59
	FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode)
Hannah Howard's avatar
Hannah Howard committed
60
	PauseRequest(requestID graphsync.RequestID)
61 62 63
}

// NewResponseSender generates a new PeerResponseSender for the given context, peer ID,
64
// using the given peer message handler.
65
func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHandler) PeerResponseSender {
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
	ctx, cancel := context.WithCancel(ctx)
	return &peerResponseSender{
		p:            p,
		ctx:          ctx,
		cancel:       cancel,
		peerHandler:  peerHandler,
		outgoingWork: make(chan struct{}, 1),
		linkTracker:  linktracker.New(),
	}
}

// Startup initiates message sending for a peer
func (prm *peerResponseSender) Startup() {
	go prm.run()
}

// Shutdown stops sending messages for a peer
func (prm *peerResponseSender) Shutdown() {
	prm.cancel()
}

87 88 89 90 91 92 93 94
func (prm *peerResponseSender) SendExtensionData(requestID graphsync.RequestID, extension graphsync.ExtensionData) {
	if prm.buildResponse(0, func(responseBuilder *responsebuilder.ResponseBuilder) {
		responseBuilder.AddExtensionData(requestID, extension)
	}) {
		prm.signalWork()
	}
}

95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
type blockData struct {
	link      ipld.Link
	blockSize uint64
	sendBlock bool
}

func (bd blockData) Link() ipld.Link {
	return bd.link
}

func (bd blockData) BlockSize() uint64 {
	return bd.blockSize
}

func (bd blockData) BlockSizeOnWire() uint64 {
	if !bd.sendBlock {
		return 0
	}
	return bd.blockSize
}

116 117 118
// SendResponse sends a given link for a given
// requestID across the wire, as well as its corresponding
// block if the block is present and has not already been sent
119
// it returns the number of block bytes sent
120
func (prm *peerResponseSender) SendResponse(
121
	requestID graphsync.RequestID,
122 123
	link ipld.Link,
	data []byte,
124
) graphsync.BlockData {
125 126
	hasBlock := data != nil
	prm.linkTrackerLk.Lock()
127
	sendBlock := hasBlock && prm.linkTracker.BlockRefCount(link) == 0
128 129
	blkSize := uint64(len(data))
	bd := blockData{link, blkSize, sendBlock}
130 131 132
	prm.linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
	prm.linkTrackerLk.Unlock()

133
	if prm.buildResponse(bd.BlockSizeOnWire(), func(responseBuilder *responsebuilder.ResponseBuilder) {
134 135 136 137 138 139 140 141 142 143 144 145
		if sendBlock {
			cidLink := link.(cidlink.Link)
			block, err := blocks.NewBlockWithCid(data, cidLink.Cid)
			if err != nil {
				log.Errorf("Data did not match cid when sending link for %s", cidLink.String())
			}
			responseBuilder.AddBlock(block)
		}
		responseBuilder.AddLink(requestID, link, hasBlock)
	}) {
		prm.signalWork()
	}
146
	return bd
147 148 149
}

// FinishRequest marks the given requestID as having sent all responses
150
func (prm *peerResponseSender) FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode {
151 152 153
	prm.linkTrackerLk.Lock()
	isComplete := prm.linkTracker.FinishRequest(requestID)
	prm.linkTrackerLk.Unlock()
154
	var status graphsync.ResponseStatusCode
155
	if isComplete {
156
		status = graphsync.RequestCompletedFull
157
	} else {
158
		status = graphsync.RequestCompletedPartial
159 160
	}
	prm.finish(requestID, status)
161
	return status
162 163 164
}

// FinishWithError marks the given requestID as having terminated with an error
165
func (prm *peerResponseSender) FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) {
166 167 168 169 170 171
	prm.linkTrackerLk.Lock()
	prm.linkTracker.FinishRequest(requestID)
	prm.linkTrackerLk.Unlock()

	prm.finish(requestID, status)
}
172

Hannah Howard's avatar
Hannah Howard committed
173 174 175 176
func (prm *peerResponseSender) PauseRequest(requestID graphsync.RequestID) {
	prm.finish(requestID, graphsync.RequestPaused)
}

177
func (prm *peerResponseSender) finish(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) {
178
	if prm.buildResponse(0, func(responseBuilder *responsebuilder.ResponseBuilder) {
Hannah Howard's avatar
Hannah Howard committed
179
		responseBuilder.AddResponseCode(requestID, status)
180 181 182 183
	}) {
		prm.signalWork()
	}
}
184
func (prm *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn func(*responsebuilder.ResponseBuilder)) bool {
185 186 187 188 189 190 191 192 193 194
	prm.responseBuildersLk.Lock()
	defer prm.responseBuildersLk.Unlock()
	if shouldBeginNewResponse(prm.responseBuilders, blkSize) {
		prm.responseBuilders = append(prm.responseBuilders, responsebuilder.New())
	}
	responseBuilder := prm.responseBuilders[len(prm.responseBuilders)-1]
	buildResponseFn(responseBuilder)
	return !responseBuilder.Empty()
}

195
func shouldBeginNewResponse(responseBuilders []*responsebuilder.ResponseBuilder, blkSize uint64) bool {
196 197
	if len(responseBuilders) == 0 {
		return true
198
	}
199 200 201 202
	if blkSize == 0 {
		return false
	}
	return responseBuilders[len(responseBuilders)-1].BlockSize()+blkSize > maxBlockSize
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
}

func (prm *peerResponseSender) signalWork() {
	select {
	case prm.outgoingWork <- struct{}{}:
	default:
	}
}

func (prm *peerResponseSender) run() {
	for {
		select {
		case <-prm.ctx.Done():
			return
		case <-prm.outgoingWork:
218
			prm.sendResponseMessages()
219 220 221 222
		}
	}
}

223 224 225 226 227
func (prm *peerResponseSender) sendResponseMessages() {
	prm.responseBuildersLk.Lock()
	builders := prm.responseBuilders
	prm.responseBuilders = nil
	prm.responseBuildersLk.Unlock()
228

229 230 231 232
	for _, builder := range builders {
		if builder.Empty() {
			continue
		}
233
		responses, blks, err := builder.Build()
234 235 236
		if err != nil {
			log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
		}
237

238
		done := prm.peerHandler.SendResponse(prm.p, responses, blks)
239

240 241 242 243 244
		// wait for message to be processed
		select {
		case <-done:
		case <-prm.ctx.Done():
		}
245
	}
246

247
}