// Package decision implements the decision engine for the bitswap service.
package decision

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"

	bsmsg "github.com/ipfs/go-bitswap/message"
	wl "github.com/ipfs/go-bitswap/wantlist"
	cid "github.com/ipfs/go-cid"
	bstore "github.com/ipfs/go-ipfs-blockstore"
	logging "github.com/ipfs/go-log"
	"github.com/ipfs/go-peertaskqueue"
	"github.com/ipfs/go-peertaskqueue/peertask"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
// 1. an initial `sendwantlist` message to a provider of the first key in a
//    request
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as
//   appropriate
// * when handling `cancel`, if we recently received a wanted block from a
//   peer, include a partial wantlist that contains a few other high priority
//   blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).

var log = logging.Logger("engine")

const (
	// outboxChanBuffer must be 0 to prevent stale messages from being sent
	outboxChanBuffer = 0
	// maxMessageSize is the maximum size of the batched payload
	maxMessageSize = 512 * 1024
	// tagFormat is the tag given to peers associated with an engine
	tagFormat = "bs-engine-%s-%s"

	// queuedTagWeight is the default weight for peers that have work queued
	// on their behalf.
	queuedTagWeight = 10

	// the alpha for the EWMA used to track short term usefulness
	shortTermAlpha = 0.5

	// the alpha for the EWMA used to track long term usefulness
	longTermAlpha = 0.05

	// long term ratio defines what "long term" means in terms of the
	// shortTerm duration. Peers that interact once every longTermRatio are
	// considered useful over the long term.
	longTermRatio = 10

	// long/short term scores for tagging peers
	longTermScore  = 10 // this is a high tag but it grows _very_ slowly.
	shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.
)

var (
	// how frequently the engine should sample usefulness. Peers that
	// interact every shortTerm time period are considered "active".
	//
	// this is only a variable to make testing easier.
	shortTerm = 10 * time.Second
)
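
// With the defaults above, the short-term score is sampled every shortTerm
// (10s) and the long-term score every longTermRatio*shortTerm (100s).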

// Envelope contains a message for a Peer.
type Envelope struct {
	// Peer is the intended recipient.
	Peer peer.ID

	// Message is the payload.
	Message bsmsg.BitSwapMessage

	// A callback to notify the decision queue that the task is complete
	Sent func()
}

// PeerTagger covers the methods on the connection manager used by the decision
// engine to tag peers
type PeerTagger interface {
	TagPeer(peer.ID, string, int)
	UntagPeer(p peer.ID, tag string)
}
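
// Any connection manager exposing these two methods satisfies PeerTagger. A
// minimal no-op implementation (illustrative only, e.g. for tests; not part
// of this package) could look like:
//
//	type noopTagger struct{}
//
//	func (noopTagger) TagPeer(peer.ID, string, int) {}
//	func (noopTagger) UntagPeer(peer.ID, string)    {}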

// Engine manages sending requested blocks to peers.
type Engine struct {
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
	peerRequestQueue *peertaskqueue.PeerTaskQueue

	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required.
	workSignal chan struct{}

	// outbox contains outgoing messages to peers. This is owned by the
	// taskWorker goroutine
	outbox chan (<-chan *Envelope)

	bs bstore.Blockstore

	peerTagger PeerTagger

	tagQueued, tagUseful string

	lock sync.Mutex // protects the fields immediately below
	// ledgerMap lists Ledgers by their Partner key.
	ledgerMap map[peer.ID]*ledger

	ticker *time.Ticker
}

// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger) *Engine {
	e := &Engine{
		ledgerMap:  make(map[peer.ID]*ledger),
		bs:         bs,
		peerTagger: peerTagger,
		outbox:     make(chan (<-chan *Envelope), outboxChanBuffer),
		workSignal: make(chan struct{}, 1),
		ticker:     time.NewTicker(time.Millisecond * 100),
	}
	e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
	e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
	e.peerRequestQueue = peertaskqueue.New(peertaskqueue.OnPeerAddedHook(e.onPeerAdded), peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved))
	go e.taskWorker(ctx)
	go e.scoreWorker(ctx)
	return e
}

// scoreWorker keeps track of how "useful" our peers are, updating scores in the
// connection manager.
//
// It does this by tracking two scores: short-term usefulness and long-term
// usefulness. Short-term usefulness is sampled frequently and highly weights
// new observations. Long-term usefulness is sampled less frequently and highly
// weights long-term trends.
//
// In practice, we do this by keeping two EWMAs. If we see an interaction
// within the sampling period, we record the score, otherwise, we record a 0.
// The short-term one has a high alpha and is sampled every shortTerm period.
// The long-term one has a low alpha and is sampled every
// longTermRatio*shortTerm period.
//
// To calculate the final score, we sum the short-term and long-term scores then
// adjust it ±25% based on our debt ratio. Peers that have historically been
// more useful to us than we are to them get the highest score.
func (e *Engine) scoreWorker(ctx context.Context) {
	ticker := time.NewTicker(shortTerm)
	defer ticker.Stop()

	type update struct {
		peer  peer.ID
		score int
	}
	var (
		lastShortUpdate, lastLongUpdate time.Time
		updates                         []update
	)

	for i := 0; ; i = (i + 1) % longTermRatio {
		var now time.Time
		select {
		case now = <-ticker.C:
		case <-ctx.Done():
			return
		}

		// The long term update ticks every `longTermRatio` short
		// intervals.
		updateLong := i == 0

		e.lock.Lock()
		for _, ledger := range e.ledgerMap {
			ledger.lk.Lock()

			// Update the short-term score.
			if ledger.lastExchange.After(lastShortUpdate) {
				ledger.shortScore = ewma(ledger.shortScore, shortTermScore, shortTermAlpha)
			} else {
				ledger.shortScore = ewma(ledger.shortScore, 0, shortTermAlpha)
			}

			// Update the long-term score.
			if updateLong {
				if ledger.lastExchange.After(lastLongUpdate) {
					ledger.longScore = ewma(ledger.longScore, longTermScore, longTermAlpha)
				} else {
					ledger.longScore = ewma(ledger.longScore, 0, longTermAlpha)
				}
			}

			// Calculate the new score.
			//
			// The accounting score adjustment prefers peers _we_
			// need over peers that need us. This doesn't help with
			// leeching.
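			//
			// Assuming Accounting.Score() yields a value in [0, 1], the
			// (score*.5 + .75) factor below ranges from 0.75 to 1.25,
			// i.e. the ±25% adjustment described in the comment above.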
			score := int((ledger.shortScore + ledger.longScore) * ((ledger.Accounting.Score())*.5 + .75))

			// Avoid updating the connection manager unless there's a change. This can be expensive.
			if ledger.score != score {
				// put these in a list so we can perform the updates outside the global lock.
				updates = append(updates, update{ledger.Partner, score})
				ledger.score = score
			}
			ledger.lk.Unlock()
		}
		e.lock.Unlock()

		// record the times.
		lastShortUpdate = now
		if updateLong {
			lastLongUpdate = now
		}

		// apply the updates
		for _, update := range updates {
			if update.score == 0 {
				e.peerTagger.UntagPeer(update.peer, e.tagUseful)
			} else {
				e.peerTagger.TagPeer(update.peer, e.tagUseful, update.score)
			}
		}
		// Keep the memory. It's not much and it saves us from having to allocate.
		updates = updates[:0]
	}
}
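
// The ewma helper used above is defined elsewhere in this package. It is
// expected to implement the standard exponentially weighted moving average
// update; a sketch of the assumed form, not a copy of the real helper:
//
//	func ewma(old, new, alpha float64) float64 {
//		return new*alpha + (1-alpha)*old
//	}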

func (e *Engine) onPeerAdded(p peer.ID) {
	e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
}

func (e *Engine) onPeerRemoved(p peer.ID) {
	e.peerTagger.UntagPeer(p, e.tagQueued)
}

// WantlistForPeer returns the currently understood want list for a given peer
func (e *Engine) WantlistForPeer(p peer.ID) (out []wl.Entry) {
	partner := e.findOrCreate(p)
	partner.lk.Lock()
	defer partner.lk.Unlock()
	return partner.wantList.SortedEntries()
}

// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
	ledger := e.findOrCreate(p)

	ledger.lk.Lock()
	defer ledger.lk.Unlock()

	return &Receipt{
		Peer:      ledger.Partner.String(),
		Value:     ledger.Accounting.Value(),
		Sent:      ledger.Accounting.BytesSent,
		Recv:      ledger.Accounting.BytesRecv,
		Exchanged: ledger.ExchangeCount(),
	}
}

func (e *Engine) taskWorker(ctx context.Context) {
	defer close(e.outbox) // because taskWorker uses the channel exclusively
	for {
		oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
		select {
		case <-ctx.Done():
			return
		case e.outbox <- oneTimeUse:
		}
		// receiver is ready for an outgoing envelope. let's prepare one. first,
		// we must acquire a task from the PQ...
		envelope, err := e.nextEnvelope(ctx)
		if err != nil {
			close(oneTimeUse)
			return // ctx cancelled
		}
		oneTimeUse <- envelope // buffered. won't block
		close(oneTimeUse)
	}
}

// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
	for {
		nextTask := e.peerRequestQueue.PopBlock()
		for nextTask == nil {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-e.workSignal:
				nextTask = e.peerRequestQueue.PopBlock()
			case <-e.ticker.C:
				e.peerRequestQueue.ThawRound()
				nextTask = e.peerRequestQueue.PopBlock()
			}
		}

		// with a task in hand, we're ready to prepare the envelope...
		msg := bsmsg.New(true)
		for _, entry := range nextTask.Tasks {
			block, err := e.bs.Get(entry.Identifier.(cid.Cid))
			if err != nil {
				log.Errorf("tried to execute a task and errored fetching block: %s", err)
				continue
			}
			msg.AddBlock(block)
		}

		if msg.Empty() {
			// If we don't have the block, don't hold that against the peer;
			// make sure to update that the task has been 'completed'.
			nextTask.Done(nextTask.Tasks)
			continue
		}

		return &Envelope{
			Peer:    nextTask.Target,
			Message: msg,
			Sent: func() {
				nextTask.Done(nextTask.Tasks)
				select {
				case e.workSignal <- struct{}{}:
					// work completing may mean that our queue will provide new
					// work to be done.
				default:
				}
			},
		}, nil
	}
}

// Outbox returns a channel of one-time use Envelope channels.
func (e *Engine) Outbox() <-chan (<-chan *Envelope) {
	return e.outbox
}
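
// A consumer typically receives one of these single-use channels, waits on it
// for the prepared envelope, sends the message, and then calls Sent. A sketch
// of the expected usage (illustrative only; the real consumers are the bitswap
// workers, not this package):
//
//	for envChan := range engine.Outbox() {
//		envelope, ok := <-envChan
//		if !ok {
//			continue // channel closed without an envelope (e.g. shutdown)
//		}
//		// deliver envelope.Message to envelope.Peer over the network, then:
//		envelope.Sent()
//	}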

// Peers returns a slice of Peers with whom the local node has active sessions.
func (e *Engine) Peers() []peer.ID {
	e.lock.Lock()
	defer e.lock.Unlock()

	response := make([]peer.ID, 0, len(e.ledgerMap))

	for _, ledger := range e.ledgerMap {
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived performs book-keeping when a message is received from a peer,
// updating the peer's ledger and queueing any requested blocks we can serve.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
	if m.Empty() {
		log.Debugf("received empty message from %s", p)
	}

	newWorkExists := false
	defer func() {
		if newWorkExists {
			e.signalNewWork()
		}
	}()

	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()
	if m.Full() {
		l.wantList = wl.New()
	}

	var msgSize int
	var activeEntries []peertask.Task
	for _, entry := range m.Wantlist() {
		if entry.Cancel {
			log.Debugf("%s cancel %s", p, entry.Cid)
			l.CancelWant(entry.Cid)
			e.peerRequestQueue.Remove(entry.Cid, p)
		} else {
			log.Debugf("wants %s - %d", entry.Cid, entry.Priority)
			l.Wants(entry.Cid, entry.Priority)
			blockSize, err := e.bs.GetSize(entry.Cid)
			if err != nil {
				if err == bstore.ErrNotFound {
					continue
				}
				log.Error(err)
			} else {
				// we have the block
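				// Queue this entry for sending; if adding the block would push
				// the pending batch past maxMessageSize, flush the batch to the
				// request queue first.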
				newWorkExists = true
				if msgSize+blockSize > maxMessageSize {
					e.peerRequestQueue.PushBlock(p, activeEntries...)
					activeEntries = []peertask.Task{}
					msgSize = 0
				}
				activeEntries = append(activeEntries, peertask.Task{Identifier: entry.Cid, Priority: entry.Priority})
				msgSize += blockSize
			}
		}
	}
	if len(activeEntries) > 0 {
		e.peerRequestQueue.PushBlock(p, activeEntries...)
	}
	for _, block := range m.Blocks() {
		log.Debugf("got block %s %d bytes", block, len(block.RawData()))
		l.ReceivedBytes(len(block.RawData()))
	}
}

func (e *Engine) addBlocks(ks []cid.Cid) {
	work := false

	for _, l := range e.ledgerMap {
		l.lk.Lock()
		for _, k := range ks {
			if entry, ok := l.WantListContains(k); ok {
				e.peerRequestQueue.PushBlock(l.Partner, peertask.Task{
					Identifier: entry.Cid,
					Priority:   entry.Priority,
				})
				work = true
			}
		}
		l.lk.Unlock()
	}

	if work {
		e.signalNewWork()
	}
}

// AddBlocks is called when new blocks are received and added to a block store,
// meaning there may be peers who want those blocks, so we should send the blocks
// to them.
func (e *Engine) AddBlocks(ks []cid.Cid) {
	e.lock.Lock()
	defer e.lock.Unlock()

	e.addBlocks(ks)
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

// MessageSent is called when a message has successfully been sent out, to record
// changes.
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	for _, block := range m.Blocks() {
		l.SentBytes(len(block.RawData()))
		l.wantList.Remove(block.Cid())
		e.peerRequestQueue.Remove(block.Cid(), p)
	}
}

// PeerConnected is called when a new peer connects, meaning we should start
// sending blocks.
func (e *Engine) PeerConnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok := e.ledgerMap[p]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p] = l
	}
	l.lk.Lock()
	defer l.lk.Unlock()
	l.ref++
}

// PeerDisconnected is called when a peer disconnects.
func (e *Engine) PeerDisconnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok := e.ledgerMap[p]
	if !ok {
		return
	}
	l.lk.Lock()
	defer l.lk.Unlock()
	l.ref--
	if l.ref <= 0 {
		delete(e.ledgerMap, p)
	}
}

func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesSent
}

func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesRecv
}

// findOrCreate lazily instantiates a ledger for the given peer.
func (e *Engine) findOrCreate(p peer.ID) *ledger {
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok := e.ledgerMap[p]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p] = l
	}
	return l
}

func (e *Engine) signalNewWork() {
	// Signal task generation to restart (if stopped!)
	select {
	case e.workSignal <- struct{}{}:
	default:
	}
}