session.go 15 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4 5 6

import (
	"context"
	"time"

7 8 9 10 11
	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
	bsgetter "github.com/ipfs/go-bitswap/internal/getter"
	notifications "github.com/ipfs/go-bitswap/internal/notifications"
	bspm "github.com/ipfs/go-bitswap/internal/peermanager"
	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
Jeromy's avatar
Jeromy committed
12 13
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
14
	delay "github.com/ipfs/go-ipfs-delay"
Jeromy's avatar
Jeromy committed
15
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
16
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
17
	loggables "github.com/libp2p/go-libp2p-loggables"
Dirk McCormick's avatar
Dirk McCormick committed
18
	"go.uber.org/zap"
Jeromy's avatar
Jeromy committed
19 20
)

dirkmc's avatar
dirkmc committed
21
var log = logging.Logger("bs:sess")
Dirk McCormick's avatar
Dirk McCormick committed
22
var sflog = log.Desugar()
dirkmc's avatar
dirkmc committed
23

24
const (
dirkmc's avatar
dirkmc committed
25
	broadcastLiveWantsLimit = 64
26
)
Jeromy's avatar
Jeromy committed
27

dirkmc's avatar
dirkmc committed
28 29 30 31 32 33 34 35 36 37 38
// PeerManager keeps track of which sessions are interested in which peers
// and takes care of sending wants for the sessions
type PeerManager interface {
	// RegisterSession tells the PeerManager that the session is interested
	// in a peer's connection state
	RegisterSession(peer.ID, bspm.Session) bool
	// UnregisterSession tells the PeerManager that the session is no longer
	// interested in a peer's connection state
	UnregisterSession(uint64)
	// SendWants tells the PeerManager to send wants to the given peer
	SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
Dirk McCormick's avatar
Dirk McCormick committed
39 40 41 42 43
	// BroadcastWantHaves sends want-haves to all connected peers (used for
	// session discovery)
	BroadcastWantHaves(context.Context, []cid.Cid)
	// SendCancels tells the PeerManager to send cancels to all peers
	SendCancels(context.Context, []cid.Cid)
44 45
}

46
// SessionPeerManager keeps track of peers in the session
dirkmc's avatar
dirkmc committed
47
type SessionPeerManager interface {
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
	// PeersDiscovered indicates if any peers have been discovered yet
	PeersDiscovered() bool
	// Shutdown the SessionPeerManager
	Shutdown()
	// Adds a peer to the session, returning true if the peer is new
	AddPeer(peer.ID) bool
	// Removes a peer from the session, returning true if the peer existed
	RemovePeer(peer.ID) bool
	// All peers in the session
	Peers() []peer.ID
	// Whether there are any peers in the session
	HasPeers() bool
}

// ProviderFinder is used to find providers for a given key
type ProviderFinder interface {
	// FindProvidersAsync searches for peers that provide the given CID
	FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID
66 67
}

dirkmc's avatar
dirkmc committed
68
// opType is the kind of operation that is being processed by the event loop
69 70 71
type opType int

const (
dirkmc's avatar
dirkmc committed
72
	// Receive blocks
73
	opReceive opType = iota
dirkmc's avatar
dirkmc committed
74
	// Want blocks
75
	opWant
dirkmc's avatar
dirkmc committed
76
	// Cancel wants
77
	opCancel
dirkmc's avatar
dirkmc committed
78 79
	// Broadcast want-haves
	opBroadcast
80 81
	// Wants sent to peers
	opWantsSent
82 83 84 85 86
)

type op struct {
	op   opType
	keys []cid.Cid
87 88
}

Jeromy's avatar
Jeromy committed
89 90
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
91
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
92
type Session struct {
93
	// dependencies
94 95
	bsctx          context.Context // context for bitswap
	ctx            context.Context // context for session
Dirk McCormick's avatar
Dirk McCormick committed
96 97
	pm             PeerManager
	bpm            *bsbpm.BlockPresenceManager
98 99 100
	sprm           SessionPeerManager
	providerFinder ProviderFinder
	sim            *bssim.SessionInterestManager
dirkmc's avatar
dirkmc committed
101 102 103

	sw  sessionWants
	sws sessionWantSender
104

dirkmc's avatar
dirkmc committed
105
	latencyTrkr latencyTracker
106

107
	// channels
108
	incoming      chan op
109 110 111
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
Steven Allen's avatar
Steven Allen committed
112 113 114 115 116 117
	idleTick            *time.Timer
	periodicSearchTimer *time.Timer
	baseTickDelay       time.Duration
	consecutiveTicks    int
	initialSearchDelay  time.Duration
	periodicSearchDelay delay.D
118
	// identifiers
Jeromy's avatar
Jeromy committed
119
	notif notifications.PubSub
120 121
	uuid  logging.Loggable
	id    uint64
dirkmc's avatar
dirkmc committed
122 123

	self peer.ID
Jeromy's avatar
Jeromy committed
124 125
}

126
// New creates a new bitswap session whose lifetime is bounded by the
127
// given context.
128 129 130
func New(
	bsctx context.Context, // context for bitswap
	ctx context.Context, // context for this session
131
	id uint64,
dirkmc's avatar
dirkmc committed
132
	sprm SessionPeerManager,
133
	providerFinder ProviderFinder,
dirkmc's avatar
dirkmc committed
134
	sim *bssim.SessionInterestManager,
135
	pm PeerManager,
dirkmc's avatar
dirkmc committed
136
	bpm *bsbpm.BlockPresenceManager,
137
	notif notifications.PubSub,
Steven Allen's avatar
Steven Allen committed
138
	initialSearchDelay time.Duration,
dirkmc's avatar
dirkmc committed
139 140
	periodicSearchDelay delay.D,
	self peer.ID) *Session {
Jeromy's avatar
Jeromy committed
141
	s := &Session{
142
		sw:                  newSessionWants(broadcastLiveWantsLimit),
Steven Allen's avatar
Steven Allen committed
143
		tickDelayReqs:       make(chan time.Duration),
144
		bsctx:               bsctx,
Steven Allen's avatar
Steven Allen committed
145
		ctx:                 ctx,
Dirk McCormick's avatar
Dirk McCormick committed
146 147
		pm:                  pm,
		bpm:                 bpm,
dirkmc's avatar
dirkmc committed
148
		sprm:                sprm,
149
		providerFinder:      providerFinder,
dirkmc's avatar
dirkmc committed
150 151 152
		sim:                 sim,
		incoming:            make(chan op, 128),
		latencyTrkr:         latencyTracker{},
153
		notif:               notif,
Steven Allen's avatar
Steven Allen committed
154 155 156 157 158
		uuid:                loggables.Uuid("GetBlockRequest"),
		baseTickDelay:       time.Millisecond * 500,
		id:                  id,
		initialSearchDelay:  initialSearchDelay,
		periodicSearchDelay: periodicSearchDelay,
dirkmc's avatar
dirkmc committed
159
		self:                self,
Jeromy's avatar
Jeromy committed
160
	}
161
	s.sws = newSessionWantSender(id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted)
Jeromy's avatar
Jeromy committed
162 163 164 165 166 167

	go s.run(ctx)

	return s
}

dirkmc's avatar
dirkmc committed
168 169 170 171
func (s *Session) ID() uint64 {
	return s.id
}

172
// ReceiveFrom receives incoming blocks from the given peer.
dirkmc's avatar
dirkmc committed
173
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
174 175 176
	// The SessionManager tells each Session about all keys that it may be
	// interested in. Here the Session filters the keys to the ones that this
	// particular Session is interested in.
dirkmc's avatar
dirkmc committed
177 178 179 180
	interestedRes := s.sim.FilterSessionInterested(s.id, ks, haves, dontHaves)
	ks = interestedRes[0]
	haves = interestedRes[1]
	dontHaves = interestedRes[2]
Dirk McCormick's avatar
Dirk McCormick committed
181
	s.logReceiveFrom(from, ks, haves, dontHaves)
dirkmc's avatar
dirkmc committed
182

183 184
	// Inform the session want sender that a message has been received
	s.sws.Update(from, ks, haves, dontHaves)
dirkmc's avatar
dirkmc committed
185 186

	if len(ks) == 0 {
187 188 189
		return
	}

190
	// Inform the session that blocks have been received
191
	select {
192
	case s.incoming <- op{op: opReceive, keys: ks}:
193 194 195 196
	case <-s.ctx.Done():
	}
}

Dirk McCormick's avatar
Dirk McCormick committed
197
func (s *Session) logReceiveFrom(from peer.ID, interestedKs []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
Dirk McCormick's avatar
Dirk McCormick committed
198 199 200 201 202
	// Save some CPU cycles if log level is higher than debug
	if ce := sflog.Check(zap.DebugLevel, "Bitswap <- rcv message"); ce == nil {
		return
	}

Dirk McCormick's avatar
Dirk McCormick committed
203 204 205 206 207 208 209 210 211 212
	for _, c := range interestedKs {
		log.Debugw("Bitswap <- block", "local", s.self, "from", from, "cid", c, "session", s.id)
	}
	for _, c := range haves {
		log.Debugw("Bitswap <- HAVE", "local", s.self, "from", from, "cid", c, "session", s.id)
	}
	for _, c := range dontHaves {
		log.Debugw("Bitswap <- DONT_HAVE", "local", s.self, "from", from, "cid", c, "session", s.id)
	}
}
dirkmc's avatar
dirkmc committed
213

214 215 216 217 218 219 220 221 222 223
// GetBlock fetches a single block.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
224 225

	return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
226 227
		func(ctx context.Context, keys []cid.Cid) {
			select {
228
			case s.incoming <- op{op: opWant, keys: keys}:
229 230 231 232 233 234
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
235
			case s.incoming <- op{op: opCancel, keys: keys}:
236 237 238 239
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
240 241
}

242
// SetBaseTickDelay changes the rate at which ticks happen.
243 244 245 246 247
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
248 249
}

250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
// onWantsSent is called when wants are sent to a peer by the session wants sender
func (s *Session) onWantsSent(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	allBlks := append(wantBlocks[:len(wantBlocks):len(wantBlocks)], wantHaves...)
	s.nonBlockingEnqueue(op{op: opWantsSent, keys: allBlks})
}

// onPeersExhausted is called when all available peers have sent DONT_HAVE for
// a set of cids (or all peers become unavailable)
func (s *Session) onPeersExhausted(ks []cid.Cid) {
	s.nonBlockingEnqueue(op{op: opBroadcast, keys: ks})
}

// We don't want to block the sessionWantSender if the incoming channel
// is full. So if we can't immediately send on the incoming channel spin
// it off into a go-routine.
func (s *Session) nonBlockingEnqueue(o op) {
	select {
	case s.incoming <- o:
	default:
		go func() {
			select {
			case s.incoming <- o:
			case <-s.ctx.Done():
			}
		}()
	}
}

dirkmc's avatar
dirkmc committed
278 279
// Session run loop -- everything in this function should not be called
// outside of this loop
Jeromy's avatar
Jeromy committed
280
func (s *Session) run(ctx context.Context) {
dirkmc's avatar
dirkmc committed
281 282
	go s.sws.Run()

Steven Allen's avatar
Steven Allen committed
283 284
	s.idleTick = time.NewTimer(s.initialSearchDelay)
	s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
Jeromy's avatar
Jeromy committed
285 286
	for {
		select {
287 288 289
		case oper := <-s.incoming:
			switch oper.op {
			case opReceive:
290
				// Received blocks
dirkmc's avatar
dirkmc committed
291
				s.handleReceive(oper.keys)
292
			case opWant:
293
				// Client wants blocks
294 295
				s.wantBlocks(ctx, oper.keys)
			case opCancel:
296
				// Wants were cancelled
297
				s.sw.CancelPending(oper.keys)
298 299 300
			case opWantsSent:
				// Wants were sent to a peer
				s.sw.WantsSent(oper.keys)
dirkmc's avatar
dirkmc committed
301
			case opBroadcast:
302
				// Broadcast want-haves to all peers
Dirk McCormick's avatar
Dirk McCormick committed
303
				s.broadcast(ctx, oper.keys)
304 305 306
			default:
				panic("unhandled operation")
			}
Steven Allen's avatar
Steven Allen committed
307
		case <-s.idleTick.C:
308
			// The session hasn't received blocks for a while, broadcast
Dirk McCormick's avatar
Dirk McCormick committed
309
			s.broadcast(ctx, nil)
Steven Allen's avatar
Steven Allen committed
310
		case <-s.periodicSearchTimer.C:
311
			// Periodically search for a random live want
Steven Allen's avatar
Steven Allen committed
312
			s.handlePeriodicSearch(ctx)
313
		case baseTickDelay := <-s.tickDelayReqs:
314
			// Set the base tick delay
315
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
316
		case <-ctx.Done():
317
			// Shutdown
318
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
319 320 321 322 323
			return
		}
	}
}

324 325 326
// Called when the session hasn't received any blocks for some time, or when
// all peers in the session have sent DONT_HAVE for a particular set of CIDs.
// Send want-haves to all connected peers, and search for new peers with the CID.
Dirk McCormick's avatar
Dirk McCormick committed
327
func (s *Session) broadcast(ctx context.Context, wants []cid.Cid) {
328 329 330 331 332 333
	// If this broadcast is because of an idle timeout (we haven't received
	// any blocks for a while) then broadcast all pending wants
	if wants == nil {
		wants = s.sw.PrepareBroadcast()
	}

dirkmc's avatar
dirkmc committed
334
	// Broadcast a want-have for the live wants to everyone we're connected to
Dirk McCormick's avatar
Dirk McCormick committed
335
	s.broadcastWantHaves(ctx, wants)
336

dirkmc's avatar
dirkmc committed
337
	// do not find providers on consecutive ticks
Steven Allen's avatar
Steven Allen committed
338
	// -- just rely on periodic search widening
339 340 341 342
	if len(wants) > 0 && (s.consecutiveTicks == 0) {
		// Search for providers who have the first want in the list.
		// Typically if the provider has the first block they will have
		// the rest of the blocks also.
Dirk McCormick's avatar
Dirk McCormick committed
343
		log.Debugw("FindMorePeers", "session", s.id, "cid", wants[0], "pending", len(wants))
344
		s.findMorePeers(ctx, wants[0])
345
	}
Steven Allen's avatar
Steven Allen committed
346
	s.resetIdleTick()
347

348
	// If we have live wants record a consecutive tick
349
	if s.sw.HasLiveWants() {
350 351
		s.consecutiveTicks++
	}
352 353
}

354 355
// handlePeriodicSearch is called periodically to search for providers of a
// randomly chosen CID in the sesssion.
Steven Allen's avatar
Steven Allen committed
356
func (s *Session) handlePeriodicSearch(ctx context.Context) {
357
	randomWant := s.sw.RandomLiveWant()
358
	if !randomWant.Defined() {
359 360 361 362 363
		return
	}

	// TODO: come up with a better strategy for determining when to search
	// for new providers for blocks.
364
	s.findMorePeers(ctx, randomWant)
dirkmc's avatar
dirkmc committed
365

Dirk McCormick's avatar
Dirk McCormick committed
366
	s.broadcastWantHaves(ctx, []cid.Cid{randomWant})
367

Steven Allen's avatar
Steven Allen committed
368
	s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
369 370
}

371 372 373 374 375 376 377 378 379 380 381 382 383
// findMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
		for p := range s.providerFinder.FindProvidersAsync(ctx, k) {
			// When a provider indicates that it has a cid, it's equivalent to
			// the providing peer sending a HAVE
			s.sws.Update(p, nil, []cid.Cid{c}, nil)
		}
	}(c)
}

// handleShutdown is called when the session shuts down
384
func (s *Session) handleShutdown() {
385
	// Stop the idle timer
Steven Allen's avatar
Steven Allen committed
386
	s.idleTick.Stop()
387 388
	// Shut down the session peer manager
	s.sprm.Shutdown()
389 390 391
	// Shut down the sessionWantSender (blocks until sessionWantSender stops
	// sending)
	s.sws.Shutdown()
Dirk McCormick's avatar
Dirk McCormick committed
392 393 394 395 396 397 398 399

	// Remove session's interest in the given blocks.
	cancelKs := s.sim.RemoveSessionInterest(s.id)

	// Free up block presence tracking for keys that no session is interested
	// in anymore
	s.bpm.RemoveKeys(cancelKs)

400 401 402 403 404
	// Send CANCEL to all peers for blocks that no session is interested in
	// anymore.
	// Note: use bitswap context because session context has already been
	// cancelled.
	s.pm.SendCancels(s.bsctx, cancelKs)
405 406
}

407
// handleReceive is called when the session receives blocks from a peer
dirkmc's avatar
dirkmc committed
408
func (s *Session) handleReceive(ks []cid.Cid) {
409 410 411 412 413 414 415 416 417 418 419 420 421 422
	// Record which blocks have been received and figure out the total latency
	// for fetching the blocks
	wanted, totalLatency := s.sw.BlocksReceived(ks)
	if len(wanted) == 0 {
		return
	}

	// Record latency
	s.latencyTrkr.receiveUpdate(len(wanted), totalLatency)

	// Inform the SessionInterestManager that this session is no longer
	// expecting to receive the wanted keys
	s.sim.RemoveSessionWants(s.id, wanted)

423 424 425 426 427 428
	s.idleTick.Stop()

	// We've received new wanted blocks, so reset the number of ticks
	// that have occurred since the last new block
	s.consecutiveTicks = 0

dirkmc's avatar
dirkmc committed
429
	s.resetIdleTick()
430 431
}

432
// wantBlocks is called when blocks are requested by the client
433
func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
dirkmc's avatar
dirkmc committed
434
	if len(newks) > 0 {
435
		// Inform the SessionInterestManager that this session is interested in the keys
dirkmc's avatar
dirkmc committed
436
		s.sim.RecordSessionInterest(s.id, newks)
437
		// Tell the sessionWants tracker that that the wants have been requested
dirkmc's avatar
dirkmc committed
438
		s.sw.BlocksRequested(newks)
439
		// Tell the sessionWantSender that the blocks have been requested
dirkmc's avatar
dirkmc committed
440
		s.sws.Add(newks)
Jeromy's avatar
Jeromy committed
441
	}
442

443
	// If we have discovered peers already, the sessionWantSender will
dirkmc's avatar
dirkmc committed
444
	// send wants to them
445
	if s.sprm.PeersDiscovered() {
dirkmc's avatar
dirkmc committed
446
		return
447
	}
Jeromy's avatar
Jeromy committed
448

dirkmc's avatar
dirkmc committed
449
	// No peers discovered yet, broadcast some want-haves
450
	ks := s.sw.GetNextWants()
dirkmc's avatar
dirkmc committed
451
	if len(ks) > 0 {
Dirk McCormick's avatar
Dirk McCormick committed
452 453
		log.Infow("No peers - broadcasting", "session", s.id, "want-count", len(ks))
		s.broadcastWantHaves(ctx, ks)
dirkmc's avatar
dirkmc committed
454
	}
Jeromy's avatar
Jeromy committed
455
}
456

Dirk McCormick's avatar
Dirk McCormick committed
457 458 459 460 461 462
// Send want-haves to all connected peers
func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
	log.Debugw("broadcastWantHaves", "session", s.id, "cids", wants)
	s.pm.BroadcastWantHaves(ctx, wants)
}

463 464 465 466 467 468 469
// The session will broadcast if it has outstanding wants and doesn't receive
// any blocks for some time.
// The length of time is calculated
// - initially
//   as a fixed delay
// - once some blocks are received
//   from a base delay and average latency, with a backoff
Steven Allen's avatar
Steven Allen committed
470
func (s *Session) resetIdleTick() {
471
	var tickDelay time.Duration
dirkmc's avatar
dirkmc committed
472
	if !s.latencyTrkr.hasLatency() {
Steven Allen's avatar
Steven Allen committed
473
		tickDelay = s.initialSearchDelay
474
	} else {
dirkmc's avatar
dirkmc committed
475
		avLat := s.latencyTrkr.averageLatency()
476
		tickDelay = s.baseTickDelay + (3 * avLat)
Jeromy's avatar
Jeromy committed
477
	}
478
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
Steven Allen's avatar
Steven Allen committed
479
	s.idleTick.Reset(tickDelay)
Jeromy's avatar
Jeromy committed
480
}
481

482 483
// latencyTracker keeps track of the average latency between sending a want
// and receiving the corresponding block
dirkmc's avatar
dirkmc committed
484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
type latencyTracker struct {
	totalLatency time.Duration
	count        int
}

func (lt *latencyTracker) hasLatency() bool {
	return lt.totalLatency > 0 && lt.count > 0
}

func (lt *latencyTracker) averageLatency() time.Duration {
	return lt.totalLatency / time.Duration(lt.count)
}

func (lt *latencyTracker) receiveUpdate(count int, totalLatency time.Duration) {
	lt.totalLatency += totalLatency
	lt.count += count
500
}