session.go 14.3 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4 5 6

import (
	"context"
	"time"

7 8 9 10 11
	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
	bsgetter "github.com/ipfs/go-bitswap/internal/getter"
	notifications "github.com/ipfs/go-bitswap/internal/notifications"
	bspm "github.com/ipfs/go-bitswap/internal/peermanager"
	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
Jeromy's avatar
Jeromy committed
12 13
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
14
	delay "github.com/ipfs/go-ipfs-delay"
Jeromy's avatar
Jeromy committed
15
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
16
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
17
	loggables "github.com/libp2p/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
18 19
)

dirkmc's avatar
dirkmc committed
20 21
var log = logging.Logger("bs:sess")

22
const (
dirkmc's avatar
dirkmc committed
23
	broadcastLiveWantsLimit = 64
24
)
Jeromy's avatar
Jeromy committed
25

26
// WantManager is an interface that can be used to request blocks
27
// from given peers.
28
type WantManager interface {
dirkmc's avatar
dirkmc committed
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
	// BroadcastWantHaves sends want-haves to all connected peers (used for
	// session discovery)
	BroadcastWantHaves(context.Context, uint64, []cid.Cid)
	// RemoveSession removes the session from the WantManager (when the
	// session shuts down)
	RemoveSession(context.Context, uint64)
}

// PeerManager keeps track of which sessions are interested in which peers
// and takes care of sending wants for the sessions
type PeerManager interface {
	// RegisterSession tells the PeerManager that the session is interested
	// in a peer's connection state
	RegisterSession(peer.ID, bspm.Session) bool
	// UnregisterSession tells the PeerManager that the session is no longer
	// interested in a peer's connection state
	UnregisterSession(uint64)
	// SendWants tells the PeerManager to send wants to the given peer
	SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
48 49
}

50
// SessionPeerManager keeps track of peers in the session
dirkmc's avatar
dirkmc committed
51
type SessionPeerManager interface {
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
	// PeersDiscovered indicates if any peers have been discovered yet
	PeersDiscovered() bool
	// Shutdown the SessionPeerManager
	Shutdown()
	// Adds a peer to the session, returning true if the peer is new
	AddPeer(peer.ID) bool
	// Removes a peer from the session, returning true if the peer existed
	RemovePeer(peer.ID) bool
	// All peers in the session
	Peers() []peer.ID
	// Whether there are any peers in the session
	HasPeers() bool
}

// ProviderFinder is used to find providers for a given key
type ProviderFinder interface {
	// FindProvidersAsync searches for peers that provide the given CID
	FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID
70 71
}

dirkmc's avatar
dirkmc committed
72
// opType is the kind of operation that is being processed by the event loop
73 74 75
type opType int

const (
dirkmc's avatar
dirkmc committed
76
	// Receive blocks
77
	opReceive opType = iota
dirkmc's avatar
dirkmc committed
78
	// Want blocks
79
	opWant
dirkmc's avatar
dirkmc committed
80
	// Cancel wants
81
	opCancel
dirkmc's avatar
dirkmc committed
82 83
	// Broadcast want-haves
	opBroadcast
84 85
	// Wants sent to peers
	opWantsSent
86 87 88 89 90
)

type op struct {
	op   opType
	keys []cid.Cid
91 92
}

Jeromy's avatar
Jeromy committed
93 94
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
95
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
96
type Session struct {
97
	// dependencies
98 99 100 101 102
	ctx            context.Context
	wm             WantManager
	sprm           SessionPeerManager
	providerFinder ProviderFinder
	sim            *bssim.SessionInterestManager
dirkmc's avatar
dirkmc committed
103 104 105

	sw  sessionWants
	sws sessionWantSender
106

dirkmc's avatar
dirkmc committed
107
	latencyTrkr latencyTracker
108

109
	// channels
110
	incoming      chan op
111 112 113
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
Steven Allen's avatar
Steven Allen committed
114 115 116 117 118 119
	idleTick            *time.Timer
	periodicSearchTimer *time.Timer
	baseTickDelay       time.Duration
	consecutiveTicks    int
	initialSearchDelay  time.Duration
	periodicSearchDelay delay.D
120
	// identifiers
Jeromy's avatar
Jeromy committed
121
	notif notifications.PubSub
122 123
	uuid  logging.Loggable
	id    uint64
dirkmc's avatar
dirkmc committed
124 125

	self peer.ID
Jeromy's avatar
Jeromy committed
126 127
}

128
// New creates a new bitswap session whose lifetime is bounded by the
129
// given context.
130 131 132
func New(ctx context.Context,
	id uint64,
	wm WantManager,
dirkmc's avatar
dirkmc committed
133
	sprm SessionPeerManager,
134
	providerFinder ProviderFinder,
dirkmc's avatar
dirkmc committed
135
	sim *bssim.SessionInterestManager,
136
	pm PeerManager,
dirkmc's avatar
dirkmc committed
137
	bpm *bsbpm.BlockPresenceManager,
138
	notif notifications.PubSub,
Steven Allen's avatar
Steven Allen committed
139
	initialSearchDelay time.Duration,
dirkmc's avatar
dirkmc committed
140 141
	periodicSearchDelay delay.D,
	self peer.ID) *Session {
Jeromy's avatar
Jeromy committed
142
	s := &Session{
143
		sw:                  newSessionWants(broadcastLiveWantsLimit),
Steven Allen's avatar
Steven Allen committed
144 145 146
		tickDelayReqs:       make(chan time.Duration),
		ctx:                 ctx,
		wm:                  wm,
dirkmc's avatar
dirkmc committed
147
		sprm:                sprm,
148
		providerFinder:      providerFinder,
dirkmc's avatar
dirkmc committed
149 150 151
		sim:                 sim,
		incoming:            make(chan op, 128),
		latencyTrkr:         latencyTracker{},
152
		notif:               notif,
Steven Allen's avatar
Steven Allen committed
153 154 155 156 157
		uuid:                loggables.Uuid("GetBlockRequest"),
		baseTickDelay:       time.Millisecond * 500,
		id:                  id,
		initialSearchDelay:  initialSearchDelay,
		periodicSearchDelay: periodicSearchDelay,
dirkmc's avatar
dirkmc committed
158
		self:                self,
Jeromy's avatar
Jeromy committed
159
	}
160
	s.sws = newSessionWantSender(ctx, id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted)
Jeromy's avatar
Jeromy committed
161 162 163 164 165 166

	go s.run(ctx)

	return s
}

dirkmc's avatar
dirkmc committed
167 168 169 170
func (s *Session) ID() uint64 {
	return s.id
}

171
// ReceiveFrom receives incoming blocks from the given peer.
dirkmc's avatar
dirkmc committed
172
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
173 174 175
	// The SessionManager tells each Session about all keys that it may be
	// interested in. Here the Session filters the keys to the ones that this
	// particular Session is interested in.
dirkmc's avatar
dirkmc committed
176 177 178 179
	interestedRes := s.sim.FilterSessionInterested(s.id, ks, haves, dontHaves)
	ks = interestedRes[0]
	haves = interestedRes[1]
	dontHaves = interestedRes[2]
Dirk McCormick's avatar
Dirk McCormick committed
180
	s.logReceiveFrom(from, ks, haves, dontHaves)
dirkmc's avatar
dirkmc committed
181

182 183
	// Inform the session want sender that a message has been received
	s.sws.Update(from, ks, haves, dontHaves)
dirkmc's avatar
dirkmc committed
184 185

	if len(ks) == 0 {
186 187 188
		return
	}

189
	// Inform the session that blocks have been received
190
	select {
191
	case s.incoming <- op{op: opReceive, keys: ks}:
192 193 194 195
	case <-s.ctx.Done():
	}
}

Dirk McCormick's avatar
Dirk McCormick committed
196 197 198 199 200 201 202 203 204 205 206 207 208
func (s *Session) logReceiveFrom(from peer.ID, interestedKs []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
	// log.Debugf("Ses%d<-%s: %d blocks, %d haves, %d dont haves\n",
	// 	s.id, from, len(interestedKs), len(wantedHaves), len(wantedDontHaves))
	for _, c := range interestedKs {
		log.Debugw("Bitswap <- block", "local", s.self, "from", from, "cid", c, "session", s.id)
	}
	for _, c := range haves {
		log.Debugw("Bitswap <- HAVE", "local", s.self, "from", from, "cid", c, "session", s.id)
	}
	for _, c := range dontHaves {
		log.Debugw("Bitswap <- DONT_HAVE", "local", s.self, "from", from, "cid", c, "session", s.id)
	}
}
dirkmc's avatar
dirkmc committed
209

210 211 212 213 214 215 216 217 218 219
// GetBlock fetches a single block.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
220 221

	return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
222 223
		func(ctx context.Context, keys []cid.Cid) {
			select {
224
			case s.incoming <- op{op: opWant, keys: keys}:
225 226 227 228 229 230
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
231
			case s.incoming <- op{op: opCancel, keys: keys}:
232 233 234 235
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
236 237
}

238
// SetBaseTickDelay changes the rate at which ticks happen.
239 240 241 242 243
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
244 245
}

246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
// onWantsSent is called when wants are sent to a peer by the session wants sender
func (s *Session) onWantsSent(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	allBlks := append(wantBlocks[:len(wantBlocks):len(wantBlocks)], wantHaves...)
	s.nonBlockingEnqueue(op{op: opWantsSent, keys: allBlks})
}

// onPeersExhausted is called when all available peers have sent DONT_HAVE for
// a set of cids (or all peers become unavailable)
func (s *Session) onPeersExhausted(ks []cid.Cid) {
	s.nonBlockingEnqueue(op{op: opBroadcast, keys: ks})
}

// We don't want to block the sessionWantSender if the incoming channel
// is full. So if we can't immediately send on the incoming channel spin
// it off into a go-routine.
func (s *Session) nonBlockingEnqueue(o op) {
	select {
	case s.incoming <- o:
	default:
		go func() {
			select {
			case s.incoming <- o:
			case <-s.ctx.Done():
			}
		}()
	}
}

dirkmc's avatar
dirkmc committed
274 275
// Session run loop -- everything in this function should not be called
// outside of this loop
Jeromy's avatar
Jeromy committed
276
func (s *Session) run(ctx context.Context) {
dirkmc's avatar
dirkmc committed
277 278
	go s.sws.Run()

Steven Allen's avatar
Steven Allen committed
279 280
	s.idleTick = time.NewTimer(s.initialSearchDelay)
	s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
Jeromy's avatar
Jeromy committed
281 282
	for {
		select {
283 284 285
		case oper := <-s.incoming:
			switch oper.op {
			case opReceive:
286
				// Received blocks
dirkmc's avatar
dirkmc committed
287
				s.handleReceive(oper.keys)
288
			case opWant:
289
				// Client wants blocks
290 291
				s.wantBlocks(ctx, oper.keys)
			case opCancel:
292
				// Wants were cancelled
293
				s.sw.CancelPending(oper.keys)
294 295 296
			case opWantsSent:
				// Wants were sent to a peer
				s.sw.WantsSent(oper.keys)
dirkmc's avatar
dirkmc committed
297
			case opBroadcast:
298
				// Broadcast want-haves to all peers
299
				s.broadcastWantHaves(ctx, oper.keys)
300 301 302
			default:
				panic("unhandled operation")
			}
Steven Allen's avatar
Steven Allen committed
303
		case <-s.idleTick.C:
304
			// The session hasn't received blocks for a while, broadcast
305
			s.broadcastWantHaves(ctx, nil)
Steven Allen's avatar
Steven Allen committed
306
		case <-s.periodicSearchTimer.C:
307
			// Periodically search for a random live want
Steven Allen's avatar
Steven Allen committed
308
			s.handlePeriodicSearch(ctx)
309
		case baseTickDelay := <-s.tickDelayReqs:
310
			// Set the base tick delay
311
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
312
		case <-ctx.Done():
313
			// Shutdown
314
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
315 316 317 318 319
			return
		}
	}
}

320 321 322 323 324 325 326 327 328 329
// Called when the session hasn't received any blocks for some time, or when
// all peers in the session have sent DONT_HAVE for a particular set of CIDs.
// Send want-haves to all connected peers, and search for new peers with the CID.
func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
	// If this broadcast is because of an idle timeout (we haven't received
	// any blocks for a while) then broadcast all pending wants
	if wants == nil {
		wants = s.sw.PrepareBroadcast()
	}

dirkmc's avatar
dirkmc committed
330
	// Broadcast a want-have for the live wants to everyone we're connected to
331
	s.wm.BroadcastWantHaves(ctx, s.id, wants)
332

dirkmc's avatar
dirkmc committed
333
	// do not find providers on consecutive ticks
Steven Allen's avatar
Steven Allen committed
334
	// -- just rely on periodic search widening
335 336 337 338
	if len(wants) > 0 && (s.consecutiveTicks == 0) {
		// Search for providers who have the first want in the list.
		// Typically if the provider has the first block they will have
		// the rest of the blocks also.
Dirk McCormick's avatar
Dirk McCormick committed
339
		log.Infof("Ses%d: FindMorePeers with want %s (1st of %d wants)", s.id, wants[0], len(wants))
340
		s.findMorePeers(ctx, wants[0])
341
	}
Steven Allen's avatar
Steven Allen committed
342
	s.resetIdleTick()
343

344
	// If we have live wants record a consecutive tick
345
	if s.sw.HasLiveWants() {
346 347
		s.consecutiveTicks++
	}
348 349
}

350 351
// handlePeriodicSearch is called periodically to search for providers of a
// randomly chosen CID in the sesssion.
Steven Allen's avatar
Steven Allen committed
352
func (s *Session) handlePeriodicSearch(ctx context.Context) {
353
	randomWant := s.sw.RandomLiveWant()
354
	if !randomWant.Defined() {
355 356 357 358 359
		return
	}

	// TODO: come up with a better strategy for determining when to search
	// for new providers for blocks.
360
	s.findMorePeers(ctx, randomWant)
dirkmc's avatar
dirkmc committed
361 362

	s.wm.BroadcastWantHaves(ctx, s.id, []cid.Cid{randomWant})
363

Steven Allen's avatar
Steven Allen committed
364
	s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
365 366
}

367 368 369 370 371 372 373 374 375 376 377 378 379
// findMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
		for p := range s.providerFinder.FindProvidersAsync(ctx, k) {
			// When a provider indicates that it has a cid, it's equivalent to
			// the providing peer sending a HAVE
			s.sws.Update(p, nil, []cid.Cid{c}, nil)
		}
	}(c)
}

// handleShutdown is called when the session shuts down
380
func (s *Session) handleShutdown() {
381
	// Stop the idle timer
Steven Allen's avatar
Steven Allen committed
382
	s.idleTick.Stop()
383 384 385
	// Shut down the session peer manager
	s.sprm.Shutdown()
	// Remove the session from the want manager
dirkmc's avatar
dirkmc committed
386
	s.wm.RemoveSession(s.ctx, s.id)
387 388
}

389
// handleReceive is called when the session receives blocks from a peer
dirkmc's avatar
dirkmc committed
390
func (s *Session) handleReceive(ks []cid.Cid) {
391 392 393 394 395 396 397 398 399 400 401 402 403 404
	// Record which blocks have been received and figure out the total latency
	// for fetching the blocks
	wanted, totalLatency := s.sw.BlocksReceived(ks)
	if len(wanted) == 0 {
		return
	}

	// Record latency
	s.latencyTrkr.receiveUpdate(len(wanted), totalLatency)

	// Inform the SessionInterestManager that this session is no longer
	// expecting to receive the wanted keys
	s.sim.RemoveSessionWants(s.id, wanted)

405 406 407 408 409 410
	s.idleTick.Stop()

	// We've received new wanted blocks, so reset the number of ticks
	// that have occurred since the last new block
	s.consecutiveTicks = 0

dirkmc's avatar
dirkmc committed
411
	s.resetIdleTick()
412 413
}

414
// wantBlocks is called when blocks are requested by the client
415
func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
dirkmc's avatar
dirkmc committed
416
	if len(newks) > 0 {
417
		// Inform the SessionInterestManager that this session is interested in the keys
dirkmc's avatar
dirkmc committed
418
		s.sim.RecordSessionInterest(s.id, newks)
419
		// Tell the sessionWants tracker that that the wants have been requested
dirkmc's avatar
dirkmc committed
420
		s.sw.BlocksRequested(newks)
421
		// Tell the sessionWantSender that the blocks have been requested
dirkmc's avatar
dirkmc committed
422
		s.sws.Add(newks)
Jeromy's avatar
Jeromy committed
423
	}
424

425
	// If we have discovered peers already, the sessionWantSender will
dirkmc's avatar
dirkmc committed
426
	// send wants to them
427
	if s.sprm.PeersDiscovered() {
dirkmc's avatar
dirkmc committed
428
		return
429
	}
Jeromy's avatar
Jeromy committed
430

dirkmc's avatar
dirkmc committed
431
	// No peers discovered yet, broadcast some want-haves
432
	ks := s.sw.GetNextWants()
dirkmc's avatar
dirkmc committed
433 434 435 436
	if len(ks) > 0 {
		log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks))
		s.wm.BroadcastWantHaves(ctx, s.id, ks)
	}
Jeromy's avatar
Jeromy committed
437
}
438

439 440 441 442 443 444 445
// The session will broadcast if it has outstanding wants and doesn't receive
// any blocks for some time.
// The length of time is calculated
// - initially
//   as a fixed delay
// - once some blocks are received
//   from a base delay and average latency, with a backoff
Steven Allen's avatar
Steven Allen committed
446
func (s *Session) resetIdleTick() {
447
	var tickDelay time.Duration
dirkmc's avatar
dirkmc committed
448
	if !s.latencyTrkr.hasLatency() {
Steven Allen's avatar
Steven Allen committed
449
		tickDelay = s.initialSearchDelay
450
	} else {
dirkmc's avatar
dirkmc committed
451
		avLat := s.latencyTrkr.averageLatency()
452
		tickDelay = s.baseTickDelay + (3 * avLat)
Jeromy's avatar
Jeromy committed
453
	}
454
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
Steven Allen's avatar
Steven Allen committed
455
	s.idleTick.Reset(tickDelay)
Jeromy's avatar
Jeromy committed
456
}
457

458 459
// latencyTracker keeps track of the average latency between sending a want
// and receiving the corresponding block
dirkmc's avatar
dirkmc committed
460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475
type latencyTracker struct {
	totalLatency time.Duration
	count        int
}

func (lt *latencyTracker) hasLatency() bool {
	return lt.totalLatency > 0 && lt.count > 0
}

func (lt *latencyTracker) averageLatency() time.Duration {
	return lt.totalLatency / time.Duration(lt.count)
}

func (lt *latencyTracker) receiveUpdate(count int, totalLatency time.Duration) {
	lt.totalLatency += totalLatency
	lt.count += count
476
}