peerresponsesender.go 6.57 KB
Newer Older
1 2 3 4 5 6
package peerresponsemanager

import (
	"context"
	"sync"

7
	"github.com/ipfs/go-graphsync"
8 9
	"github.com/ipfs/go-graphsync/peermanager"

10
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
11 12 13 14

	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"

15
	blocks "github.com/ipfs/go-block-format"
16
	"github.com/ipfs/go-graphsync/linktracker"
17 18
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/responsemanager/responsebuilder"
19
	"github.com/libp2p/go-libp2p-core/peer"
20 21
)

22 23
const (
	// max block size is the maximum size for batching blocks in a single payload
24
	maxBlockSize uint64 = 512 * 1024
25 26
)

27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
var log = logging.Logger("graphsync")

// PeerMessageHandler is an interface that can send a response for a given peer across
// the network.
type PeerMessageHandler interface {
	SendResponse(peer.ID, []gsmsg.GraphSyncResponse, []blocks.Block) <-chan struct{}
}

type peerResponseSender struct {
	p            peer.ID
	ctx          context.Context
	cancel       context.CancelFunc
	peerHandler  PeerMessageHandler
	outgoingWork chan struct{}

42 43 44 45
	linkTrackerLk      sync.RWMutex
	linkTracker        *linktracker.LinkTracker
	responseBuildersLk sync.RWMutex
	responseBuilders   []*responsebuilder.ResponseBuilder
46 47 48 49 50 51 52
}

// PeerResponseSender handles batching, deduping, and sending responses for
// a given peer across multiple requests.
type PeerResponseSender interface {
	peermanager.PeerProcess
	SendResponse(
53
		requestID graphsync.RequestID,
54 55
		link ipld.Link,
		data []byte,
56
	) graphsync.BlockData
57
	SendExtensionData(graphsync.RequestID, graphsync.ExtensionData)
58 59
	FinishRequest(requestID graphsync.RequestID)
	FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode)
60 61 62
}

// NewResponseSender generates a new PeerResponseSender for the given context, peer ID,
63
// using the given peer message handler.
64
func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHandler) PeerResponseSender {
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
	ctx, cancel := context.WithCancel(ctx)
	return &peerResponseSender{
		p:            p,
		ctx:          ctx,
		cancel:       cancel,
		peerHandler:  peerHandler,
		outgoingWork: make(chan struct{}, 1),
		linkTracker:  linktracker.New(),
	}
}

// Startup initiates message sending for a peer
func (prm *peerResponseSender) Startup() {
	go prm.run()
}

// Shutdown stops sending messages for a peer
func (prm *peerResponseSender) Shutdown() {
	prm.cancel()
}

86 87 88 89 90 91 92 93
func (prm *peerResponseSender) SendExtensionData(requestID graphsync.RequestID, extension graphsync.ExtensionData) {
	if prm.buildResponse(0, func(responseBuilder *responsebuilder.ResponseBuilder) {
		responseBuilder.AddExtensionData(requestID, extension)
	}) {
		prm.signalWork()
	}
}

94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
type blockData struct {
	link      ipld.Link
	blockSize uint64
	sendBlock bool
}

func (bd blockData) Link() ipld.Link {
	return bd.link
}

func (bd blockData) BlockSize() uint64 {
	return bd.blockSize
}

func (bd blockData) BlockSizeOnWire() uint64 {
	if !bd.sendBlock {
		return 0
	}
	return bd.blockSize
}

115 116 117
// SendResponse sends a given link for a given
// requestID across the wire, as well as its corresponding
// block if the block is present and has not already been sent
118
// it returns the number of block bytes sent
119
func (prm *peerResponseSender) SendResponse(
120
	requestID graphsync.RequestID,
121 122
	link ipld.Link,
	data []byte,
123
) graphsync.BlockData {
124 125
	hasBlock := data != nil
	prm.linkTrackerLk.Lock()
126
	sendBlock := hasBlock && prm.linkTracker.BlockRefCount(link) == 0
127 128
	blkSize := uint64(len(data))
	bd := blockData{link, blkSize, sendBlock}
129 130 131
	prm.linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
	prm.linkTrackerLk.Unlock()

132
	if prm.buildResponse(bd.BlockSizeOnWire(), func(responseBuilder *responsebuilder.ResponseBuilder) {
133 134 135 136 137 138 139 140 141 142 143 144
		if sendBlock {
			cidLink := link.(cidlink.Link)
			block, err := blocks.NewBlockWithCid(data, cidLink.Cid)
			if err != nil {
				log.Errorf("Data did not match cid when sending link for %s", cidLink.String())
			}
			responseBuilder.AddBlock(block)
		}
		responseBuilder.AddLink(requestID, link, hasBlock)
	}) {
		prm.signalWork()
	}
145
	return bd
146 147 148
}

// FinishRequest marks the given requestID as having sent all responses
149
func (prm *peerResponseSender) FinishRequest(requestID graphsync.RequestID) {
150 151 152
	prm.linkTrackerLk.Lock()
	isComplete := prm.linkTracker.FinishRequest(requestID)
	prm.linkTrackerLk.Unlock()
153
	var status graphsync.ResponseStatusCode
154
	if isComplete {
155
		status = graphsync.RequestCompletedFull
156
	} else {
157
		status = graphsync.RequestCompletedPartial
158 159 160 161 162
	}
	prm.finish(requestID, status)
}

// FinishWithError marks the given requestID as having terminated with an error
163
func (prm *peerResponseSender) FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) {
164 165 166 167 168 169
	prm.linkTrackerLk.Lock()
	prm.linkTracker.FinishRequest(requestID)
	prm.linkTrackerLk.Unlock()

	prm.finish(requestID, status)
}
170

171
func (prm *peerResponseSender) finish(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) {
172
	if prm.buildResponse(0, func(responseBuilder *responsebuilder.ResponseBuilder) {
173
		responseBuilder.AddCompletedRequest(requestID, status)
174 175 176 177
	}) {
		prm.signalWork()
	}
}
178
func (prm *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn func(*responsebuilder.ResponseBuilder)) bool {
179 180 181 182 183 184 185 186 187 188
	prm.responseBuildersLk.Lock()
	defer prm.responseBuildersLk.Unlock()
	if shouldBeginNewResponse(prm.responseBuilders, blkSize) {
		prm.responseBuilders = append(prm.responseBuilders, responsebuilder.New())
	}
	responseBuilder := prm.responseBuilders[len(prm.responseBuilders)-1]
	buildResponseFn(responseBuilder)
	return !responseBuilder.Empty()
}

189
func shouldBeginNewResponse(responseBuilders []*responsebuilder.ResponseBuilder, blkSize uint64) bool {
190 191
	if len(responseBuilders) == 0 {
		return true
192
	}
193 194 195 196
	if blkSize == 0 {
		return false
	}
	return responseBuilders[len(responseBuilders)-1].BlockSize()+blkSize > maxBlockSize
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
}

func (prm *peerResponseSender) signalWork() {
	select {
	case prm.outgoingWork <- struct{}{}:
	default:
	}
}

func (prm *peerResponseSender) run() {
	for {
		select {
		case <-prm.ctx.Done():
			return
		case <-prm.outgoingWork:
212
			prm.sendResponseMessages()
213 214 215 216
		}
	}
}

217 218 219 220 221
func (prm *peerResponseSender) sendResponseMessages() {
	prm.responseBuildersLk.Lock()
	builders := prm.responseBuilders
	prm.responseBuilders = nil
	prm.responseBuildersLk.Unlock()
222

223 224 225 226
	for _, builder := range builders {
		if builder.Empty() {
			continue
		}
227
		responses, blks, err := builder.Build()
228 229 230
		if err != nil {
			log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
		}
231

232
		done := prm.peerHandler.SendResponse(prm.p, responses, blks)
233

234 235 236 237 238
		// wait for message to be processed
		select {
		case <-done:
		case <-prm.ctx.Done():
		}
239
	}
240

241
}