package session

import (
	"context"
	"time"

	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
	bsgetter "github.com/ipfs/go-bitswap/internal/getter"
	notifications "github.com/ipfs/go-bitswap/internal/notifications"
	bspm "github.com/ipfs/go-bitswap/internal/peermanager"
	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	delay "github.com/ipfs/go-ipfs-delay"
	logging "github.com/ipfs/go-log"
	peer "github.com/libp2p/go-libp2p-core/peer"
	loggables "github.com/libp2p/go-libp2p-loggables"
	"go.uber.org/zap"
)

var log = logging.Logger("bs:sess")
var sflog = log.Desugar()

const (
	broadcastLiveWantsLimit = 64
)

// WantManager is an interface that can be used to request blocks
// from given peers.
type WantManager interface {
	// BroadcastWantHaves sends want-haves to all connected peers (used for
	// session discovery)
	BroadcastWantHaves(context.Context, uint64, []cid.Cid)
	// RemoveSession removes the session from the WantManager (when the
	// session shuts down)
	RemoveSession(context.Context, uint64)
}

// PeerManager keeps track of which sessions are interested in which peers
// and takes care of sending wants for the sessions
type PeerManager interface {
	// RegisterSession tells the PeerManager that the session is interested
	// in a peer's connection state
	RegisterSession(peer.ID, bspm.Session) bool
	// UnregisterSession tells the PeerManager that the session is no longer
	// interested in a peer's connection state
	UnregisterSession(uint64)
	// SendWants tells the PeerManager to send wants to the given peer
	SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
}

// SessionPeerManager keeps track of peers in the session
type SessionPeerManager interface {
	// PeersDiscovered indicates if any peers have been discovered yet
	PeersDiscovered() bool
	// Shutdown the SessionPeerManager
	Shutdown()
	// Adds a peer to the session, returning true if the peer is new
	AddPeer(peer.ID) bool
	// Removes a peer from the session, returning true if the peer existed
	RemovePeer(peer.ID) bool
	// All peers in the session
	Peers() []peer.ID
	// Whether there are any peers in the session
	HasPeers() bool
}

// ProviderFinder is used to find providers for a given key
type ProviderFinder interface {
	// FindProvidersAsync searches for peers that provide the given CID
	FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID
}
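
// A minimal stand-in satisfying ProviderFinder (an illustrative sketch, e.g.
// for tests; not part of the original file) could look like:
//
//	type noopProviderFinder struct{}
//
//	func (noopProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID {
//		ch := make(chan peer.ID)
//		close(ch) // report no providers
//		return ch
//	}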

// opType is the kind of operation that is being processed by the event loop
type opType int

const (
	// Receive blocks
	opReceive opType = iota
	// Want blocks
	opWant
	// Cancel wants
	opCancel
	// Broadcast want-haves
	opBroadcast
	// Wants sent to peers
	opWantsSent
)

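// op is a single unit of work sent to the session's run loop: the kind of
// operation to perform plus the keys it applies to.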
type op struct {
	op   opType
	keys []cid.Cid
}

// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
// info to, and who to request blocks from.
type Session struct {
	// dependencies
	ctx            context.Context
	wm             WantManager
	sprm           SessionPeerManager
	providerFinder ProviderFinder
	sim            *bssim.SessionInterestManager

	sw  sessionWants
	sws sessionWantSender

	latencyTrkr latencyTracker

	// channels
	incoming      chan op
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
	idleTick            *time.Timer
	periodicSearchTimer *time.Timer
	baseTickDelay       time.Duration
	consecutiveTicks    int
	initialSearchDelay  time.Duration
	periodicSearchDelay delay.D

	// identifiers
	notif notifications.PubSub
	uuid  logging.Loggable
	id    uint64

	self peer.ID
}

// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context,
	id uint64,
	wm WantManager,
	sprm SessionPeerManager,
	providerFinder ProviderFinder,
	sim *bssim.SessionInterestManager,
	pm PeerManager,
	bpm *bsbpm.BlockPresenceManager,
	notif notifications.PubSub,
	initialSearchDelay time.Duration,
	periodicSearchDelay delay.D,
	self peer.ID) *Session {
	s := &Session{
		sw:                  newSessionWants(broadcastLiveWantsLimit),
		tickDelayReqs:       make(chan time.Duration),
		ctx:                 ctx,
		wm:                  wm,
		sprm:                sprm,
		providerFinder:      providerFinder,
		sim:                 sim,
		incoming:            make(chan op, 128),
		latencyTrkr:         latencyTracker{},
		notif:               notif,
		uuid:                loggables.Uuid("GetBlockRequest"),
		baseTickDelay:       time.Millisecond * 500,
		id:                  id,
		initialSearchDelay:  initialSearchDelay,
		periodicSearchDelay: periodicSearchDelay,
		self:                self,
	}
	s.sws = newSessionWantSender(ctx, id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted)

	go s.run(ctx)

	return s
}

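// ID returns the session identifier.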
func (s *Session) ID() uint64 {
	return s.id
}

// ReceiveFrom receives incoming blocks, HAVEs and DONT_HAVEs from the given
// peer.
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
	// The SessionManager tells each Session about all keys that it may be
	// interested in. Here the Session filters the keys to the ones that this
	// particular Session is interested in.
	interestedRes := s.sim.FilterSessionInterested(s.id, ks, haves, dontHaves)
	ks = interestedRes[0]
	haves = interestedRes[1]
	dontHaves = interestedRes[2]
	s.logReceiveFrom(from, ks, haves, dontHaves)

	// Inform the session want sender that a message has been received
	s.sws.Update(from, ks, haves, dontHaves)

	if len(ks) == 0 {
		return
	}

	// Inform the session that blocks have been received
	select {
	case s.incoming <- op{op: opReceive, keys: ks}:
	case <-s.ctx.Done():
	}
}

func (s *Session) logReceiveFrom(from peer.ID, interestedKs []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
	// Save some CPU cycles if log level is higher than debug
	if ce := sflog.Check(zap.DebugLevel, "Bitswap <- rcv message"); ce == nil {
		return
	}

	for _, c := range interestedKs {
		log.Debugw("Bitswap <- block", "local", s.self, "from", from, "cid", c, "session", s.id)
	}
	for _, c := range haves {
		log.Debugw("Bitswap <- HAVE", "local", s.self, "from", from, "cid", c, "session", s.id)
	}
	for _, c := range dontHaves {
		log.Debugw("Bitswap <- DONT_HAVE", "local", s.self, "from", from, "cid", c, "session", s.id)
	}
}

// GetBlock fetches a single block.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel on which the found blocks are delivered. No order is
// guaranteed on the returned blocks.
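//
// An illustrative use (a sketch, not part of the original file), assuming the
// caller already holds a *Session s, a context ctx and a slice of CIDs keys:
//
//	blkCh, err := s.GetBlocks(ctx, keys)
//	if err != nil {
//		// handle the error
//	}
//	for blk := range blkCh {
//		// process blk
//	}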
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)

	return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
		func(ctx context.Context, keys []cid.Cid) {
			select {
			case s.incoming <- op{op: opWant, keys: keys}:
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
			case s.incoming <- op{op: opCancel, keys: keys}:
			case <-s.ctx.Done():
			}
		},
	)
}

// SetBaseTickDelay changes the rate at which ticks happen.
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
}

// onWantsSent is called when wants are sent to a peer by the session wants sender
func (s *Session) onWantsSent(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	allBlks := append(wantBlocks[:len(wantBlocks):len(wantBlocks)], wantHaves...)
	s.nonBlockingEnqueue(op{op: opWantsSent, keys: allBlks})
}

// onPeersExhausted is called when all available peers have sent DONT_HAVE for
// a set of cids (or all peers become unavailable)
func (s *Session) onPeersExhausted(ks []cid.Cid) {
	s.nonBlockingEnqueue(op{op: opBroadcast, keys: ks})
}

// We don't want to block the sessionWantSender if the incoming channel
// is full. So if we can't immediately send on the incoming channel, spin
// the send off into a goroutine.
func (s *Session) nonBlockingEnqueue(o op) {
	select {
	case s.incoming <- o:
	default:
		go func() {
			select {
			case s.incoming <- o:
			case <-s.ctx.Done():
			}
		}()
	}
}

// Session run loop -- nothing in this function should be called outside of
// this loop
func (s *Session) run(ctx context.Context) {
	go s.sws.Run()

	s.idleTick = time.NewTimer(s.initialSearchDelay)
	s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
	for {
		select {
		case oper := <-s.incoming:
			switch oper.op {
			case opReceive:
				// Received blocks
				s.handleReceive(oper.keys)
			case opWant:
				// Client wants blocks
				s.wantBlocks(ctx, oper.keys)
			case opCancel:
				// Wants were cancelled
				s.sw.CancelPending(oper.keys)
			case opWantsSent:
				// Wants were sent to a peer
				s.sw.WantsSent(oper.keys)
			case opBroadcast:
				// Broadcast want-haves to all peers
				s.broadcastWantHaves(ctx, oper.keys)
			default:
				panic("unhandled operation")
			}
		case <-s.idleTick.C:
			// The session hasn't received blocks for a while, broadcast
			s.broadcastWantHaves(ctx, nil)
		case <-s.periodicSearchTimer.C:
			// Periodically search for a random live want
			s.handlePeriodicSearch(ctx)
		case baseTickDelay := <-s.tickDelayReqs:
			// Set the base tick delay
			s.baseTickDelay = baseTickDelay
		case <-ctx.Done():
			// Shutdown
			s.handleShutdown()
			return
		}
	}
}

// Called when the session hasn't received any blocks for some time, or when
// all peers in the session have sent DONT_HAVE for a particular set of CIDs.
// Send want-haves to all connected peers, and search for new peers with the CID.
func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
	// If this broadcast is because of an idle timeout (we haven't received
	// any blocks for a while) then broadcast all pending wants
	if wants == nil {
		wants = s.sw.PrepareBroadcast()
	}

	// Broadcast a want-have for the live wants to everyone we're connected to
	s.wm.BroadcastWantHaves(ctx, s.id, wants)

	// do not find providers on consecutive ticks
	// -- just rely on periodic search widening
	if len(wants) > 0 && (s.consecutiveTicks == 0) {
		// Search for providers who have the first want in the list.
		// Typically if the provider has the first block they will have
		// the rest of the blocks also.
		log.Debugf("Ses%d: FindMorePeers with want %s (1st of %d wants)", s.id, wants[0], len(wants))
		s.findMorePeers(ctx, wants[0])
	}
	s.resetIdleTick()

	// If we have live wants record a consecutive tick
	if s.sw.HasLiveWants() {
		s.consecutiveTicks++
	}
}

// handlePeriodicSearch is called periodically to search for providers of a
// randomly chosen CID in the session.
func (s *Session) handlePeriodicSearch(ctx context.Context) {
	randomWant := s.sw.RandomLiveWant()
	if !randomWant.Defined() {
		return
	}

	// TODO: come up with a better strategy for determining when to search
	// for new providers for blocks.
	s.findMorePeers(ctx, randomWant)

	s.wm.BroadcastWantHaves(ctx, s.id, []cid.Cid{randomWant})

	s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
}

// findMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
		for p := range s.providerFinder.FindProvidersAsync(ctx, k) {
			// When a provider indicates that it has a cid, it's equivalent to
			// the providing peer sending a HAVE
			s.sws.Update(p, nil, []cid.Cid{c}, nil)
		}
	}(c)
}

// handleShutdown is called when the session shuts down
func (s *Session) handleShutdown() {
	// Stop the idle timer
	s.idleTick.Stop()
	// Shut down the session peer manager
	s.sprm.Shutdown()
	// Remove the session from the want manager
	s.wm.RemoveSession(s.ctx, s.id)
}

// handleReceive is called when the session receives blocks from a peer
func (s *Session) handleReceive(ks []cid.Cid) {
	// Record which blocks have been received and figure out the total latency
	// for fetching the blocks
	wanted, totalLatency := s.sw.BlocksReceived(ks)
	if len(wanted) == 0 {
		return
	}

	// Record latency
	s.latencyTrkr.receiveUpdate(len(wanted), totalLatency)

	// Inform the SessionInterestManager that this session is no longer
	// expecting to receive the wanted keys
	s.sim.RemoveSessionWants(s.id, wanted)

	s.idleTick.Stop()

	// We've received new wanted blocks, so reset the number of ticks
	// that have occurred since the last new block
	s.consecutiveTicks = 0

	s.resetIdleTick()
}

// wantBlocks is called when blocks are requested by the client
func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
	if len(newks) > 0 {
		// Inform the SessionInterestManager that this session is interested in the keys
		s.sim.RecordSessionInterest(s.id, newks)
		// Tell the sessionWants tracker that the wants have been requested
		s.sw.BlocksRequested(newks)
		// Tell the sessionWantSender that the blocks have been requested
		s.sws.Add(newks)
	}

	// If we have discovered peers already, the sessionWantSender will
	// send wants to them
	if s.sprm.PeersDiscovered() {
		return
	}

	// No peers discovered yet, broadcast some want-haves
	ks := s.sw.GetNextWants()
	if len(ks) > 0 {
		log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks))
		s.wm.BroadcastWantHaves(ctx, s.id, ks)
	}
}

// The session will broadcast if it has outstanding wants and doesn't receive
// any blocks for some time.
// The length of time is calculated
// - initially, as a fixed delay
// - once some blocks are received, from the base delay and the average
//   latency, with a backoff
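// For example (illustrative numbers, not taken from the code): with a base
// tick delay of 500ms, an average latency of 100ms and 2 consecutive ticks,
// the idle tick fires after (500ms + 3*100ms) * (1 + 2) = 2.4s.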
func (s *Session) resetIdleTick() {
	var tickDelay time.Duration
	if !s.latencyTrkr.hasLatency() {
		tickDelay = s.initialSearchDelay
	} else {
		avLat := s.latencyTrkr.averageLatency()
		tickDelay = s.baseTickDelay + (3 * avLat)
	}
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
	s.idleTick.Reset(tickDelay)
}

// latencyTracker keeps track of the average latency between sending a want
// and receiving the corresponding block
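// For example, if 4 wanted blocks arrive with a summed latency of 2s, the
// average latency per block works out to 500ms.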
type latencyTracker struct {
	totalLatency time.Duration
	count        int
}

func (lt *latencyTracker) hasLatency() bool {
	return lt.totalLatency > 0 && lt.count > 0
}

func (lt *latencyTracker) averageLatency() time.Duration {
	return lt.totalLatency / time.Duration(lt.count)
}

func (lt *latencyTracker) receiveUpdate(count int, totalLatency time.Duration) {
	lt.totalLatency += totalLatency
	lt.count += count
}