peerresponsesender.go 6.15 KB
Newer Older
1 2 3 4 5 6
package peerresponsemanager

import (
	"context"
	"sync"

7
	"github.com/ipfs/go-graphsync"
8 9
	"github.com/ipfs/go-graphsync/peermanager"

10
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
11 12 13 14

	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"

15
	blocks "github.com/ipfs/go-block-format"
16
	"github.com/ipfs/go-graphsync/linktracker"
17 18
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/responsemanager/responsebuilder"
19
	"github.com/libp2p/go-libp2p-core/peer"
20 21
)

22 23 24 25 26
const (
	// max block size is the maximum size for batching blocks in a single payload
	maxBlockSize = 512 * 1024
)

27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
var log = logging.Logger("graphsync")

// PeerMessageHandler is an interface that can send a response for a given peer across
// the network.
type PeerMessageHandler interface {
	SendResponse(peer.ID, []gsmsg.GraphSyncResponse, []blocks.Block) <-chan struct{}
}

type peerResponseSender struct {
	p            peer.ID
	ctx          context.Context
	cancel       context.CancelFunc
	peerHandler  PeerMessageHandler
	outgoingWork chan struct{}

42 43 44 45
	linkTrackerLk      sync.RWMutex
	linkTracker        *linktracker.LinkTracker
	responseBuildersLk sync.RWMutex
	responseBuilders   []*responsebuilder.ResponseBuilder
46 47 48 49 50 51 52
}

// PeerResponseSender handles batching, deduping, and sending responses for
// a given peer across multiple requests.
type PeerResponseSender interface {
	peermanager.PeerProcess
	SendResponse(
53
		requestID graphsync.RequestID,
54 55 56
		link ipld.Link,
		data []byte,
	)
57
	SendExtensionData(graphsync.RequestID, graphsync.ExtensionData)
58 59
	FinishRequest(requestID graphsync.RequestID)
	FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode)
60 61 62 63
}

// NewResponseSender generates a new PeerResponseSender for the given context, peer ID,
// using the given peer message handler and bridge to IPLD.
64
func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHandler) PeerResponseSender {
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
	ctx, cancel := context.WithCancel(ctx)
	return &peerResponseSender{
		p:            p,
		ctx:          ctx,
		cancel:       cancel,
		peerHandler:  peerHandler,
		outgoingWork: make(chan struct{}, 1),
		linkTracker:  linktracker.New(),
	}
}

// Startup initiates message sending for a peer
func (prm *peerResponseSender) Startup() {
	go prm.run()
}

// Shutdown stops sending messages for a peer
func (prm *peerResponseSender) Shutdown() {
	prm.cancel()
}

86 87 88 89 90 91 92 93
func (prm *peerResponseSender) SendExtensionData(requestID graphsync.RequestID, extension graphsync.ExtensionData) {
	if prm.buildResponse(0, func(responseBuilder *responsebuilder.ResponseBuilder) {
		responseBuilder.AddExtensionData(requestID, extension)
	}) {
		prm.signalWork()
	}
}

94 95 96 97
// SendResponse sends a given link for a given
// requestID across the wire, as well as its corresponding
// block if the block is present and has not already been sent
func (prm *peerResponseSender) SendResponse(
98
	requestID graphsync.RequestID,
99 100 101 102 103
	link ipld.Link,
	data []byte,
) {
	hasBlock := data != nil
	prm.linkTrackerLk.Lock()
104
	sendBlock := hasBlock && prm.linkTracker.BlockRefCount(link) == 0
105 106 107 108
	blkSize := len(data)
	if !sendBlock {
		blkSize = 0
	}
109 110 111
	prm.linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
	prm.linkTrackerLk.Unlock()

112
	if prm.buildResponse(blkSize, func(responseBuilder *responsebuilder.ResponseBuilder) {
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
		if sendBlock {
			cidLink := link.(cidlink.Link)
			block, err := blocks.NewBlockWithCid(data, cidLink.Cid)
			if err != nil {
				log.Errorf("Data did not match cid when sending link for %s", cidLink.String())
			}
			responseBuilder.AddBlock(block)
		}
		responseBuilder.AddLink(requestID, link, hasBlock)
	}) {
		prm.signalWork()
	}
}

// FinishRequest marks the given requestID as having sent all responses
128
func (prm *peerResponseSender) FinishRequest(requestID graphsync.RequestID) {
129 130 131
	prm.linkTrackerLk.Lock()
	isComplete := prm.linkTracker.FinishRequest(requestID)
	prm.linkTrackerLk.Unlock()
132
	var status graphsync.ResponseStatusCode
133
	if isComplete {
134
		status = graphsync.RequestCompletedFull
135
	} else {
136
		status = graphsync.RequestCompletedPartial
137 138 139 140 141
	}
	prm.finish(requestID, status)
}

// FinishWithError marks the given requestID as having terminated with an error
142
func (prm *peerResponseSender) FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) {
143 144 145 146 147 148
	prm.linkTrackerLk.Lock()
	prm.linkTracker.FinishRequest(requestID)
	prm.linkTrackerLk.Unlock()

	prm.finish(requestID, status)
}
149

150
func (prm *peerResponseSender) finish(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) {
151
	if prm.buildResponse(0, func(responseBuilder *responsebuilder.ResponseBuilder) {
152
		responseBuilder.AddCompletedRequest(requestID, status)
153 154 155 156
	}) {
		prm.signalWork()
	}
}
157 158 159 160 161 162 163 164 165 166 167 168 169 170
func (prm *peerResponseSender) buildResponse(blkSize int, buildResponseFn func(*responsebuilder.ResponseBuilder)) bool {
	prm.responseBuildersLk.Lock()
	defer prm.responseBuildersLk.Unlock()
	if shouldBeginNewResponse(prm.responseBuilders, blkSize) {
		prm.responseBuilders = append(prm.responseBuilders, responsebuilder.New())
	}
	responseBuilder := prm.responseBuilders[len(prm.responseBuilders)-1]
	buildResponseFn(responseBuilder)
	return !responseBuilder.Empty()
}

func shouldBeginNewResponse(responseBuilders []*responsebuilder.ResponseBuilder, blkSize int) bool {
	if len(responseBuilders) == 0 {
		return true
171
	}
172 173 174 175
	if blkSize == 0 {
		return false
	}
	return responseBuilders[len(responseBuilders)-1].BlockSize()+blkSize > maxBlockSize
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
}

func (prm *peerResponseSender) signalWork() {
	select {
	case prm.outgoingWork <- struct{}{}:
	default:
	}
}

func (prm *peerResponseSender) run() {
	for {
		select {
		case <-prm.ctx.Done():
			return
		case <-prm.outgoingWork:
191
			prm.sendResponseMessages()
192 193 194 195
		}
	}
}

196 197 198 199 200
func (prm *peerResponseSender) sendResponseMessages() {
	prm.responseBuildersLk.Lock()
	builders := prm.responseBuilders
	prm.responseBuilders = nil
	prm.responseBuildersLk.Unlock()
201

202 203 204 205
	for _, builder := range builders {
		if builder.Empty() {
			continue
		}
206
		responses, blks, err := builder.Build()
207 208 209
		if err != nil {
			log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
		}
210

211
		done := prm.peerHandler.SendResponse(prm.p, responses, blks)
212

213 214 215 216 217
		// wait for message to be processed
		select {
		case <-done:
		case <-prm.ctx.Done():
		}
218
	}
219

220
}