// Package decision implements the decision engine for the bitswap service.
package decision

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"

	bsmsg "github.com/ipfs/go-bitswap/message"
	pb "github.com/ipfs/go-bitswap/message/pb"
	wl "github.com/ipfs/go-bitswap/wantlist"
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	bstore "github.com/ipfs/go-ipfs-blockstore"
	logging "github.com/ipfs/go-log"
	"github.com/ipfs/go-peertaskqueue"
	"github.com/ipfs/go-peertaskqueue/peertask"
	process "github.com/jbenet/goprocess"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
// 1. an initial `sendwantlist` message to a provider of the first key in a
//    request
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as
//   appropriate
// * when handling `cancel`, if we recently received a wanted block from a
//   peer, include a partial wantlist that contains a few other high priority
//   blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).

var log = logging.Logger("engine")

const (
	// outboxChanBuffer must be 0 to prevent stale messages from being sent
	outboxChanBuffer = 0
	// targetMessageSize is the ideal size of the batched payload. We try to
	// pop this much data off the request queue, but it may be a little more
	// or less depending on what's in the queue.
	targetMessageSize = 16 * 1024
	// tagFormat is the tag given to peers associated with an engine
	tagFormat = "bs-engine-%s-%s"

	// queuedTagWeight is the default weight for peers that have work queued
	// on their behalf.
	queuedTagWeight = 10

	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
	// bytes up to which we will replace a want-have with a want-block
	maxBlockSizeReplaceHasWithBlock = 1024

	// Number of concurrent workers that pull tasks off the request queue
	taskWorkerCount = 8

	// Number of concurrent workers that process requests to the blockstore
	blockstoreWorkerCount = 128
)

// Envelope contains a message for a Peer.
type Envelope struct {
	// Peer is the intended recipient.
	Peer peer.ID

	// Message is the payload.
	Message bsmsg.BitSwapMessage

	// A callback to notify the decision queue that the task is complete
	Sent func()
}

// PeerTagger covers the methods on the connection manager used by the decision
// engine to tag peers
type PeerTagger interface {
	TagPeer(peer.ID, string, int)
	UntagPeer(p peer.ID, tag string)
}

// ScorePeerFunc assigns a specific score to a peer
type ScorePeerFunc func(peer.ID, int)

// ScoreLedger is an external ledger dealing with peer scores.
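// If no custom ledger is supplied, the engine falls back to the default
// implementation created by NewDefaultScoreLedger.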
type ScoreLedger interface {
	// GetReceipt returns aggregated data about communication with a given peer.
	GetReceipt(p peer.ID) *Receipt
	// Increments the sent counter for the given peer.
	AddToSentBytes(p peer.ID, n int)
	// Increments the received counter for the given peer.
	AddToReceivedBytes(p peer.ID, n int)
	// PeerConnected should be called when a new peer connects,
	// meaning the ledger should open accounting.
	PeerConnected(p peer.ID)
	// PeerDisconnected should be called when a peer disconnects to
	// clean up the accounting.
	PeerDisconnected(p peer.ID)
	// Starts the ledger sampling process.
	Start(scorePeer ScorePeerFunc)
	// Stops the sampling process.
	Stop()
}

// Engine manages sending requested blocks to peers.
type Engine struct {
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
	peerRequestQueue *peertaskqueue.PeerTaskQueue

	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required.
	workSignal chan struct{}

	// outbox contains outgoing messages to peers. This is owned by the
	// taskWorker goroutine
	outbox chan (<-chan *Envelope)

	bsm *blockstoreManager

	peerTagger PeerTagger

	tagQueued, tagUseful string

	lock sync.RWMutex // protects the fields immediately below

	// ledgerMap lists block-related Ledgers by their Partner key.
	ledgerMap map[peer.ID]*ledger

	// an external ledger dealing with peer scores
	scoreLedger ScoreLedger

	ticker *time.Ticker

	taskWorkerLock  sync.Mutex
	taskWorkerCount int

	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
	// bytes up to which we will replace a want-have with a want-block
	maxBlockSizeReplaceHasWithBlock int

	sendDontHaves bool

	self peer.ID
}

// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine {
	return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, nil)
}

// This constructor is used by the tests
func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
	maxReplaceSize int, scoreLedger ScoreLedger) *Engine {

	if scoreLedger == nil {
		scoreLedger = NewDefaultScoreLedger()
	}

	e := &Engine{
		ledgerMap:                       make(map[peer.ID]*ledger),
		scoreLedger:                     scoreLedger,
		bsm:                             newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
		peerTagger:                      peerTagger,
		outbox:                          make(chan (<-chan *Envelope), outboxChanBuffer),
		workSignal:                      make(chan struct{}, 1),
		ticker:                          time.NewTicker(time.Millisecond * 100),
		maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
		taskWorkerCount:                 taskWorkerCount,
		sendDontHaves:                   true,
		self:                            self,
	}
	e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
	e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
	e.peerRequestQueue = peertaskqueue.New(
		peertaskqueue.OnPeerAddedHook(e.onPeerAdded),
		peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved),
		peertaskqueue.TaskMerger(newTaskMerger()),
		peertaskqueue.IgnoreFreezing(true))
	return e
}

// SetSendDontHaves indicates what to do when the engine receives a want-block
// for a block that is not in the blockstore. Either
// - Send a DONT_HAVE message
// - Simply don't respond
// Older versions of Bitswap did not respond, so this allows us to simulate
// those older versions for testing.
func (e *Engine) SetSendDontHaves(send bool) {
	e.sendDontHaves = send
}

// UseScoreLedger sets the scoreLedger to the given implementation. It should
// be called before StartWorkers().
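//
// A minimal wiring sketch (myScoreLedger is a hypothetical type that
// implements ScoreLedger):
//
//	e := NewEngine(ctx, bs, peerTagger, self)
//	e.UseScoreLedger(&myScoreLedger{})
//	e.StartWorkers(ctx, px)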
func (e *Engine) UseScoreLedger(scoreLedger ScoreLedger) {
	e.scoreLedger = scoreLedger
}

// startScoreLedger starts the score ledger and arranges for it to be
// stopped when the given process closes.
func (e *Engine) startScoreLedger(px process.Process) {
	e.scoreLedger.Start(func(p peer.ID, score int) {
		if score == 0 {
			e.peerTagger.UntagPeer(p, e.tagUseful)
		} else {
			e.peerTagger.TagPeer(p, e.tagUseful, score)
		}
	})
	px.Go(func(ppx process.Process) {
		<-ppx.Closing()
		e.scoreLedger.Stop()
	})
}

// StartWorkers spawns the workers that handle requests from other nodes for
// the data on this node
func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
	// Start up blockstore manager
	e.bsm.start(px)
	e.startScoreLedger(px)

	for i := 0; i < e.taskWorkerCount; i++ {
		px.Go(func(px process.Process) {
			e.taskWorker(ctx)
		})
	}
}

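// onPeerAdded is called by the peer request queue when a peer is added to the
// queue; it tags the peer as having work queued on its behalf.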
func (e *Engine) onPeerAdded(p peer.ID) {
	e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
}

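// onPeerRemoved is called by the peer request queue when a peer has no more
// queued work; it clears the queued-work tag.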
func (e *Engine) onPeerRemoved(p peer.ID) {
	e.peerTagger.UntagPeer(p, e.tagQueued)
}

// WantlistForPeer returns the list of keys that the given peer has asked for
func (e *Engine) WantlistForPeer(p peer.ID) []wl.Entry {
	partner := e.findOrCreate(p)

	partner.lk.Lock()
	entries := partner.wantList.Entries()
	partner.lk.Unlock()

	wl.SortEntries(entries)

	return entries
}

// LedgerForPeer returns aggregated data about communication with a given peer.
func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
	return e.scoreLedger.GetReceipt(p)
}

// Each taskWorker pulls items off the request queue up to the maximum size
// and adds them to an envelope that is passed off to the bitswap workers,
// which send the message to the network.
func (e *Engine) taskWorker(ctx context.Context) {
	defer e.taskWorkerExit()
	for {
		oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
		select {
		case <-ctx.Done():
			return
		case e.outbox <- oneTimeUse:
		}
		// receiver is ready for an outgoing envelope. let's prepare one. first,
		// we must acquire a task from the PQ...
		envelope, err := e.nextEnvelope(ctx)
		if err != nil {
			close(oneTimeUse)
			return // ctx cancelled
		}
		oneTimeUse <- envelope // buffered. won't block
		close(oneTimeUse)
	}
}

// taskWorkerExit handles cleanup of task workers
func (e *Engine) taskWorkerExit() {
	e.taskWorkerLock.Lock()
	defer e.taskWorkerLock.Unlock()

	e.taskWorkerCount--
	if e.taskWorkerCount == 0 {
		close(e.outbox)
	}
}

// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
	for {
		// Pop some tasks off the request queue
		p, nextTasks, pendingBytes := e.peerRequestQueue.PopTasks(targetMessageSize)
		for len(nextTasks) == 0 {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-e.workSignal:
				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
			case <-e.ticker.C:
				// When a task is cancelled, the queue may be "frozen" for a
				// period of time. We periodically "thaw" the queue to make
				// sure it doesn't get stuck in a frozen state.
				e.peerRequestQueue.ThawRound()
				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
			}
		}

		// Create a new message
		msg := bsmsg.New(false)

		log.Debugw("Bitswap process tasks", "local", e.self, "taskCount", len(nextTasks))

		// Amount of data in the request queue still waiting to be popped
		msg.SetPendingBytes(int32(pendingBytes))

		// Split out want-blocks, want-haves and DONT_HAVEs
		blockCids := make([]cid.Cid, 0, len(nextTasks))
		blockTasks := make(map[cid.Cid]*taskData, len(nextTasks))
		for _, t := range nextTasks {
			c := t.Topic.(cid.Cid)
			td := t.Data.(*taskData)
			if td.HaveBlock {
				if td.IsWantBlock {
					blockCids = append(blockCids, c)
					blockTasks[c] = td
				} else {
					// Add HAVES to the message
					msg.AddHave(c)
				}
			} else {
				// Add DONT_HAVEs to the message
				msg.AddDontHave(c)
			}
		}

		// Fetch blocks from datastore
		blks, err := e.bsm.getBlocks(ctx, blockCids)
		if err != nil {
			// we're dropping the envelope but that's not an issue in practice.
			return nil, err
		}

		for c, t := range blockTasks {
			blk := blks[c]
			// If the block was not found (it has been removed)
			if blk == nil {
				// If the client requested DONT_HAVE, add DONT_HAVE to the message
				if t.SendDontHave {
					msg.AddDontHave(c)
				}
			} else {
				// Add the block to the message
				// log.Debugf("  make evlp %s->%s block: %s (%d bytes)", e.self, p, c, len(blk.RawData()))
				msg.AddBlock(blk)
			}
		}

		// If there's nothing in the message, bail out
		if msg.Empty() {
			e.peerRequestQueue.TasksDone(p, nextTasks...)
			continue
		}

		log.Debugw("Bitswap engine -> msg", "local", e.self, "to", p, "blockCount", len(msg.Blocks()), "presenceCount", len(msg.BlockPresences()), "size", msg.Size())
		return &Envelope{
			Peer:    p,
			Message: msg,
			Sent: func() {
				// Once the message has been sent, signal the request queue so
				// that the sent tasks can be cleared from the queue
				e.peerRequestQueue.TasksDone(p, nextTasks...)

				// Signal the worker to check for more work
				e.signalNewWork()
			},
		}, nil
	}
}

// Outbox returns a channel of one-time use Envelope channels.
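//
// Each inner channel delivers at most one envelope and is then closed; on
// shutdown it is closed without delivering anything. A consumer loop might
// look like the following sketch (the actual consumers are bitswap's
// message-sending workers):
//
//	for next := range e.Outbox() {
//		envelope, ok := <-next
//		if !ok {
//			continue // no envelope; the engine is shutting down
//		}
//		// ... send envelope.Message to envelope.Peer, then:
//		envelope.Sent()
//	}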
func (e *Engine) Outbox() <-chan (<-chan *Envelope) {
	return e.outbox
}

// Peers returns a slice of Peers with whom the local node has active sessions.
func (e *Engine) Peers() []peer.ID {
	e.lock.RLock()
	defer e.lock.RUnlock()

	response := make([]peer.ID, 0, len(e.ledgerMap))

	for _, ledger := range e.ledgerMap {
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived is called when a message is received from a remote peer.
// For each item in the wantlist, add a want-have or want-block entry to the
// request queue (these are later popped off by the task workers)
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) {
	entries := m.Wantlist()

	if len(entries) > 0 {
		log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries))
		for _, et := range entries {
			if !et.Cancel {
				if et.WantType == pb.Message_Wantlist_Have {
					log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid)
				} else {
					log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid)
				}
			}
		}
	}

	if m.Empty() {
		log.Infof("received empty message from %s", p)
	}

	newWorkExists := false
	defer func() {
		if newWorkExists {
			e.signalNewWork()
		}
	}()

	// Get block sizes
	wants, cancels := e.splitWantsCancels(entries)
	wantKs := cid.NewSet()
	for _, entry := range wants {
		wantKs.Add(entry.Cid)
	}
	blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
	if err != nil {
		log.Info("aborting message processing", err)
		return
	}

	// Get the ledger for the peer
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	// If the peer sent a full wantlist, replace the ledger's wantlist
	if m.Full() {
		l.wantList = wl.New()
	}

	var activeEntries []peertask.Task

	// Remove cancelled blocks from the queue
	for _, entry := range cancels {
		log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", entry.Cid)
		if l.CancelWant(entry.Cid) {
			e.peerRequestQueue.Remove(entry.Cid, p)
		}
	}

	// For each want-have / want-block
	for _, entry := range wants {
		c := entry.Cid
		blockSize, found := blockSizes[entry.Cid]

		// Add each want-have / want-block to the ledger
		l.Wants(c, entry.Priority, entry.WantType)

		// If the block was not found
		if !found {
			log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave)

			// Only add the task to the queue if the requester wants a DONT_HAVE
			if e.sendDontHaves && entry.SendDontHave {
				newWorkExists = true
				isWantBlock := false
				if entry.WantType == pb.Message_Wantlist_Block {
					isWantBlock = true
				}

				activeEntries = append(activeEntries, peertask.Task{
					Topic:    c,
					Priority: int(entry.Priority),
					Work:     bsmsg.BlockPresenceSize(c),
					Data: &taskData{
						BlockSize:    0,
						HaveBlock:    false,
						IsWantBlock:  isWantBlock,
						SendDontHave: entry.SendDontHave,
					},
				})
			}
		} else {
			// The block was found, add it to the queue
			newWorkExists = true

			isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

			log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", entry.Cid, "isWantBlock", isWantBlock)

			// entrySize is the amount of space the entry takes up in the
			// message we send to the recipient. If we're sending a block, the
			// entrySize is the size of the block. Otherwise it's the size of
			// a block presence entry.
			entrySize := blockSize
			if !isWantBlock {
				entrySize = bsmsg.BlockPresenceSize(c)
			}
			activeEntries = append(activeEntries, peertask.Task{
				Topic:    c,
				Priority: int(entry.Priority),
				Work:     entrySize,
				Data: &taskData{
					BlockSize:    blockSize,
					HaveBlock:    true,
					IsWantBlock:  isWantBlock,
					SendDontHave: entry.SendDontHave,
				},
			})
		}
	}

	// Push entries onto the request queue
	if len(activeEntries) > 0 {
		e.peerRequestQueue.PushTasks(p, activeEntries...)
	}
}

// splitWantsCancels splits the want-have / want-block entries from the
// cancel entries
func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
	wants := make([]bsmsg.Entry, 0, len(es))
	cancels := make([]bsmsg.Entry, 0, len(es))
	for _, et := range es {
		if et.Cancel {
			cancels = append(cancels, et)
		} else {
			wants = append(wants, et)
		}
	}
	return wants, cancels
}

// ReceiveFrom is called when new blocks are received and added to the block
// store, meaning there may be peers who want those blocks, so we should send
// the blocks to them.
//
// This function also updates the receive side of the ledger.
func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid) {
	if len(blks) == 0 {
		return
	}

	if from != "" {
		l := e.findOrCreate(from)
		l.lk.Lock()

		// Record how many bytes were received in the ledger
		for _, blk := range blks {
			log.Debugw("Bitswap engine <- block", "local", e.self, "from", from, "cid", blk.Cid(), "size", len(blk.RawData()))
			e.scoreLedger.AddToReceivedBytes(l.Partner, len(blk.RawData()))
		}

		l.lk.Unlock()
	}

	// Get the size of each block
	blockSizes := make(map[cid.Cid]int, len(blks))
	for _, blk := range blks {
		blockSizes[blk.Cid()] = len(blk.RawData())
	}

	// Check each peer to see if it wants one of the blocks we received
	work := false
	e.lock.RLock()

	for _, l := range e.ledgerMap {
		l.lk.RLock()

		for _, b := range blks {
			k := b.Cid()

			if entry, ok := l.WantListContains(k); ok {
				work = true

				blockSize := blockSizes[k]
				isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

				entrySize := blockSize
				if !isWantBlock {
					entrySize = bsmsg.BlockPresenceSize(k)
				}

				e.peerRequestQueue.PushTasks(l.Partner, peertask.Task{
					Topic:    entry.Cid,
					Priority: int(entry.Priority),
					Work:     entrySize,
					Data: &taskData{
						BlockSize:    blockSize,
						HaveBlock:    true,
						IsWantBlock:  isWantBlock,
						SendDontHave: false,
					},
				})
			}
		}
		l.lk.RUnlock()
	}
	e.lock.RUnlock()

	if work {
		e.signalNewWork()
	}
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

// MessageSent is called when a message has successfully been sent out, to record
// changes.
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	// Remove sent blocks from the want list for the peer
	for _, block := range m.Blocks() {
		e.scoreLedger.AddToSentBytes(l.Partner, len(block.RawData()))
		l.wantList.RemoveType(block.Cid(), pb.Message_Wantlist_Block)
	}

	// Remove sent block presences from the want list for the peer
	for _, bp := range m.BlockPresences() {
		// Don't record sent data. We reserve that for data blocks.
		if bp.Type == pb.Message_Have {
			l.wantList.RemoveType(bp.Cid, pb.Message_Wantlist_Have)
		}
	}
}

// PeerConnected is called when a new peer connects, meaning we should start
// sending blocks.
func (e *Engine) PeerConnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()

	_, ok := e.ledgerMap[p]
	if !ok {
		e.ledgerMap[p] = newLedger(p)
	}

	e.scoreLedger.PeerConnected(p)
}

// PeerDisconnected is called when a peer disconnects.
func (e *Engine) PeerDisconnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()

	delete(e.ledgerMap, p)

	e.scoreLedger.PeerDisconnected(p)
}

// If the want is a want-have, and it's below a certain size, send the full
// block (instead of sending a HAVE)
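// For example, with the default 1024-byte limit, a want-have for a 500-byte
// block is answered with the block itself: sending such a small block
// directly is cheaper than sending a HAVE and then serving a follow-up
// want-block.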
func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool {
	isWantBlock := wantType == pb.Message_Wantlist_Block
	return isWantBlock || blockSize <= e.maxBlockSizeReplaceHasWithBlock
}

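// numBytesSentTo returns the total number of bytes sent to the given peer,
// as recorded by the score ledger.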
func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
	return e.LedgerForPeer(p).Sent
}

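// numBytesReceivedFrom returns the total number of bytes received from the
// given peer, as recorded by the score ledger.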
func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
	return e.LedgerForPeer(p).Recv
}

// findOrCreate lazily instantiates a ledger for the given peer
func (e *Engine) findOrCreate(p peer.ID) *ledger {
	// Take a read lock (as it's less expensive) to check if we have a ledger
	// for the peer
	e.lock.RLock()
	l, ok := e.ledgerMap[p]
	e.lock.RUnlock()
	if ok {
		return l
	}

	// There's no ledger, so take a write lock, then check again and create the
	// ledger if necessary
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok = e.ledgerMap[p]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p] = l
	}
	return l
}

func (e *Engine) signalNewWork() {
	// Signal task generation to restart (if stopped!)
	select {
	case e.workSignal <- struct{}{}:
	default:
	}
}