// Package decision implements the decision engine for the bitswap service.
package decision

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"

	bsmsg "github.com/ipfs/go-bitswap/message"
	pb "github.com/ipfs/go-bitswap/message/pb"
	wl "github.com/ipfs/go-bitswap/wantlist"
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	bstore "github.com/ipfs/go-ipfs-blockstore"
	logging "github.com/ipfs/go-log"
	"github.com/ipfs/go-peertaskqueue"
	"github.com/ipfs/go-peertaskqueue/peertask"
	process "github.com/jbenet/goprocess"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
// 1. an initial `sendwantlist` message to a provider of the first key in a
//    request
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as
//   appropriate
// * when handling `cancel`, if we recently received a wanted block from a
//   peer, include a partial wantlist that contains a few other high priority
//   blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).

var log = logging.Logger("engine")

const (
	// outboxChanBuffer must be 0 to prevent stale messages from being sent
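	// (an unbuffered channel means taskWorker only builds an envelope once a
	// consumer is actually ready to take it, so a message is never prepared
	// ahead of time and left to go stale)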
	outboxChanBuffer = 0
	// targetMessageSize is the ideal size of the batched payload. We try to
	// pop this much data off the request queue, but it may be a little more
	// or less depending on what's in the queue.
	targetMessageSize = 16 * 1024
	// tagFormat is the tag given to peers associated with an engine, e.g.
	// "bs-engine-queued-<uuid>" or "bs-engine-useful-<uuid>"
	tagFormat = "bs-engine-%s-%s"

	// queuedTagWeight is the default weight for peers that have work queued
	// on their behalf.
	queuedTagWeight = 10

	// the alpha for the EWMA used to track short term usefulness
	shortTermAlpha = 0.5

	// the alpha for the EWMA used to track long term usefulness
	longTermAlpha = 0.05

	// how frequently the engine should sample usefulness. Peers that
	// interact every shortTerm time period are considered "active".
	shortTerm = 10 * time.Second

	// longTermRatio defines what "long term" means in terms of the
	// shortTerm duration. Peers that interact at least once every
	// longTermRatio sampling periods are considered useful over the long term.
	longTermRatio = 10

	// long/short term scores for tagging peers
	longTermScore  = 10 // this is a high tag but it grows _very_ slowly.
	shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.

	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
	// bytes up to which we will replace a want-have with a want-block
	maxBlockSizeReplaceHasWithBlock = 1024

	// Number of concurrent workers that pull tasks off the request queue
	taskWorkerCount = 8

	// Number of concurrent workers that process requests to the blockstore
	blockstoreWorkerCount = 128
)

// Envelope contains a message for a Peer.
type Envelope struct {
	// Peer is the intended recipient.
	Peer peer.ID

	// Message is the payload.
	Message bsmsg.BitSwapMessage

	// A callback to notify the decision queue that the task is complete
	Sent func()
}

// PeerTagger covers the methods on the connection manager used by the decision
// engine to tag peers
type PeerTagger interface {
	TagPeer(peer.ID, string, int)
	UntagPeer(p peer.ID, tag string)
}

// Engine manages sending requested blocks to peers.
type Engine struct {
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
	peerRequestQueue *peertaskqueue.PeerTaskQueue

	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required.
	workSignal chan struct{}

	// outbox contains outgoing messages to peers. This is owned by the
	// taskWorker goroutine
	outbox chan (<-chan *Envelope)

	bsm *blockstoreManager

	peerTagger PeerTagger

	tagQueued, tagUseful string

	lock sync.RWMutex // protects the fields immediately below

	// ledgerMap lists Ledgers by their Partner key.
	ledgerMap map[peer.ID]*ledger

	ticker *time.Ticker

	taskWorkerLock  sync.Mutex
	taskWorkerCount int

	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
	// bytes up to which we will replace a want-have with a want-block
	maxBlockSizeReplaceHasWithBlock int

	// how frequently the engine should sample peer usefulness
	peerSampleInterval time.Duration
	// used by the tests to detect when a sample is taken
	sampleCh chan struct{}

	sendDontHaves bool

	self peer.ID
}

// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine {
	return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, shortTerm, nil)
}

// This constructor is used by the tests
func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
	maxReplaceSize int, peerSampleInterval time.Duration, sampleCh chan struct{}) *Engine {

	e := &Engine{
		ledgerMap:                       make(map[peer.ID]*ledger),
		bsm:                             newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
		peerTagger:                      peerTagger,
		outbox:                          make(chan (<-chan *Envelope), outboxChanBuffer),
		workSignal:                      make(chan struct{}, 1),
		ticker:                          time.NewTicker(time.Millisecond * 100),
		maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
		peerSampleInterval:              peerSampleInterval,
		sampleCh:                        sampleCh,
		taskWorkerCount:                 taskWorkerCount,
		sendDontHaves:                   true,
		self:                            self,
	}
	e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
	e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
	e.peerRequestQueue = peertaskqueue.New(
		peertaskqueue.OnPeerAddedHook(e.onPeerAdded),
		peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved),
		peertaskqueue.TaskMerger(newTaskMerger()),
		peertaskqueue.IgnoreFreezing(true))
	return e
}

// SetSendDontHaves indicates what to do when the engine receives a want-block
// for a block that is not in the blockstore. Either
// - Send a DONT_HAVE message
// - Simply don't respond
// Older versions of Bitswap did not respond, so this allows us to simulate
// those older versions for testing.
func (e *Engine) SetSendDontHaves(send bool) {
	e.sendDontHaves = send
}

// StartWorkers starts up workers to handle requests from other nodes for the
// data on this node
func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
	// Start up blockstore manager
	e.bsm.start(px)
	px.Go(e.scoreWorker)

	for i := 0; i < e.taskWorkerCount; i++ {
		px.Go(func(px process.Process) {
			e.taskWorker(ctx)
		})
	}
}

// scoreWorker keeps track of how "useful" our peers are, updating scores in the
// connection manager.
//
// It does this by tracking two scores: short-term usefulness and long-term
// usefulness. Short-term usefulness is sampled frequently and highly weights
// new observations. Long-term usefulness is sampled less frequently and highly
// weights long-term trends.
//
// In practice, we do this by keeping two EWMAs. If we see an interaction
// within the sampling period, we record the score, otherwise, we record a 0.
// The short-term one has a high alpha and is sampled every shortTerm period.
// The long-term one has a low alpha and is sampled every
// longTermRatio*shortTerm period.
//
// To calculate the final score, we sum the short-term and long-term scores then
// adjust it ±25% based on our debt ratio. Peers that have historically been
// more useful to us than we are to them get the highest score.
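//
// A sketch of the EWMA arithmetic, assuming the ewma helper used below
// implements the standard form:
//
//	newScore = alpha*observation + (1-alpha)*oldScore
//
// so with shortTermAlpha = 0.5 a single missed sample halves the short-term
// score, while with longTermAlpha = 0.05 the long-term score decays slowly.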
func (e *Engine) scoreWorker(px process.Process) {
	ticker := time.NewTicker(e.peerSampleInterval)
	defer ticker.Stop()

	type update struct {
		peer  peer.ID
		score int
	}
	var (
		lastShortUpdate, lastLongUpdate time.Time
		updates                         []update
	)

	for i := 0; ; i = (i + 1) % longTermRatio {
		var now time.Time
		select {
		case now = <-ticker.C:
		case <-px.Closing():
			return
		}

		// The long term update ticks every `longTermRatio` short
		// intervals.
		updateLong := i == 0

		e.lock.Lock()
		for _, ledger := range e.ledgerMap {
			ledger.lk.Lock()

			// Update the short-term score.
			if ledger.lastExchange.After(lastShortUpdate) {
				ledger.shortScore = ewma(ledger.shortScore, shortTermScore, shortTermAlpha)
			} else {
				ledger.shortScore = ewma(ledger.shortScore, 0, shortTermAlpha)
			}

			// Update the long-term score.
			if updateLong {
				if ledger.lastExchange.After(lastLongUpdate) {
					ledger.longScore = ewma(ledger.longScore, longTermScore, longTermAlpha)
				} else {
					ledger.longScore = ewma(ledger.longScore, 0, longTermAlpha)
				}
			}

			// Calculate the new score.
			//
			// The accounting score adjustment prefers peers _we_
			// need over peers that need us. This doesn't help with
			// leeching.
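			//
			// Assuming Accounting.Score() returns a value in [0, 1], the
			// factor (Score()*.5 + .75) ranges over [0.75, 1.25], which is
			// the ±25% adjustment described in the scoreWorker comment.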
			score := int((ledger.shortScore + ledger.longScore) * ((ledger.Accounting.Score())*.5 + .75))

			// Avoid updating the connection manager unless there's a change. This can be expensive.
			if ledger.score != score {
				// put these in a list so we can perform the updates outside the _global_ lock.
				updates = append(updates, update{ledger.Partner, score})
				ledger.score = score
			}
			ledger.lk.Unlock()
		}
		e.lock.Unlock()

		// record the times.
		lastShortUpdate = now
		if updateLong {
			lastLongUpdate = now
		}

		// apply the updates
		for _, update := range updates {
			if update.score == 0 {
				e.peerTagger.UntagPeer(update.peer, e.tagUseful)
			} else {
				e.peerTagger.TagPeer(update.peer, e.tagUseful, update.score)
			}
		}
		// Keep the memory. It's not much and it saves us from having to allocate.
		updates = updates[:0]

		// Used by the tests
		if e.sampleCh != nil {
			e.sampleCh <- struct{}{}
		}
	}
}

func (e *Engine) onPeerAdded(p peer.ID) {
	e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
}

func (e *Engine) onPeerRemoved(p peer.ID) {
	e.peerTagger.UntagPeer(p, e.tagQueued)
}

// WantlistForPeer returns the list of keys that the given peer has asked for
func (e *Engine) WantlistForPeer(p peer.ID) []wl.Entry {
	partner := e.findOrCreate(p)

	partner.lk.Lock()
	entries := partner.wantList.Entries()
	partner.lk.Unlock()

	wl.SortEntries(entries)

	return entries
}

// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
	ledger := e.findOrCreate(p)

	ledger.lk.Lock()
	defer ledger.lk.Unlock()

	return &Receipt{
		Peer:      ledger.Partner.String(),
		Value:     ledger.Accounting.Value(),
		Sent:      ledger.Accounting.BytesSent,
		Recv:      ledger.Accounting.BytesRecv,
		Exchanged: ledger.ExchangeCount(),
	}
}

// Each taskWorker pulls items off the request queue up to the maximum size
// and adds them to an envelope that is passed off to the bitswap workers,
// which send the message to the network.
func (e *Engine) taskWorker(ctx context.Context) {
	defer e.taskWorkerExit()
	for {
		oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
		select {
		case <-ctx.Done():
			return
		case e.outbox <- oneTimeUse:
		}
		// receiver is ready for an outgoing envelope. let's prepare one. first,
		// we must acquire a task from the PQ...
		envelope, err := e.nextEnvelope(ctx)
		if err != nil {
			close(oneTimeUse)
			return // ctx cancelled
		}
		oneTimeUse <- envelope // buffered. won't block
		close(oneTimeUse)
	}
}

// taskWorkerExit handles cleanup of task workers
func (e *Engine) taskWorkerExit() {
	e.taskWorkerLock.Lock()
	defer e.taskWorkerLock.Unlock()

	e.taskWorkerCount--
	if e.taskWorkerCount == 0 {
		close(e.outbox)
	}
}

// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
	for {
		// Pop some tasks off the request queue
		p, nextTasks, pendingBytes := e.peerRequestQueue.PopTasks(targetMessageSize)
		for len(nextTasks) == 0 {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-e.workSignal:
				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
			case <-e.ticker.C:
				// When a task is cancelled, the queue may be "frozen" for a
				// period of time. We periodically "thaw" the queue to make
				// sure it doesn't get stuck in a frozen state.
				e.peerRequestQueue.ThawRound()
				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
			}
		}

		// Create a new message
		msg := bsmsg.New(false)

		log.Debugw("Bitswap process tasks", "local", e.self, "taskCount", len(nextTasks))

		// Amount of data in the request queue still waiting to be popped
		msg.SetPendingBytes(int32(pendingBytes))

		// Split out want-blocks, want-haves and DONT_HAVEs
		blockCids := make([]cid.Cid, 0, len(nextTasks))
		blockTasks := make(map[cid.Cid]*taskData, len(nextTasks))
		for _, t := range nextTasks {
			c := t.Topic.(cid.Cid)
			td := t.Data.(*taskData)
			if td.HaveBlock {
				if td.IsWantBlock {
					blockCids = append(blockCids, c)
					blockTasks[c] = td
				} else {
					// Add HAVES to the message
					msg.AddHave(c)
				}
			} else {
				// Add DONT_HAVEs to the message
				msg.AddDontHave(c)
			}
		}

		// Fetch blocks from datastore
		blks, err := e.bsm.getBlocks(ctx, blockCids)
		if err != nil {
			// we're dropping the envelope but that's not an issue in practice.
			return nil, err
		}

		for c, t := range blockTasks {
			blk := blks[c]
			// If the block was not found (it has been removed)
			if blk == nil {
				// If the client requested DONT_HAVE, add DONT_HAVE to the message
				if t.SendDontHave {
					msg.AddDontHave(c)
				}
			} else {
				// Add the block to the message
				// log.Debugf("  make evlp %s->%s block: %s (%d bytes)", e.self, p, c, len(blk.RawData()))
				msg.AddBlock(blk)
			}
		}

		// If there's nothing in the message, bail out
		if msg.Empty() {
			e.peerRequestQueue.TasksDone(p, nextTasks...)
			continue
		}

		log.Debugw("Bitswap engine -> msg", "local", e.self, "to", p, "blockCount", len(msg.Blocks()), "presenceCount", len(msg.BlockPresences()), "size", msg.Size())
		return &Envelope{
			Peer:    p,
			Message: msg,
			Sent: func() {
				// Once the message has been sent, signal the request queue so
				// it can be cleared from the queue
				e.peerRequestQueue.TasksDone(p, nextTasks...)

				// Signal the worker to check for more work
				e.signalNewWork()
			},
		}, nil
	}
}

// Outbox returns a channel of one-time use Envelope channels.
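//
// An illustrative sketch of how a consumer might drain it (the actual
// consumer lives in the bitswap workers, outside this package):
//
//	for envChan := range engine.Outbox() {
//		if env, ok := <-envChan; ok {
//			// deliver env.Message to env.Peer over the network, then:
//			env.Sent()
//		}
//	}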
func (e *Engine) Outbox() <-chan (<-chan *Envelope) {
	return e.outbox
}

// Peers returns a slice of Peers with whom the local node has active sessions.
func (e *Engine) Peers() []peer.ID {
	e.lock.RLock()
	defer e.lock.RUnlock()

	response := make([]peer.ID, 0, len(e.ledgerMap))

	for _, ledger := range e.ledgerMap {
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived is called when a message is received from a remote peer.
// For each item in the wantlist, add a want-have or want-block entry to the
// request queue (this is later popped off by the task workers)
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) {
	entries := m.Wantlist()

	if len(entries) > 0 {
		log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries))
		for _, et := range entries {
			if !et.Cancel {
				if et.WantType == pb.Message_Wantlist_Have {
					log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid)
				} else {
					log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid)
				}
			}
		}
	}

	if m.Empty() {
		log.Infof("received empty message from %s", p)
	}

	newWorkExists := false
	defer func() {
		if newWorkExists {
			e.signalNewWork()
		}
	}()

	// Get block sizes
	wants, cancels := e.splitWantsCancels(entries)
	wantKs := cid.NewSet()
	for _, entry := range wants {
		wantKs.Add(entry.Cid)
	}
	blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
	if err != nil {
		log.Info("aborting message processing", err)
		return
	}

	// Get the ledger for the peer
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	// If the peer sent a full wantlist, replace the ledger's wantlist
	if m.Full() {
		l.wantList = wl.New()
	}

	var activeEntries []peertask.Task

	// Remove cancelled blocks from the queue
	for _, entry := range cancels {
		log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", entry.Cid)
		if l.CancelWant(entry.Cid) {
			e.peerRequestQueue.Remove(entry.Cid, p)
		}
	}

	// For each want-have / want-block
	for _, entry := range wants {
		c := entry.Cid
		blockSize, found := blockSizes[entry.Cid]

		// Add each want-have / want-block to the ledger
		l.Wants(c, entry.Priority, entry.WantType)

		// If the block was not found
		if !found {
			log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave)

			// Only add the task to the queue if the requester wants a DONT_HAVE
			if e.sendDontHaves && entry.SendDontHave {
				newWorkExists = true
				isWantBlock := false
				if entry.WantType == pb.Message_Wantlist_Block {
					isWantBlock = true
				}

				activeEntries = append(activeEntries, peertask.Task{
					Topic:    c,
					Priority: int(entry.Priority),
					Work:     bsmsg.BlockPresenceSize(c),
					Data: &taskData{
						BlockSize:    0,
						HaveBlock:    false,
						IsWantBlock:  isWantBlock,
						SendDontHave: entry.SendDontHave,
					},
				})
			}
		} else {
			// The block was found, add it to the queue
			newWorkExists = true

			isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

			log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", entry.Cid, "isWantBlock", isWantBlock)

			// entrySize is the amount of space the entry takes up in the
			// message we send to the recipient. If we're sending a block, the
			// entrySize is the size of the block. Otherwise it's the size of
			// a block presence entry.
			entrySize := blockSize
			if !isWantBlock {
				entrySize = bsmsg.BlockPresenceSize(c)
			}
			activeEntries = append(activeEntries, peertask.Task{
				Topic:    c,
				Priority: int(entry.Priority),
				Work:     entrySize,
				Data: &taskData{
					BlockSize:    blockSize,
					HaveBlock:    true,
					IsWantBlock:  isWantBlock,
					SendDontHave: entry.SendDontHave,
				},
			})
		}
	}

	// Push entries onto the request queue
	if len(activeEntries) > 0 {
		e.peerRequestQueue.PushTasks(p, activeEntries...)
	}
}

// Split the want-have / want-block entries from the cancel entries
func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
	wants := make([]bsmsg.Entry, 0, len(es))
	cancels := make([]bsmsg.Entry, 0, len(es))
	for _, et := range es {
		if et.Cancel {
			cancels = append(cancels, et)
		} else {
			wants = append(wants, et)
		}
	}
	return wants, cancels
}

// ReceiveFrom is called when new blocks are received and added to the block
// store, meaning there may be peers who want those blocks, so we should send
// the blocks to them.
//
// This function also updates the receive side of the ledger.
func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid) {
	if len(blks) == 0 {
		return
	}

	if from != "" {
		l := e.findOrCreate(from)
		l.lk.Lock()

		// Record how many bytes were received in the ledger
		for _, blk := range blks {
			log.Debugw("Bitswap engine <- block", "local", e.self, "from", from, "cid", blk.Cid(), "size", len(blk.RawData()))
			l.ReceivedBytes(len(blk.RawData()))
		}

		l.lk.Unlock()
	}

	// Get the size of each block
	blockSizes := make(map[cid.Cid]int, len(blks))
	for _, blk := range blks {
		blockSizes[blk.Cid()] = len(blk.RawData())
	}

	// Check each peer to see if it wants one of the blocks we received
	work := false
	e.lock.RLock()

	for _, l := range e.ledgerMap {
		l.lk.RLock()

		for _, b := range blks {
			k := b.Cid()

			if entry, ok := l.WantListContains(k); ok {
				work = true

				blockSize := blockSizes[k]
				isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

				entrySize := blockSize
				if !isWantBlock {
					entrySize = bsmsg.BlockPresenceSize(k)
				}

				e.peerRequestQueue.PushTasks(l.Partner, peertask.Task{
					Topic:    entry.Cid,
					Priority: int(entry.Priority),
					Work:     entrySize,
					Data: &taskData{
						BlockSize:    blockSize,
						HaveBlock:    true,
						IsWantBlock:  isWantBlock,
						SendDontHave: false,
					},
				})
			}
		}
		l.lk.RUnlock()
	}
	e.lock.RUnlock()

	if work {
		e.signalNewWork()
	}
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

// MessageSent is called when a message has successfully been sent out, to record
// changes.
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	// Remove sent blocks from the want list for the peer
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.RawData()))
		l.wantList.RemoveType(block.Cid(), pb.Message_Wantlist_Block)
	}

	// Remove sent block presences from the want list for the peer
	for _, bp := range m.BlockPresences() {
		// Don't record sent data. We reserve that for data blocks.
		if bp.Type == pb.Message_Have {
			l.wantList.RemoveType(bp.Cid, pb.Message_Wantlist_Have)
		}
	}
}

// PeerConnected is called when a new peer connects, meaning we should start
// sending blocks.
func (e *Engine) PeerConnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()

	_, ok := e.ledgerMap[p]
	if !ok {
		e.ledgerMap[p] = newLedger(p)
	}
}

// PeerDisconnected is called when a peer disconnects.
func (e *Engine) PeerDisconnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()

	delete(e.ledgerMap, p)
}

// If the want is a want-have, and it's below a certain size, send the full
// block (instead of sending a HAVE)
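// For example, with the default maxBlockSizeReplaceHasWithBlock of 1024 bytes,
// a want-have for a 200 byte block is answered with the block itself, while a
// want-have for a 100 KiB block is answered with a HAVE.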
func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool {
	isWantBlock := wantType == pb.Message_Wantlist_Block
	return isWantBlock || blockSize <= e.maxBlockSizeReplaceHasWithBlock
}

func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesSent
}

func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesRecv
}

// findOrCreate lazily instantiates a ledger for the given peer
func (e *Engine) findOrCreate(p peer.ID) *ledger {
	// Take a read lock (as it's less expensive) to check if we have a ledger
	// for the peer
	e.lock.RLock()
	l, ok := e.ledgerMap[p]
	e.lock.RUnlock()
	if ok {
		return l
	}

	// There's no ledger, so take a write lock, then check again and create the
	// ledger if necessary
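	// (the second check is needed because another goroutine may have created
	// the ledger in the window between releasing the read lock and acquiring
	// the write lock)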
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok = e.ledgerMap[p]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p] = l
	}
	return l
}

func (e *Engine) signalNewWork() {
	// Signal task generation to restart (if stopped!)
	select {
	case e.workSignal <- struct{}{}:
	default:
	}
}