// Package decision implements the decision engine for the bitswap service.
package decision

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"

	bsmsg "github.com/ipfs/go-bitswap/message"
	pb "github.com/ipfs/go-bitswap/message/pb"
	wl "github.com/ipfs/go-bitswap/wantlist"
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	bstore "github.com/ipfs/go-ipfs-blockstore"
	logging "github.com/ipfs/go-log"
	"github.com/ipfs/go-peertaskqueue"
	"github.com/ipfs/go-peertaskqueue/peertask"
	process "github.com/jbenet/goprocess"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
// 1. an initial `sendwantlist` message to a provider of the first key in a
//    request
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as
//   appropriate
// * when handling `cancel`, if we recently received a wanted block from a
//   peer, include a partial wantlist that contains a few other high priority
//   blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).

var log = logging.Logger("engine")

const (
	// outboxChanBuffer must be 0 to prevent stale messages from being sent
	outboxChanBuffer = 0
	// targetMessageSize is the ideal size of the batched payload. We try to
	// pop this much data off the request queue, but it may be a little more
	// or less depending on what's in the queue.
	targetMessageSize = 16 * 1024
	// tagFormat is the tag given to peers associated with an engine
	tagFormat = "bs-engine-%s-%s"

	// queuedTagWeight is the default weight for peers that have work queued
	// on their behalf.
	queuedTagWeight = 10

	// the alpha for the EWMA used to track short term usefulness
	shortTermAlpha = 0.5

	// the alpha for the EWMA used to track long term usefulness
	longTermAlpha = 0.05

	// how frequently the engine should sample usefulness. Peers that
	// interact every shortTerm time period are considered "active".
	shortTerm = 10 * time.Second

	// long term ratio defines what "long term" means in terms of the
	// shortTerm duration. Peers that interact once every longTermRatio are
	// considered useful over the long term.
	longTermRatio = 10

	// long/short term scores for tagging peers
	longTermScore  = 10 // this is a high tag but it grows _very_ slowly.
	shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.

	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
	// bytes up to which we will replace a want-have with a want-block
	maxBlockSizeReplaceHasWithBlock = 1024

	// Number of concurrent workers that pull tasks off the request queue
	taskWorkerCount = 8

	// Number of concurrent workers that process requests to the blockstore
	blockstoreWorkerCount = 128
)

// Envelope contains a message for a Peer.
type Envelope struct {
	// Peer is the intended recipient.
	Peer peer.ID

	// Message is the payload.
	Message bsmsg.BitSwapMessage

	// A callback to notify the decision queue that the task is complete
	Sent func()
}

// PeerTagger covers the methods on the connection manager used by the decision
// engine to tag peers
type PeerTagger interface {
	TagPeer(peer.ID, string, int)
	UntagPeer(p peer.ID, tag string)
}

// Engine manages sending requested blocks to peers.
type Engine struct {
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
	peerRequestQueue *peertaskqueue.PeerTaskQueue

	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required.
	workSignal chan struct{}

	// outbox contains outgoing messages to peers. This is owned by the
	// taskWorker goroutine
	outbox chan (<-chan *Envelope)

	bsm *blockstoreManager

	peerTagger PeerTagger

	tagQueued, tagUseful string

	lock sync.RWMutex // protects the fields immediately below

	// ledgerMap lists Ledgers by their Partner key.
	ledgerMap map[peer.ID]*ledger

	ticker *time.Ticker

	taskWorkerLock  sync.Mutex
	taskWorkerCount int

	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
	// bytes up to which we will replace a want-have with a want-block
	maxBlockSizeReplaceHasWithBlock int

	// how frequently the engine should sample peer usefulness
	peerSampleInterval time.Duration

	sendDontHaves bool

	self peer.ID
}

// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine {
	return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, shortTerm)
}

// This constructor is used by the tests
func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
	maxReplaceSize int, peerSampleInterval time.Duration) *Engine {

	e := &Engine{
		ledgerMap:                       make(map[peer.ID]*ledger),
		bsm:                             newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
		peerTagger:                      peerTagger,
		outbox:                          make(chan (<-chan *Envelope), outboxChanBuffer),
		workSignal:                      make(chan struct{}, 1),
		ticker:                          time.NewTicker(time.Millisecond * 100),
		maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
		peerSampleInterval:              peerSampleInterval,
		taskWorkerCount:                 taskWorkerCount,
		sendDontHaves:                   true,
		self:                            self,
	}
	e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
	e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
	e.peerRequestQueue = peertaskqueue.New(
		peertaskqueue.OnPeerAddedHook(e.onPeerAdded),
		peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved),
		peertaskqueue.TaskMerger(newTaskMerger()),
		peertaskqueue.IgnoreFreezing(true))
	go e.scoreWorker(ctx)
	return e
}

// SetSendDontHaves indicates what to do when the engine receives a want-block
// for a block that is not in the blockstore. Either
// - Send a DONT_HAVE message
// - Simply don't respond
// Older versions of Bitswap did not respond, so this allows us to simulate
// those older versions for testing.
func (e *Engine) SetSendDontHaves(send bool) {
	e.sendDontHaves = send
}

// Start up workers to handle requests from other nodes for the data on this node
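//
// A minimal wiring sketch (hypothetical; bs, connMgr and self are assumed to
// be supplied by the caller):
//
//	e := NewEngine(ctx, bs, connMgr, self)
//	e.StartWorkers(ctx, process.Background())
//	// envelopes are then consumed from e.Outbox() (see Outbox below)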
func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
	// Start up blockstore manager
	e.bsm.start(px)

	for i := 0; i < e.taskWorkerCount; i++ {
		px.Go(func(px process.Process) {
			e.taskWorker(ctx)
		})
	}
}

// scoreWorker keeps track of how "useful" our peers are, updating scores in the
// connection manager.
//
// It does this by tracking two scores: short-term usefulness and long-term
// usefulness. Short-term usefulness is sampled frequently and highly weights
// new observations. Long-term usefulness is sampled less frequently and
// weights long-term trends more heavily.
//
// In practice, we do this by keeping two EWMAs. If we see an interaction
// within the sampling period, we record the score, otherwise, we record a 0.
// The short-term one has a high alpha and is sampled every shortTerm period.
// The long-term one has a low alpha and is sampled every
// longTermRatio*shortTerm period.
//
// To calculate the final score, we sum the short-term and long-term scores then
// adjust it ±25% based on our debt ratio. Peers that have historically been
// more useful to us than we are to them get the highest score.
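//
// As a worked example (hypothetical numbers, assuming the usual EWMA update
// newVal*alpha + (1-alpha)*oldVal): with shortTermAlpha = 0.5, a peer whose
// shortScore is 4 and that interacted during the last sample moves to
// 10*0.5 + 4*0.5 = 7, while an idle peer decays to 0*0.5 + 4*0.5 = 2. If
// shortScore+longScore is 10 and Accounting.Score() ranges over [0, 1], the
// final score spans 10*0.75 = 7.5 to 10*1.25 = 12.5 (truncated to an int),
// i.e. the base score adjusted by roughly ±25%.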
func (e *Engine) scoreWorker(ctx context.Context) {
	ticker := time.NewTicker(e.peerSampleInterval)
	defer ticker.Stop()

	type update struct {
		peer  peer.ID
		score int
	}
	var (
		lastShortUpdate, lastLongUpdate time.Time
		updates                         []update
	)

	for i := 0; ; i = (i + 1) % longTermRatio {
		var now time.Time
		select {
		case now = <-ticker.C:
		case <-ctx.Done():
			return
		}

		// The long term update ticks every `longTermRatio` short
		// intervals.
		updateLong := i == 0

		e.lock.Lock()
		for _, ledger := range e.ledgerMap {
			ledger.lk.Lock()

			// Update the short-term score.
			if ledger.lastExchange.After(lastShortUpdate) {
				ledger.shortScore = ewma(ledger.shortScore, shortTermScore, shortTermAlpha)
			} else {
				ledger.shortScore = ewma(ledger.shortScore, 0, shortTermAlpha)
			}

			// Update the long-term score.
			if updateLong {
				if ledger.lastExchange.After(lastLongUpdate) {
					ledger.longScore = ewma(ledger.longScore, longTermScore, longTermAlpha)
				} else {
					ledger.longScore = ewma(ledger.longScore, 0, longTermAlpha)
				}
			}

			// Calculate the new score.
			//
			// The accounting score adjustment prefers peers _we_
			// need over peers that need us. This doesn't help with
			// leeching.
			score := int((ledger.shortScore + ledger.longScore) * ((ledger.Accounting.Score())*.5 + .75))

			// Avoid updating the connection manager unless there's a change. This can be expensive.
			if ledger.score != score {
				// put these in a list so we can perform the updates outside the _global_ lock.
				updates = append(updates, update{ledger.Partner, score})
				ledger.score = score
			}
			ledger.lk.Unlock()
		}
		e.lock.Unlock()

		// record the times.
		lastShortUpdate = now
		if updateLong {
			lastLongUpdate = now
		}

		// apply the updates
		for _, update := range updates {
			if update.score == 0 {
				e.peerTagger.UntagPeer(update.peer, e.tagUseful)
			} else {
				e.peerTagger.TagPeer(update.peer, e.tagUseful, update.score)
			}
		}
		// Keep the memory. It's not much and it saves us from having to allocate.
		updates = updates[:0]
	}
}

func (e *Engine) onPeerAdded(p peer.ID) {
	e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
}

func (e *Engine) onPeerRemoved(p peer.ID) {
	e.peerTagger.UntagPeer(p, e.tagQueued)
}

// WantlistForPeer returns the currently understood want list for a given peer
func (e *Engine) WantlistForPeer(p peer.ID) (out []wl.Entry) {
	partner := e.findOrCreate(p)
	partner.lk.Lock()
	defer partner.lk.Unlock()
	return partner.wantList.SortedEntries()
}

// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
	ledger := e.findOrCreate(p)

	ledger.lk.Lock()
	defer ledger.lk.Unlock()

	return &Receipt{
		Peer:      ledger.Partner.String(),
		Value:     ledger.Accounting.Value(),
		Sent:      ledger.Accounting.BytesSent,
		Recv:      ledger.Accounting.BytesRecv,
		Exchanged: ledger.ExchangeCount(),
	}
}

// Each taskWorker pulls items off the request queue up to the maximum size
// and adds them to an envelope that is passed off to the bitswap workers,
// which send the message to the network.
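//
// For example, with targetMessageSize = 16 * 1024, a single envelope may
// bundle several small blocks together with any HAVE / DONT_HAVE presences
// queued for the same peer; the actual payload can be somewhat larger or
// smaller depending on the sizes of the tasks at the front of the queue.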
func (e *Engine) taskWorker(ctx context.Context) {
	defer e.taskWorkerExit()
	for {
		oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
		select {
		case <-ctx.Done():
			return
		case e.outbox <- oneTimeUse:
		}
		// receiver is ready for an outgoing envelope. let's prepare one. first,
		// we must acquire a task from the PQ...
		envelope, err := e.nextEnvelope(ctx)
		if err != nil {
			close(oneTimeUse)
			return // ctx cancelled
		}
		oneTimeUse <- envelope // buffered. won't block
		close(oneTimeUse)
	}
}

// taskWorkerExit handles cleanup of task workers
func (e *Engine) taskWorkerExit() {
	e.taskWorkerLock.Lock()
	defer e.taskWorkerLock.Unlock()

	e.taskWorkerCount--
	if e.taskWorkerCount == 0 {
		close(e.outbox)
	}
}

// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
	for {
		// Pop some tasks off the request queue
		p, nextTasks, pendingBytes := e.peerRequestQueue.PopTasks(targetMessageSize)
		for len(nextTasks) == 0 {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-e.workSignal:
				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
			case <-e.ticker.C:
				// When a task is cancelled, the queue may be "frozen" for a
				// period of time. We periodically "thaw" the queue to make
				// sure it doesn't get stuck in a frozen state.
				e.peerRequestQueue.ThawRound()
				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
			}
		}

		// Create a new message
		msg := bsmsg.New(true)

		// log.Debugf("  %s got %d tasks", lu.P(e.self), len(nextTasks))

		// Amount of data in the request queue still waiting to be popped
		msg.SetPendingBytes(int32(pendingBytes))

		// Split out want-blocks, want-haves and DONT_HAVEs
		blockCids := make([]cid.Cid, 0, len(nextTasks))
		blockTasks := make(map[cid.Cid]*taskData, len(nextTasks))
		for _, t := range nextTasks {
			c := t.Topic.(cid.Cid)
			td := t.Data.(*taskData)
			if td.HaveBlock {
				if td.IsWantBlock {
					blockCids = append(blockCids, c)
					blockTasks[c] = td
				} else {
					// Add HAVES to the message
					msg.AddHave(c)
				}
			} else {
				// Add DONT_HAVEs to the message
				msg.AddDontHave(c)
			}
		}

		// Fetch blocks from datastore
		blks, err := e.bsm.getBlocks(ctx, blockCids)
		if err != nil {
			// we're dropping the envelope but that's not an issue in practice.
			return nil, err
		}

		for c, t := range blockTasks {
			blk := blks[c]
			// If the block was not found (it has been removed)
			if blk == nil {
				// If the client requested DONT_HAVE, add DONT_HAVE to the message
				if t.SendDontHave {
					// log.Debugf("  make evlp %s->%s DONT_HAVE (expected block) %s", lu.P(e.self), lu.P(p), lu.C(c))
					msg.AddDontHave(c)
				}
			} else {
				// Add the block to the message
				// log.Debugf("  make evlp %s->%s block: %s (%d bytes)", lu.P(e.self), lu.P(p), lu.C(c), len(blk.RawData()))
				msg.AddBlock(blk)
			}
		}

		// If there's nothing in the message, bail out
		if msg.Empty() {
			e.peerRequestQueue.TasksDone(p, nextTasks...)
			continue
		}

		// log.Debugf("  sending message %s->%s (%d blks / %d presences / %d bytes)\n", lu.P(e.self), lu.P(p), blkCount, presenceCount, msg.Size())
		return &Envelope{
			Peer:    p,
			Message: msg,
			Sent: func() {
				// Once the message has been sent, signal the request queue so
				// it can be cleared from the queue
				e.peerRequestQueue.TasksDone(p, nextTasks...)

				// Signal the worker to check for more work
				e.signalNewWork()
			},
		}, nil
	}
}

// Outbox returns a channel of one-time use Envelope channels.
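//
// Each inner channel yields at most one envelope and is then closed. A
// consumer loop might look like this (sketch; send is a hypothetical function
// provided by the caller):
//
//	for envChan := range e.Outbox() {
//		envelope, ok := <-envChan
//		if !ok {
//			continue
//		}
//		send(envelope.Peer, envelope.Message)
//		envelope.Sent()
//	}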
func (e *Engine) Outbox() <-chan (<-chan *Envelope) {
	return e.outbox
}

// Peers returns a slice of Peers with whom the local node has active sessions.
func (e *Engine) Peers() []peer.ID {
	e.lock.RLock()
	defer e.lock.RUnlock()

	response := make([]peer.ID, 0, len(e.ledgerMap))

	for _, ledger := range e.ledgerMap {
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived is called when a message is received from a remote peer.
// For each item in the wantlist, add a want-have or want-block entry to the
// request queue (these are later popped off by the taskWorkers)
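//
// For example (illustrative sizes): a want-have for an 800-byte block that we
// have is queued as a want-block task (800 <= maxBlockSizeReplaceHasWithBlock)
// with Work equal to the block size; a want-have for a 256 KiB block that we
// have is queued as a HAVE with Work equal to the block presence size; and a
// want for a block we don't have is queued as a DONT_HAVE only if the sender
// requested it and sendDontHaves is enabled.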
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) {
	entries := m.Wantlist()

	// if len(entries) > 0 {
	// 	log.Debugf("engine-%s received message from %s with %d entries\n", lu.P(e.self), lu.P(p), len(entries))
	// 	for _, et := range entries {
	// 		if !et.Cancel {
	// 			if et.WantType == pb.Message_Wantlist_Have {
	// 				log.Debugf("  recv %s<-%s: want-have %s\n", lu.P(e.self), lu.P(p), lu.C(et.Cid))
	// 			} else {
	// 				log.Debugf("  recv %s<-%s: want-block %s\n", lu.P(e.self), lu.P(p), lu.C(et.Cid))
	// 			}
	// 		}
	// 	}
	// }

	if m.Empty() {
		log.Debugf("received empty message from %s", p)
	}

	newWorkExists := false
	defer func() {
		if newWorkExists {
			e.signalNewWork()
		}
	}()

	// Get block sizes
	wants, cancels := e.splitWantsCancels(entries)
	wantKs := cid.NewSet()
	for _, entry := range wants {
		wantKs.Add(entry.Cid)
	}
	blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
	if err != nil {
		log.Info("aborting message processing", err)
		return
	}

	// Get the ledger for the peer
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	// Record how many bytes were received in the ledger
	blks := m.Blocks()
	for _, block := range blks {
		log.Debugf("got block %s %d bytes", block, len(block.RawData()))
		l.ReceivedBytes(len(block.RawData()))
	}

	// If the peer sent a full wantlist, replace the ledger's wantlist
	if m.Full() {
		l.wantList = wl.New()
	}

	var activeEntries []peertask.Task

	// Remove cancelled blocks from the queue
	for _, entry := range cancels {
		// log.Debugf("%s<-%s cancel %s", lu.P(e.self), lu.P(p), lu.C(entry.Cid))
		if l.CancelWant(entry.Cid) {
			e.peerRequestQueue.Remove(entry.Cid, p)
		}
	}

	// For each want-have / want-block
	for _, entry := range wants {
		c := entry.Cid
		blockSize, found := blockSizes[entry.Cid]

		// Add each want-have / want-block to the ledger
		l.Wants(c, entry.Priority, entry.WantType)

		// If the block was not found
		if !found {
			// Only add the task to the queue if the requester wants a DONT_HAVE
			if e.sendDontHaves && entry.SendDontHave {
				newWorkExists = true
				isWantBlock := false
				if entry.WantType == pb.Message_Wantlist_Block {
					isWantBlock = true
				}

				// if isWantBlock {
				// 	log.Debugf("  put rq %s->%s %s as want-block (not found)\n", lu.P(e.self), lu.P(p), lu.C(entry.Cid))
				// } else {
				// 	log.Debugf("  put rq %s->%s %s as want-have (not found)\n", lu.P(e.self), lu.P(p), lu.C(entry.Cid))
				// }

				activeEntries = append(activeEntries, peertask.Task{
					Topic:    c,
					Priority: entry.Priority,
					Work:     bsmsg.BlockPresenceSize(c),
					Data: &taskData{
						BlockSize:    0,
						HaveBlock:    false,
						IsWantBlock:  isWantBlock,
						SendDontHave: entry.SendDontHave,
					},
				})
			}
			// log.Debugf("  not putting rq %s->%s %s (not found, SendDontHave false)\n", lu.P(e.self), lu.P(p), lu.C(entry.Cid))
		} else {
			// The block was found, add it to the queue
			newWorkExists = true

			isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

			// if isWantBlock {
			// 	log.Debugf("  put rq %s->%s %s as want-block (%d bytes)\n", lu.P(e.self), lu.P(p), lu.C(entry.Cid), blockSize)
			// } else {
			// 	log.Debugf("  put rq %s->%s %s as want-have (%d bytes)\n", lu.P(e.self), lu.P(p), lu.C(entry.Cid), blockSize)
			// }

			// entrySize is the amount of space the entry takes up in the
			// message we send to the recipient. If we're sending a block, the
			// entrySize is the size of the block. Otherwise it's the size of
			// a block presence entry.
			entrySize := blockSize
			if !isWantBlock {
				entrySize = bsmsg.BlockPresenceSize(c)
			}
			activeEntries = append(activeEntries, peertask.Task{
				Topic:    c,
				Priority: entry.Priority,
				Work:     entrySize,
				Data: &taskData{
					BlockSize:    blockSize,
					HaveBlock:    true,
					IsWantBlock:  isWantBlock,
					SendDontHave: entry.SendDontHave,
				},
			})
		}
	}

	// Push entries onto the request queue
	if len(activeEntries) > 0 {
		e.peerRequestQueue.PushTasks(p, activeEntries...)
	}
}

// Split the want-have / want-block entries from the cancel entries
func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
	wants := make([]bsmsg.Entry, 0, len(es))
	cancels := make([]bsmsg.Entry, 0, len(es))
	for _, et := range es {
		if et.Cancel {
			cancels = append(cancels, et)
		} else {
			wants = append(wants, et)
		}
	}
	return wants, cancels
}

// ReceiveFrom is called when new blocks are received and added to the block
// store, meaning there may be peers who want those blocks, so we should send
// the blocks to them.
func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid) {
	if len(blks) == 0 {
		return
	}

	// Get the size of each block
	blockSizes := make(map[cid.Cid]int, len(blks))
	for _, blk := range blks {
		blockSizes[blk.Cid()] = len(blk.RawData())
	}

	// Check each peer to see if it wants one of the blocks we received
	work := false
	e.lock.RLock()
	for _, l := range e.ledgerMap {
		l.lk.RLock()

		for _, b := range blks {
			k := b.Cid()

			if entry, ok := l.WantListContains(k); ok {
				work = true

				blockSize := blockSizes[k]
				isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

				// if isWantBlock {
				// 	log.Debugf("  add-block put rq %s->%s %s as want-block (%d bytes)\n", lu.P(e.self), lu.P(l.Partner), lu.C(k), blockSize)
				// } else {
				// 	log.Debugf("  add-block put rq %s->%s %s as want-have (%d bytes)\n", lu.P(e.self), lu.P(l.Partner), lu.C(k), blockSize)
				// }

				entrySize := blockSize
				if !isWantBlock {
					entrySize = bsmsg.BlockPresenceSize(k)
				}

				e.peerRequestQueue.PushTasks(l.Partner, peertask.Task{
					Topic:    entry.Cid,
					Priority: entry.Priority,
					Work:     entrySize,
					Data: &taskData{
						BlockSize:    blockSize,
						HaveBlock:    true,
						IsWantBlock:  isWantBlock,
						SendDontHave: false,
					},
				})
			}
		}
		l.lk.RUnlock()
	}
	e.lock.RUnlock()

	if work {
		e.signalNewWork()
	}
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

// MessageSent is called when a message has successfully been sent out, to record
// changes.
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	// Remove sent blocks from the want list for the peer
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.RawData()))
		l.wantList.RemoveType(block.Cid(), pb.Message_Wantlist_Block)
	}

	// Remove sent block presences from the want list for the peer
	for _, bp := range m.BlockPresences() {
		// TODO: record block presence bytes as well?
		// l.SentBytes(?)
		if bp.Type == pb.Message_Have {
			l.wantList.RemoveType(bp.Cid, pb.Message_Wantlist_Have)
		}
	}
}

// PeerConnected is called when a new peer connects, meaning we should start
// sending blocks.
func (e *Engine) PeerConnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok := e.ledgerMap[p]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p] = l
	}

	l.lk.Lock()
	defer l.lk.Unlock()
	l.ref++
}

// PeerDisconnected is called when a peer disconnects.
func (e *Engine) PeerDisconnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok := e.ledgerMap[p]
	if !ok {
		return
	}

	l.lk.Lock()
	defer l.lk.Unlock()
	l.ref--
	if l.ref <= 0 {
		delete(e.ledgerMap, p)
	}
}

// If the want is a want-have, and it's below a certain size, send the full
// block (instead of sending a HAVE)
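//
// For example, with the default maxBlockSizeReplaceHasWithBlock of 1024 bytes,
// a want-have for a 600-byte block is answered with the block itself, while a
// want-have for a 256 KiB block is answered with a HAVE.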
func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool {
	isWantBlock := wantType == pb.Message_Wantlist_Block
	return isWantBlock || blockSize <= e.maxBlockSizeReplaceHasWithBlock
}

func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesSent
}

func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesRecv
}

// findOrCreate lazily instantiates a ledger for the given peer
func (e *Engine) findOrCreate(p peer.ID) *ledger {
	// Take a read lock (as it's less expensive) to check if we have a ledger
	// for the peer
	e.lock.RLock()
	l, ok := e.ledgerMap[p]
	e.lock.RUnlock()
	if ok {
		return l
	}

	// There's no ledger, so take a write lock, then check again and create the
	// ledger if necessary
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok = e.ledgerMap[p]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p] = l
	}
	return l
}

func (e *Engine) signalNewWork() {
	// Signal task generation to restart (if stopped!)
	select {
	case e.workSignal <- struct{}{}:
	default:
	}
}