session.go 15.1 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4 5 6

import (
	"context"
	"time"

7 8 9 10 11 12 13 14 15 16 17
	bsbpm "gitlab.dms3.io/dms3/go-bitswap/internal/blockpresencemanager"
	bsgetter "gitlab.dms3.io/dms3/go-bitswap/internal/getter"
	notifications "gitlab.dms3.io/dms3/go-bitswap/internal/notifications"
	bspm "gitlab.dms3.io/dms3/go-bitswap/internal/peermanager"
	bssim "gitlab.dms3.io/dms3/go-bitswap/internal/sessioninterestmanager"
	blocks "gitlab.dms3.io/dms3/go-block-format"
	cid "gitlab.dms3.io/dms3/go-cid"
	delay "gitlab.dms3.io/dms3/go-dms3-delay"
	logging "gitlab.dms3.io/dms3/go-log"
	peer "gitlab.dms3.io/p2p/go-p2p-core/peer"
	loggables "gitlab.dms3.io/p2p/go-p2p-loggables"
Dirk McCormick's avatar
Dirk McCormick committed
18
	"go.uber.org/zap"
Jeromy's avatar
Jeromy committed
19 20
)

dirkmc's avatar
dirkmc committed
21
var log = logging.Logger("bs:sess")
Dirk McCormick's avatar
Dirk McCormick committed
22
var sflog = log.Desugar()
dirkmc's avatar
dirkmc committed
23

24
const (
dirkmc's avatar
dirkmc committed
25
	broadcastLiveWantsLimit = 64
26
)
Jeromy's avatar
Jeromy committed
27

dirkmc's avatar
dirkmc committed
28 29 30 31 32
// PeerManager keeps track of which sessions are interested in which peers
// and takes care of sending wants for the sessions
type PeerManager interface {
	// RegisterSession tells the PeerManager that the session is interested
	// in a peer's connection state
33
	RegisterSession(peer.ID, bspm.Session)
dirkmc's avatar
dirkmc committed
34 35 36 37 38
	// UnregisterSession tells the PeerManager that the session is no longer
	// interested in a peer's connection state
	UnregisterSession(uint64)
	// SendWants tells the PeerManager to send wants to the given peer
	SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
Dirk McCormick's avatar
Dirk McCormick committed
39 40 41 42 43
	// BroadcastWantHaves sends want-haves to all connected peers (used for
	// session discovery)
	BroadcastWantHaves(context.Context, []cid.Cid)
	// SendCancels tells the PeerManager to send cancels to all peers
	SendCancels(context.Context, []cid.Cid)
44 45
}

46 47 48 49 50 51 52 53
// SessionManager manages all the sessions. A Session reports its own
// shutdown to it so that it can be cleaned up.
type SessionManager interface {
	// RemoveSession removes the session with the given ID; it is called
	// when that session shuts down.
	RemoveSession(sesid uint64)
	// CancelSessionWants cancels the given wants for the session with the
	// given ID; it is called when a call to GetBlocks() is cancelled.
	CancelSessionWants(sid uint64, wants []cid.Cid)
}

54
// SessionPeerManager keeps track of peers in the session
dirkmc's avatar
dirkmc committed
55
type SessionPeerManager interface {
56 57 58 59 60 61 62 63 64 65 66 67
	// PeersDiscovered indicates if any peers have been discovered yet
	PeersDiscovered() bool
	// Shutdown the SessionPeerManager
	Shutdown()
	// Adds a peer to the session, returning true if the peer is new
	AddPeer(peer.ID) bool
	// Removes a peer from the session, returning true if the peer existed
	RemovePeer(peer.ID) bool
	// All peers in the session
	Peers() []peer.ID
	// Whether there are any peers in the session
	HasPeers() bool
68 69
	// Protect connection from being pruned by the connection manager
	ProtectConnection(peer.ID)
70 71 72 73 74 75
}

// ProviderFinder is used to find providers for a given key
type ProviderFinder interface {
	// FindProvidersAsync searches for peers that provide the given CID
	FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID
76 77
}

dirkmc's avatar
dirkmc committed
78
// opType identifies the kind of operation handed to the session's event loop.
type opType int

const (
	opReceive   opType = iota // blocks were received
	opWant                    // the client wants blocks
	opCancel                  // wants were cancelled
	opBroadcast               // want-haves should be broadcast
	opWantsSent               // wants were sent to a peer
)

type op struct {
	op   opType
	keys []cid.Cid
97 98
}

Jeromy's avatar
Jeromy committed
99 100
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
101
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
102
type Session struct {
103
	// dependencies
104 105 106
	ctx            context.Context
	shutdown       func()
	sm             SessionManager
Dirk McCormick's avatar
Dirk McCormick committed
107
	pm             PeerManager
108 109 110
	sprm           SessionPeerManager
	providerFinder ProviderFinder
	sim            *bssim.SessionInterestManager
dirkmc's avatar
dirkmc committed
111 112 113

	sw  sessionWants
	sws sessionWantSender
114

dirkmc's avatar
dirkmc committed
115
	latencyTrkr latencyTracker
116

117
	// channels
118
	incoming      chan op
119 120 121
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
Steven Allen's avatar
Steven Allen committed
122 123 124 125 126 127
	idleTick            *time.Timer
	periodicSearchTimer *time.Timer
	baseTickDelay       time.Duration
	consecutiveTicks    int
	initialSearchDelay  time.Duration
	periodicSearchDelay delay.D
128
	// identifiers
Jeromy's avatar
Jeromy committed
129
	notif notifications.PubSub
130
	uuid  loggables.DeferredMap
131
	id    uint64
dirkmc's avatar
dirkmc committed
132 133

	self peer.ID
Jeromy's avatar
Jeromy committed
134 135
}

136
// New creates a new bitswap session whose lifetime is bounded by the
137
// given context.
138
func New(
139 140
	ctx context.Context,
	sm SessionManager,
141
	id uint64,
dirkmc's avatar
dirkmc committed
142
	sprm SessionPeerManager,
143
	providerFinder ProviderFinder,
dirkmc's avatar
dirkmc committed
144
	sim *bssim.SessionInterestManager,
145
	pm PeerManager,
dirkmc's avatar
dirkmc committed
146
	bpm *bsbpm.BlockPresenceManager,
147
	notif notifications.PubSub,
Steven Allen's avatar
Steven Allen committed
148
	initialSearchDelay time.Duration,
dirkmc's avatar
dirkmc committed
149 150
	periodicSearchDelay delay.D,
	self peer.ID) *Session {
151 152

	ctx, cancel := context.WithCancel(ctx)
Jeromy's avatar
Jeromy committed
153
	s := &Session{
154
		sw:                  newSessionWants(broadcastLiveWantsLimit),
Steven Allen's avatar
Steven Allen committed
155 156
		tickDelayReqs:       make(chan time.Duration),
		ctx:                 ctx,
157 158
		shutdown:            cancel,
		sm:                  sm,
Dirk McCormick's avatar
Dirk McCormick committed
159
		pm:                  pm,
dirkmc's avatar
dirkmc committed
160
		sprm:                sprm,
161
		providerFinder:      providerFinder,
dirkmc's avatar
dirkmc committed
162 163 164
		sim:                 sim,
		incoming:            make(chan op, 128),
		latencyTrkr:         latencyTracker{},
165
		notif:               notif,
Steven Allen's avatar
Steven Allen committed
166 167 168 169 170
		uuid:                loggables.Uuid("GetBlockRequest"),
		baseTickDelay:       time.Millisecond * 500,
		id:                  id,
		initialSearchDelay:  initialSearchDelay,
		periodicSearchDelay: periodicSearchDelay,
dirkmc's avatar
dirkmc committed
171
		self:                self,
Jeromy's avatar
Jeromy committed
172
	}
173
	s.sws = newSessionWantSender(id, pm, sprm, sm, bpm, s.onWantsSent, s.onPeersExhausted)
Jeromy's avatar
Jeromy committed
174 175 176 177 178 179

	go s.run(ctx)

	return s
}

dirkmc's avatar
dirkmc committed
180 181 182 183
// ID returns the identifier this session was created with.
func (s *Session) ID() uint64 {
	id := s.id
	return id
}

184 185 186 187
// Shutdown cancels the session's context and returns without waiting.
// The run loop observes the cancellation and performs the teardown itself.
func (s *Session) Shutdown() {
	cancel := s.shutdown
	cancel()
}

188
// ReceiveFrom receives incoming blocks from the given peer.
dirkmc's avatar
dirkmc committed
189
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
190 191 192
	// The SessionManager tells each Session about all keys that it may be
	// interested in. Here the Session filters the keys to the ones that this
	// particular Session is interested in.
dirkmc's avatar
dirkmc committed
193 194 195 196
	interestedRes := s.sim.FilterSessionInterested(s.id, ks, haves, dontHaves)
	ks = interestedRes[0]
	haves = interestedRes[1]
	dontHaves = interestedRes[2]
Dirk McCormick's avatar
Dirk McCormick committed
197
	s.logReceiveFrom(from, ks, haves, dontHaves)
dirkmc's avatar
dirkmc committed
198

199 200
	// Inform the session want sender that a message has been received
	s.sws.Update(from, ks, haves, dontHaves)
dirkmc's avatar
dirkmc committed
201 202

	if len(ks) == 0 {
203 204 205
		return
	}

206
	// Inform the session that blocks have been received
207
	select {
208
	case s.incoming <- op{op: opReceive, keys: ks}:
209 210 211 212
	case <-s.ctx.Done():
	}
}

Dirk McCormick's avatar
Dirk McCormick committed
213
func (s *Session) logReceiveFrom(from peer.ID, interestedKs []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
Dirk McCormick's avatar
Dirk McCormick committed
214 215 216 217 218
	// Save some CPU cycles if log level is higher than debug
	if ce := sflog.Check(zap.DebugLevel, "Bitswap <- rcv message"); ce == nil {
		return
	}

Dirk McCormick's avatar
Dirk McCormick committed
219 220 221 222 223 224 225 226 227 228
	for _, c := range interestedKs {
		log.Debugw("Bitswap <- block", "local", s.self, "from", from, "cid", c, "session", s.id)
	}
	for _, c := range haves {
		log.Debugw("Bitswap <- HAVE", "local", s.self, "from", from, "cid", c, "session", s.id)
	}
	for _, c := range dontHaves {
		log.Debugw("Bitswap <- DONT_HAVE", "local", s.self, "from", from, "cid", c, "session", s.id)
	}
}
dirkmc's avatar
dirkmc committed
229

230 231 232 233 234 235 236 237 238
// GetBlock fetches the single block identified by k within this session.
// It delegates to bsgetter.SyncGetBlock, which uses GetBlocks for the fetch.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	fetch := s.GetBlocks
	return bsgetter.SyncGetBlock(parent, k, fetch)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
239
	// ctx = logging.ContextWithLoggable(ctx, s.uuid)
240 241

	return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
242 243
		func(ctx context.Context, keys []cid.Cid) {
			select {
244
			case s.incoming <- op{op: opWant, keys: keys}:
245 246 247 248 249 250
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
251
			case s.incoming <- op{op: opCancel, keys: keys}:
252 253 254 255
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
256 257
}

258
// SetBaseTickDelay changes the rate at which ticks happen.
259 260 261 262 263
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
264 265
}

266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
// onWantsSent is the sessionWantSender callback invoked after wants are sent
// to peer p. It queues all of the sent CIDs for the run loop without blocking.
func (s *Session) onWantsSent(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	// Capping capacity at the length forces append to copy rather than write
	// wantHaves into any spare capacity of the caller's wantBlocks.
	n := len(wantBlocks)
	sent := append(wantBlocks[:n:n], wantHaves...)
	s.nonBlockingEnqueue(op{op: opWantsSent, keys: sent})
}

// onPeersExhausted is the sessionWantSender callback invoked when every
// available peer has sent DONT_HAVE for ks, or when all peers have become
// unavailable. It queues a broadcast of ks for the run loop without blocking.
func (s *Session) onPeersExhausted(ks []cid.Cid) {
	broadcastOp := op{op: opBroadcast, keys: ks}
	s.nonBlockingEnqueue(broadcastOp)
}

// nonBlockingEnqueue queues o for the run loop without ever blocking the
// caller, so the sessionWantSender is never stalled by a full channel. If the
// incoming channel is full, the send is moved to a goroutine. That goroutine
// gives up if the session shuts down first.
func (s *Session) nonBlockingEnqueue(o op) {
	select {
	case s.incoming <- o:
		return
	default:
	}

	go func() {
		select {
		case s.incoming <- o:
		case <-s.ctx.Done():
		}
	}()
}

dirkmc's avatar
dirkmc committed
294 295
// Session run loop -- everything in this function should not be called
// outside of this loop
Jeromy's avatar
Jeromy committed
296
func (s *Session) run(ctx context.Context) {
dirkmc's avatar
dirkmc committed
297 298
	go s.sws.Run()

Steven Allen's avatar
Steven Allen committed
299 300
	s.idleTick = time.NewTimer(s.initialSearchDelay)
	s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
Jeromy's avatar
Jeromy committed
301 302
	for {
		select {
303 304 305
		case oper := <-s.incoming:
			switch oper.op {
			case opReceive:
306
				// Received blocks
dirkmc's avatar
dirkmc committed
307
				s.handleReceive(oper.keys)
308
			case opWant:
309
				// Client wants blocks
310 311
				s.wantBlocks(ctx, oper.keys)
			case opCancel:
312
				// Wants were cancelled
313
				s.sw.CancelPending(oper.keys)
314
				s.sws.Cancel(oper.keys)
315 316 317
			case opWantsSent:
				// Wants were sent to a peer
				s.sw.WantsSent(oper.keys)
dirkmc's avatar
dirkmc committed
318
			case opBroadcast:
319
				// Broadcast want-haves to all peers
Dirk McCormick's avatar
Dirk McCormick committed
320
				s.broadcast(ctx, oper.keys)
321 322 323
			default:
				panic("unhandled operation")
			}
Steven Allen's avatar
Steven Allen committed
324
		case <-s.idleTick.C:
325
			// The session hasn't received blocks for a while, broadcast
Dirk McCormick's avatar
Dirk McCormick committed
326
			s.broadcast(ctx, nil)
Steven Allen's avatar
Steven Allen committed
327
		case <-s.periodicSearchTimer.C:
328
			// Periodically search for a random live want
Steven Allen's avatar
Steven Allen committed
329
			s.handlePeriodicSearch(ctx)
330
		case baseTickDelay := <-s.tickDelayReqs:
331
			// Set the base tick delay
332
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
333
		case <-ctx.Done():
334
			// Shutdown
335
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
336 337 338 339 340
			return
		}
	}
}

341 342 343
// Called when the session hasn't received any blocks for some time, or when
// all peers in the session have sent DONT_HAVE for a particular set of CIDs.
// Send want-haves to all connected peers, and search for new peers with the CID.
Dirk McCormick's avatar
Dirk McCormick committed
344
func (s *Session) broadcast(ctx context.Context, wants []cid.Cid) {
345 346 347 348 349 350
	// If this broadcast is because of an idle timeout (we haven't received
	// any blocks for a while) then broadcast all pending wants
	if wants == nil {
		wants = s.sw.PrepareBroadcast()
	}

dirkmc's avatar
dirkmc committed
351
	// Broadcast a want-have for the live wants to everyone we're connected to
Dirk McCormick's avatar
Dirk McCormick committed
352
	s.broadcastWantHaves(ctx, wants)
353

dirkmc's avatar
dirkmc committed
354
	// do not find providers on consecutive ticks
Steven Allen's avatar
Steven Allen committed
355
	// -- just rely on periodic search widening
356 357 358 359
	if len(wants) > 0 && (s.consecutiveTicks == 0) {
		// Search for providers who have the first want in the list.
		// Typically if the provider has the first block they will have
		// the rest of the blocks also.
Dirk McCormick's avatar
Dirk McCormick committed
360
		log.Debugw("FindMorePeers", "session", s.id, "cid", wants[0], "pending", len(wants))
361
		s.findMorePeers(ctx, wants[0])
362
	}
Steven Allen's avatar
Steven Allen committed
363
	s.resetIdleTick()
364

365
	// If we have live wants record a consecutive tick
366
	if s.sw.HasLiveWants() {
367 368
		s.consecutiveTicks++
	}
369 370
}

371 372
// handlePeriodicSearch is called periodically to search for providers of a
// randomly chosen CID in the sesssion.
Steven Allen's avatar
Steven Allen committed
373
func (s *Session) handlePeriodicSearch(ctx context.Context) {
374
	randomWant := s.sw.RandomLiveWant()
375
	if !randomWant.Defined() {
376 377 378 379 380
		return
	}

	// TODO: come up with a better strategy for determining when to search
	// for new providers for blocks.
381
	s.findMorePeers(ctx, randomWant)
dirkmc's avatar
dirkmc committed
382

Dirk McCormick's avatar
Dirk McCormick committed
383
	s.broadcastWantHaves(ctx, []cid.Cid{randomWant})
384

Steven Allen's avatar
Steven Allen committed
385
	s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
386 387
}

388 389 390 391 392 393 394 395 396 397 398 399 400
// findMorePeers looks for more peers for the session by searching, in a
// background goroutine, for providers of c. Each provider found is reported
// to the sessionWantSender as if it had sent a HAVE for c. The goroutine
// exits when the provider channel is closed (presumably once the search
// completes or ctx is done; confirm against the ProviderFinder implementation).
func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
		providers := s.providerFinder.FindProvidersAsync(ctx, k)
		for p := range providers {
			// A fresh slice per update, so the want sender never shares one.
			s.sws.Update(p, nil, []cid.Cid{k}, nil)
		}
	}(c)
}

// handleShutdown is called when the session shuts down
401
func (s *Session) handleShutdown() {
402
	// Stop the idle timer
Steven Allen's avatar
Steven Allen committed
403
	s.idleTick.Stop()
404 405
	// Shut down the session peer manager
	s.sprm.Shutdown()
406 407 408
	// Shut down the sessionWantSender (blocks until sessionWantSender stops
	// sending)
	s.sws.Shutdown()
409 410 411
	// Signal to the SessionManager that the session has been shutdown
	// and can be cleaned up
	s.sm.RemoveSession(s.id)
412 413
}

414
// handleReceive is called when the session receives blocks from a peer
dirkmc's avatar
dirkmc committed
415
func (s *Session) handleReceive(ks []cid.Cid) {
416 417 418 419 420 421 422 423 424 425 426 427 428 429
	// Record which blocks have been received and figure out the total latency
	// for fetching the blocks
	wanted, totalLatency := s.sw.BlocksReceived(ks)
	if len(wanted) == 0 {
		return
	}

	// Record latency
	s.latencyTrkr.receiveUpdate(len(wanted), totalLatency)

	// Inform the SessionInterestManager that this session is no longer
	// expecting to receive the wanted keys
	s.sim.RemoveSessionWants(s.id, wanted)

430 431 432 433 434 435
	s.idleTick.Stop()

	// We've received new wanted blocks, so reset the number of ticks
	// that have occurred since the last new block
	s.consecutiveTicks = 0

dirkmc's avatar
dirkmc committed
436
	s.resetIdleTick()
437 438
}

439
// wantBlocks is called when blocks are requested by the client
440
func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
dirkmc's avatar
dirkmc committed
441
	if len(newks) > 0 {
442
		// Inform the SessionInterestManager that this session is interested in the keys
dirkmc's avatar
dirkmc committed
443
		s.sim.RecordSessionInterest(s.id, newks)
444
		// Tell the sessionWants tracker that that the wants have been requested
dirkmc's avatar
dirkmc committed
445
		s.sw.BlocksRequested(newks)
446
		// Tell the sessionWantSender that the blocks have been requested
dirkmc's avatar
dirkmc committed
447
		s.sws.Add(newks)
Jeromy's avatar
Jeromy committed
448
	}
449

450
	// If we have discovered peers already, the sessionWantSender will
dirkmc's avatar
dirkmc committed
451
	// send wants to them
452
	if s.sprm.PeersDiscovered() {
dirkmc's avatar
dirkmc committed
453
		return
454
	}
Jeromy's avatar
Jeromy committed
455

dirkmc's avatar
dirkmc committed
456
	// No peers discovered yet, broadcast some want-haves
457
	ks := s.sw.GetNextWants()
dirkmc's avatar
dirkmc committed
458
	if len(ks) > 0 {
Dirk McCormick's avatar
Dirk McCormick committed
459 460
		log.Infow("No peers - broadcasting", "session", s.id, "want-count", len(ks))
		s.broadcastWantHaves(ctx, ks)
dirkmc's avatar
dirkmc committed
461
	}
Jeromy's avatar
Jeromy committed
462
}
463

Dirk McCormick's avatar
Dirk McCormick committed
464 465 466 467 468 469
// broadcastWantHaves asks the PeerManager to send want-haves for wants to
// every connected peer, logging the request at debug level first.
func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
	log.Debugw("broadcastWantHaves", "session", s.id, "cids", wants)
	pm := s.pm
	pm.BroadcastWantHaves(ctx, wants)
}

470 471 472 473 474 475 476
// The session will broadcast if it has outstanding wants and doesn't receive
// any blocks for some time.
// The length of time is calculated
// - initially
//   as a fixed delay
// - once some blocks are received
//   from a base delay and average latency, with a backoff
Steven Allen's avatar
Steven Allen committed
477
func (s *Session) resetIdleTick() {
478
	var tickDelay time.Duration
dirkmc's avatar
dirkmc committed
479
	if !s.latencyTrkr.hasLatency() {
Steven Allen's avatar
Steven Allen committed
480
		tickDelay = s.initialSearchDelay
481
	} else {
dirkmc's avatar
dirkmc committed
482
		avLat := s.latencyTrkr.averageLatency()
483
		tickDelay = s.baseTickDelay + (3 * avLat)
Jeromy's avatar
Jeromy committed
484
	}
485
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
Steven Allen's avatar
Steven Allen committed
486
	s.idleTick.Reset(tickDelay)
Jeromy's avatar
Jeromy committed
487
}
488

489 490
// latencyTracker accumulates the latency between sending a want and
// receiving the corresponding block, so that an average can be derived.
// The zero value is an empty tracker ready for use.
type latencyTracker struct {
	totalLatency time.Duration // summed latency of every recorded block
	count        int           // number of blocks recorded
}

// hasLatency reports whether a non-zero latency has been recorded.
func (lt *latencyTracker) hasLatency() bool {
	return lt.totalLatency > 0 && lt.count > 0
}

// averageLatency returns the mean latency per recorded block. It returns 0
// when nothing has been recorded rather than panicking on an integer
// division by zero.
func (lt *latencyTracker) averageLatency() time.Duration {
	if lt.count <= 0 {
		return 0
	}
	return lt.totalLatency / time.Duration(lt.count)
}

// receiveUpdate records count newly received blocks whose latencies sum to
// totalLatency.
func (lt *latencyTracker) receiveUpdate(count int, totalLatency time.Duration) {
	lt.totalLatency += totalLatency
	lt.count += count
}