// Package decision implements the decision engine for the bitswap service.
package decision

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"

	process "github.com/jbenet/goprocess"
	bsmsg "gitlab.dms3.io/dms3/go-bitswap/message"
	pb "gitlab.dms3.io/dms3/go-bitswap/message/pb"
	wl "gitlab.dms3.io/dms3/go-bitswap/wantlist"
	blocks "gitlab.dms3.io/dms3/go-block-format"
	cid "gitlab.dms3.io/dms3/go-cid"
	bstore "gitlab.dms3.io/dms3/go-dms3-blockstore"
	logging "gitlab.dms3.io/dms3/go-log"
	"gitlab.dms3.io/dms3/go-peertaskqueue"
	"gitlab.dms3.io/dms3/go-peertaskqueue/peertask"
	peer "gitlab.dms3.io/p2p/go-p2p-core/peer"
)

// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
// 1. an initial `sendwantlist` message to a provider of the first key in a
//    request
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as
//   appropriate
// * when handling `cancel`, if we recently received a wanted block from a
//   peer, include a partial wantlist that contains a few other high priority
//   blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).

var log = logging.Logger("engine")

const (
	// outboxChanBuffer must be 0 to prevent stale messages from being sent
	outboxChanBuffer = 0
	// targetMessageSize is the ideal size of the batched payload. We try to
	// pop this much data off the request queue, but it may be a little more
	// or less depending on what's in the queue.
	targetMessageSize = 16 * 1024
	// tagFormat is the tag given to peers associated with an engine
	tagFormat = "bs-engine-%s-%s"

	// queuedTagWeight is the default weight for peers that have work queued
	// on their behalf.
	queuedTagWeight = 10

	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
	// bytes up to which we will replace a want-have with a want-block
	maxBlockSizeReplaceHasWithBlock = 1024

	// Number of concurrent workers that pull tasks off the request queue
	taskWorkerCount = 8
)

// Envelope contains a message for a Peer.
type Envelope struct {
	// Peer is the intended recipient.
	Peer peer.ID

	// Message is the payload.
	Message bsmsg.BitSwapMessage

	// A callback to notify the decision queue that the task is complete
	Sent func()
}

// PeerTagger covers the methods on the connection manager used by the decision
// engine to tag peers
type PeerTagger interface {
	TagPeer(peer.ID, string, int)
	UntagPeer(p peer.ID, tag string)
}
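
// The following sketch is not part of the original engine: a minimal
// in-memory PeerTagger, useful when no connection manager is wired in
// (e.g. in tests). In practice the connection manager plays this role.
type mapPeerTagger struct {
	mu   sync.Mutex
	tags map[peer.ID]map[string]int
}

func (t *mapPeerTagger) TagPeer(p peer.ID, tag string, w int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.tags == nil {
		t.tags = make(map[peer.ID]map[string]int)
	}
	if t.tags[p] == nil {
		t.tags[p] = make(map[string]int)
	}
	t.tags[p][tag] = w
}

func (t *mapPeerTagger) UntagPeer(p peer.ID, tag string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.tags[p], tag) // no-op if the peer was never tagged
}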

// ScorePeerFunc assigns a specific score to a peer
type ScorePeerFunc func(peer.ID, int)

// ScoreLedger is an external ledger dealing with peer scores.
type ScoreLedger interface {
	// Returns aggregated data about communication with a given peer.
	GetReceipt(p peer.ID) *Receipt
	// Increments the sent counter for the given peer.
	AddToSentBytes(p peer.ID, n int)
	// Increments the received counter for the given peer.
	AddToReceivedBytes(p peer.ID, n int)
	// PeerConnected should be called when a new peer connects,
	// meaning the ledger should open accounting.
	PeerConnected(p peer.ID)
	// PeerDisconnected should be called when a peer disconnects to
	// clean up the accounting.
	PeerDisconnected(p peer.ID)
	// Starts the ledger sampling process.
	Start(scorePeer ScorePeerFunc)
	// Stops the sampling process.
	Stop()
}
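
// A minimal no-op ScoreLedger sketch illustrating the contract above. This
// is an illustrative assumption, not part of the original file; the engine
// normally falls back to NewDefaultScoreLedger when nil is passed in.
type noopScoreLedger struct{}

func (noopScoreLedger) GetReceipt(peer.ID) *Receipt     { return &Receipt{} }
func (noopScoreLedger) AddToSentBytes(peer.ID, int)     {}
func (noopScoreLedger) AddToReceivedBytes(peer.ID, int) {}
func (noopScoreLedger) PeerConnected(peer.ID)           {}
func (noopScoreLedger) PeerDisconnected(peer.ID)        {}
func (noopScoreLedger) Start(ScorePeerFunc)             {}
func (noopScoreLedger) Stop()                           {}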

// Engine manages sending requested blocks to peers.
type Engine struct {
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
	peerRequestQueue *peertaskqueue.PeerTaskQueue

	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required.
	workSignal chan struct{}

	// outbox contains outgoing messages to peers. This is owned by the
	// taskWorker goroutine
	outbox chan (<-chan *Envelope)

	bsm *blockstoreManager

	peerTagger PeerTagger

	tagQueued, tagUseful string

	lock sync.RWMutex // protects the fields immediately below

	// ledgerMap lists block-related Ledgers by their Partner key.
	ledgerMap map[peer.ID]*ledger

	// an external ledger dealing with peer scores
	scoreLedger ScoreLedger

	ticker *time.Ticker

	taskWorkerLock  sync.Mutex
	taskWorkerCount int

	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
	// bytes up to which we will replace a want-have with a want-block
	maxBlockSizeReplaceHasWithBlock int

	sendDontHaves bool

	self peer.ID
}

// NewEngine creates a new block sending engine for the given block store
func NewEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagger, self peer.ID, scoreLedger ScoreLedger) *Engine {
	return newEngine(bs, bstoreWorkerCount, peerTagger, self, maxBlockSizeReplaceHasWithBlock, scoreLedger)
}

// This constructor is used by the tests
func newEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagger, self peer.ID,
	maxReplaceSize int, scoreLedger ScoreLedger) *Engine {

	if scoreLedger == nil {
		scoreLedger = NewDefaultScoreLedger()
	}

	e := &Engine{
		ledgerMap:                       make(map[peer.ID]*ledger),
		scoreLedger:                     scoreLedger,
		bsm:                             newBlockstoreManager(bs, bstoreWorkerCount),
		peerTagger:                      peerTagger,
		outbox:                          make(chan (<-chan *Envelope), outboxChanBuffer),
		workSignal:                      make(chan struct{}, 1),
		ticker:                          time.NewTicker(time.Millisecond * 100),
		maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
		taskWorkerCount:                 taskWorkerCount,
		sendDontHaves:                   true,
		self:                            self,
	}
	e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
	e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
	e.peerRequestQueue = peertaskqueue.New(
		peertaskqueue.OnPeerAddedHook(e.onPeerAdded),
		peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved),
		peertaskqueue.TaskMerger(newTaskMerger()),
		peertaskqueue.IgnoreFreezing(true))
	return e
}
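
// A hedged wiring sketch, not part of the original file: construct an engine
// and start its workers. The blockstore, tagger, and process handle are
// assumed to be supplied by the caller; passing a nil ScoreLedger selects
// the default implementation (see newEngine above).
func exampleStartEngine(ctx context.Context, bs bstore.Blockstore, tagger PeerTagger, self peer.ID, px process.Process) *Engine {
	e := NewEngine(bs, 128, tagger, self, nil) // 128 blockstore workers; illustrative value
	e.StartWorkers(ctx, px)
	return e
}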

// SetSendDontHaves indicates what to do when the engine receives a want-block
// for a block that is not in the blockstore. Either
// - Send a DONT_HAVE message
// - Simply don't respond
// Older versions of Bitswap did not respond, so this allows us to simulate
// those older versions for testing.
func (e *Engine) SetSendDontHaves(send bool) {
	e.sendDontHaves = send
}

// Starts the score ledger, wiring score updates into peer tagging: a
// non-zero score tags the peer as useful; a zero score removes the tag.
// The ledger is stopped when the given process closes.
func (e *Engine) startScoreLedger(px process.Process) {
	e.scoreLedger.Start(func(p peer.ID, score int) {
		if score == 0 {
			e.peerTagger.UntagPeer(p, e.tagUseful)
		} else {
			e.peerTagger.TagPeer(p, e.tagUseful, score)
		}
	})
	px.Go(func(ppx process.Process) {
		<-ppx.Closing()
		e.scoreLedger.Stop()
	})
}

// Start up workers to handle requests from other nodes for the data on this node
func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
	// Start up blockstore manager
	e.bsm.start(px)
	e.startScoreLedger(px)

	for i := 0; i < e.taskWorkerCount; i++ {
		px.Go(func(px process.Process) {
			e.taskWorker(ctx)
		})
	}
}

func (e *Engine) onPeerAdded(p peer.ID) {
	e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
}

func (e *Engine) onPeerRemoved(p peer.ID) {
	e.peerTagger.UntagPeer(p, e.tagQueued)
}

// WantlistForPeer returns the list of keys that the given peer has asked for
func (e *Engine) WantlistForPeer(p peer.ID) []wl.Entry {
	partner := e.findOrCreate(p)

	partner.lk.Lock()
	entries := partner.wantList.Entries()
	partner.lk.Unlock()

	wl.SortEntries(entries)

	return entries
}

// LedgerForPeer returns aggregated data about communication with a given peer.
func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
	return e.scoreLedger.GetReceipt(p)
}

// Each taskWorker pulls items off the request queue up to the maximum size
// and adds them to an envelope that is passed off to the bitswap workers,
// which send the message to the network.
func (e *Engine) taskWorker(ctx context.Context) {
	defer e.taskWorkerExit()
	for {
		oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
		select {
		case <-ctx.Done():
			return
		case e.outbox <- oneTimeUse:
		}
		// receiver is ready for an outgoing envelope. let's prepare one. first,
		// we must acquire a task from the PQ...
		envelope, err := e.nextEnvelope(ctx)
		if err != nil {
			close(oneTimeUse)
			return // ctx cancelled
		}
		oneTimeUse <- envelope // buffered. won't block
		close(oneTimeUse)
	}
}

// taskWorkerExit handles cleanup of task workers
func (e *Engine) taskWorkerExit() {
	e.taskWorkerLock.Lock()
	defer e.taskWorkerLock.Unlock()

	e.taskWorkerCount--
	if e.taskWorkerCount == 0 {
		close(e.outbox)
	}
}

// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
	for {
		// Pop some tasks off the request queue
		p, nextTasks, pendingBytes := e.peerRequestQueue.PopTasks(targetMessageSize)
		for len(nextTasks) == 0 {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-e.workSignal:
				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
			case <-e.ticker.C:
				// When a task is cancelled, the queue may be "frozen" for a
				// period of time. We periodically "thaw" the queue to make
				// sure it doesn't get stuck in a frozen state.
				e.peerRequestQueue.ThawRound()
				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
			}
		}

		// Create a new message
		msg := bsmsg.New(false)

		log.Debugw("Bitswap process tasks", "local", e.self, "taskCount", len(nextTasks))

		// Amount of data in the request queue still waiting to be popped
		msg.SetPendingBytes(int32(pendingBytes))

		// Split out want-blocks, want-haves and DONT_HAVEs
		blockCids := make([]cid.Cid, 0, len(nextTasks))
		blockTasks := make(map[cid.Cid]*taskData, len(nextTasks))
		for _, t := range nextTasks {
			c := t.Topic.(cid.Cid)
			td := t.Data.(*taskData)
			if td.HaveBlock {
				if td.IsWantBlock {
					blockCids = append(blockCids, c)
					blockTasks[c] = td
				} else {
					// Add HAVES to the message
					msg.AddHave(c)
				}
			} else {
				// Add DONT_HAVEs to the message
				msg.AddDontHave(c)
			}
		}

		// Fetch blocks from datastore
		blks, err := e.bsm.getBlocks(ctx, blockCids)
		if err != nil {
			// we're dropping the envelope but that's not an issue in practice.
			return nil, err
		}

		for c, t := range blockTasks {
			blk := blks[c]
			// If the block was not found (it has been removed)
			if blk == nil {
				// If the client requested DONT_HAVE, add DONT_HAVE to the message
				if t.SendDontHave {
					msg.AddDontHave(c)
				}
			} else {
				// Add the block to the message
				// log.Debugf("  make evlp %s->%s block: %s (%d bytes)", e.self, p, c, len(blk.RawData()))
				msg.AddBlock(blk)
			}
		}

		// If there's nothing in the message, bail out
		if msg.Empty() {
			e.peerRequestQueue.TasksDone(p, nextTasks...)
			continue
		}

		log.Debugw("Bitswap engine -> msg", "local", e.self, "to", p, "blockCount", len(msg.Blocks()), "presenceCount", len(msg.BlockPresences()), "size", msg.Size())
		return &Envelope{
			Peer:    p,
			Message: msg,
			Sent: func() {
				// Once the message has been sent, signal the request queue so
				// it can be cleared from the queue
				e.peerRequestQueue.TasksDone(p, nextTasks...)

				// Signal the worker to check for more work
				e.signalNewWork()
			},
		}, nil
	}
}

// Outbox returns a channel of one-time use Envelope channels.
func (e *Engine) Outbox() <-chan (<-chan *Envelope) {
	return e.outbox
}
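
// A consumption sketch, assuming a caller-supplied send function (the real
// senders are the bitswap workers outside this package). Each value received
// from Outbox is a one-time-use channel that yields at most one envelope and
// is then closed.
func exampleDrainOutbox(ctx context.Context, e *Engine, send func(peer.ID, bsmsg.BitSwapMessage)) {
	for next := range e.Outbox() {
		select {
		case envelope, ok := <-next:
			if !ok {
				continue // channel closed without an envelope (shutdown)
			}
			send(envelope.Peer, envelope.Message)
			envelope.Sent() // notify the queue that the task is complete
		case <-ctx.Done():
			return
		}
	}
}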

// Peers returns a slice of Peers with whom the local node has active sessions.
func (e *Engine) Peers() []peer.ID {
	e.lock.RLock()
	defer e.lock.RUnlock()

	response := make([]peer.ID, 0, len(e.ledgerMap))

	for _, ledger := range e.ledgerMap {
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived is called when a message is received from a remote peer.
// For each item in the wantlist, add a want-have or want-block entry to the
// request queue (this is later popped off by the workerTasks)
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) {
	entries := m.Wantlist()

	if len(entries) > 0 {
		log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries))
		for _, et := range entries {
			if !et.Cancel {
				if et.WantType == pb.Message_Wantlist_Have {
					log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid)
				} else {
					log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid)
				}
			}
		}
	}

	if m.Empty() {
		log.Infof("received empty message from %s", p)
	}

	newWorkExists := false
	defer func() {
		if newWorkExists {
			e.signalNewWork()
		}
	}()

	// Get block sizes
	wants, cancels := e.splitWantsCancels(entries)
	wantKs := cid.NewSet()
	for _, entry := range wants {
		wantKs.Add(entry.Cid)
	}
	blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
	if err != nil {
		log.Info("aborting message processing", err)
		return
	}

	// Get the ledger for the peer
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	// If the peer sent a full wantlist, replace the ledger's wantlist
	if m.Full() {
		l.wantList = wl.New()
	}

	var activeEntries []peertask.Task

	// Remove cancelled blocks from the queue
	for _, entry := range cancels {
		log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", entry.Cid)
		if l.CancelWant(entry.Cid) {
			e.peerRequestQueue.Remove(entry.Cid, p)
		}
	}

	// For each want-have / want-block
	for _, entry := range wants {
		c := entry.Cid
		blockSize, found := blockSizes[entry.Cid]

		// Add each want-have / want-block to the ledger
		l.Wants(c, entry.Priority, entry.WantType)

		// If the block was not found
		if !found {
			log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave)

			// Only add the task to the queue if the requester wants a DONT_HAVE
			if e.sendDontHaves && entry.SendDontHave {
				newWorkExists = true
				isWantBlock := false
				if entry.WantType == pb.Message_Wantlist_Block {
					isWantBlock = true
				}

				activeEntries = append(activeEntries, peertask.Task{
					Topic:    c,
					Priority: int(entry.Priority),
					Work:     bsmsg.BlockPresenceSize(c),
					Data: &taskData{
						BlockSize:    0,
						HaveBlock:    false,
						IsWantBlock:  isWantBlock,
						SendDontHave: entry.SendDontHave,
					},
				})
			}
		} else {
			// The block was found, add it to the queue
			newWorkExists = true

			isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

			log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", entry.Cid, "isWantBlock", isWantBlock)

			// entrySize is the amount of space the entry takes up in the
			// message we send to the recipient. If we're sending a block, the
			// entrySize is the size of the block. Otherwise it's the size of
			// a block presence entry.
			entrySize := blockSize
			if !isWantBlock {
				entrySize = bsmsg.BlockPresenceSize(c)
			}
			activeEntries = append(activeEntries, peertask.Task{
				Topic:    c,
				Priority: int(entry.Priority),
				Work:     entrySize,
				Data: &taskData{
					BlockSize:    blockSize,
					HaveBlock:    true,
					IsWantBlock:  isWantBlock,
					SendDontHave: entry.SendDontHave,
				},
			})
		}
	}

	// Push entries onto the request queue
	if len(activeEntries) > 0 {
		e.peerRequestQueue.PushTasks(p, activeEntries...)
	}
}

// Split the want-have / want-block entries from the cancel entries
func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
	wants := make([]bsmsg.Entry, 0, len(es))
	cancels := make([]bsmsg.Entry, 0, len(es))
	for _, et := range es {
		if et.Cancel {
			cancels = append(cancels, et)
		} else {
			wants = append(wants, et)
		}
	}
	return wants, cancels
}

// ReceiveFrom is called when new blocks are received and added to the block
// store, meaning there may be peers who want those blocks, so we should send
// the blocks to them.
//
// This function also updates the receive side of the ledger.
func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid) {
	if len(blks) == 0 {
		return
	}

	if from != "" {
		l := e.findOrCreate(from)
		l.lk.Lock()

		// Record how many bytes were received in the ledger
		for _, blk := range blks {
			log.Debugw("Bitswap engine <- block", "local", e.self, "from", from, "cid", blk.Cid(), "size", len(blk.RawData()))
			e.scoreLedger.AddToReceivedBytes(l.Partner, len(blk.RawData()))
		}

		l.lk.Unlock()
	}

	// Get the size of each block
	blockSizes := make(map[cid.Cid]int, len(blks))
	for _, blk := range blks {
		blockSizes[blk.Cid()] = len(blk.RawData())
	}

	// Check each peer to see if it wants one of the blocks we received
	work := false
	e.lock.RLock()

	for _, l := range e.ledgerMap {
		l.lk.RLock()

		for _, b := range blks {
			k := b.Cid()

			if entry, ok := l.WantListContains(k); ok {
				work = true

				blockSize := blockSizes[k]
				isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

				entrySize := blockSize
				if !isWantBlock {
					entrySize = bsmsg.BlockPresenceSize(k)
				}

				e.peerRequestQueue.PushTasks(l.Partner, peertask.Task{
					Topic:    entry.Cid,
					Priority: int(entry.Priority),
					Work:     entrySize,
					Data: &taskData{
						BlockSize:    blockSize,
						HaveBlock:    true,
						IsWantBlock:  isWantBlock,
						SendDontHave: false,
					},
				})
			}
		}
		l.lk.RUnlock()
	}
	e.lock.RUnlock()

	if work {
		e.signalNewWork()
	}
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

// MessageSent is called when a message has successfully been sent out, to record
// changes.
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	// Remove sent blocks from the want list for the peer
	for _, block := range m.Blocks() {
		e.scoreLedger.AddToSentBytes(l.Partner, len(block.RawData()))
		l.wantList.RemoveType(block.Cid(), pb.Message_Wantlist_Block)
	}

	// Remove sent block presences from the want list for the peer
	for _, bp := range m.BlockPresences() {
		// Don't record sent data. We reserve that for data blocks.
		if bp.Type == pb.Message_Have {
			l.wantList.RemoveType(bp.Cid, pb.Message_Wantlist_Have)
		}
	}
}

// PeerConnected is called when a new peer connects, meaning we should start
// sending blocks.
func (e *Engine) PeerConnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()

	_, ok := e.ledgerMap[p]
	if !ok {
		e.ledgerMap[p] = newLedger(p)
	}

	e.scoreLedger.PeerConnected(p)
}

// PeerDisconnected is called when a peer disconnects.
func (e *Engine) PeerDisconnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()

	delete(e.ledgerMap, p)

	e.scoreLedger.PeerDisconnected(p)
}

// If the want is a want-have, and it's below a certain size, send the full
// block (instead of sending a HAVE)
func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool {
	isWantBlock := wantType == pb.Message_Wantlist_Block
	return isWantBlock || blockSize <= e.maxBlockSizeReplaceHasWithBlock
}
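
// Worked example (illustrative, not from the original file): with the
// default 1024-byte threshold, a want-have for a 512-byte block is answered
// with the block itself, since replying HAVE would only trigger a follow-up
// want-block and cost an extra round trip to ship 512 bytes anyway.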

func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
	return e.LedgerForPeer(p).Sent
}

func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
	return e.LedgerForPeer(p).Recv
}

// findOrCreate lazily instantiates a ledger for the given peer
func (e *Engine) findOrCreate(p peer.ID) *ledger {
	// Take a read lock (as it's less expensive) to check if we have a ledger
	// for the peer
	e.lock.RLock()
	l, ok := e.ledgerMap[p]
	e.lock.RUnlock()
	if ok {
		return l
	}

	// There's no ledger, so take a write lock, then check again and create the
	// ledger if necessary
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok = e.ledgerMap[p]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p] = l
	}
	return l
}

func (e *Engine) signalNewWork() {
	// Signal task generation to restart (if stopped!)
	select {
	case e.workSignal <- struct{}{}:
	default:
	}
}