session.go 14.6 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4 5 6

import (
	"context"
	"time"

7 8
	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
	bsgetter "github.com/ipfs/go-bitswap/internal/getter"
9
	lu "github.com/ipfs/go-bitswap/internal/logutil"
10 11 12
	notifications "github.com/ipfs/go-bitswap/internal/notifications"
	bspm "github.com/ipfs/go-bitswap/internal/peermanager"
	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
Jeromy's avatar
Jeromy committed
13 14
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
15
	delay "github.com/ipfs/go-ipfs-delay"
Jeromy's avatar
Jeromy committed
16
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
17
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
18
	loggables "github.com/libp2p/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
19 20
)

dirkmc's avatar
dirkmc committed
21 22
// log is the package-level logger for session activity ("bs:sess" subsystem).
var log = logging.Logger("bs:sess")

const (
	// broadcastLiveWantsLimit is the maximum number of wants the session
	// will broadcast as want-haves when it has not yet discovered peers.
	broadcastLiveWantsLimit = 64
)
Jeromy's avatar
Jeromy committed
26

27
// WantManager is an interface that can be used to request blocks
// from given peers.
type WantManager interface {
	// BroadcastWantHaves sends want-haves to all connected peers (used for
	// session discovery)
	BroadcastWantHaves(context.Context, uint64, []cid.Cid)
	// RemoveSession removes the session from the WantManager (when the
	// session shuts down)
	RemoveSession(context.Context, uint64)
}

// PeerManager keeps track of which sessions are interested in which peers
// and takes care of sending wants for the sessions.
type PeerManager interface {
	// RegisterSession tells the PeerManager that the session is interested
	// in a peer's connection state
	RegisterSession(peer.ID, bspm.Session) bool
	// UnregisterSession tells the PeerManager that the session is no longer
	// interested in a peer's connection state
	UnregisterSession(uint64)
	// SendWants tells the PeerManager to send wants to the given peer
	SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
}

51
// SessionPeerManager keeps track of peers in the session.
type SessionPeerManager interface {
	// PeersDiscovered indicates if any peers have been discovered yet
	PeersDiscovered() bool
	// Shutdown the SessionPeerManager
	Shutdown()
	// AddPeer adds a peer to the session, returning true if the peer is new
	AddPeer(peer.ID) bool
	// RemovePeer removes a peer from the session, returning true if the
	// peer existed
	RemovePeer(peer.ID) bool
	// Peers returns all peers in the session
	Peers() []peer.ID
	// HasPeers reports whether there are any peers in the session
	HasPeers() bool
}

// ProviderFinder is used to find providers for a given key.
type ProviderFinder interface {
	// FindProvidersAsync searches for peers that provide the given CID,
	// delivering candidate peer IDs on the returned channel
	FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID
}

dirkmc's avatar
dirkmc committed
73
// opType is the kind of operation that is being processed by the event loop.
type opType int

const (
	// opReceive: blocks were received from a peer
	opReceive opType = iota
	// opWant: the client requested blocks
	opWant
	// opCancel: the client cancelled outstanding wants
	opCancel
	// opBroadcast: broadcast want-haves to all peers
	opBroadcast
	// opWantsSent: wants were sent to peers by the want sender
	opWantsSent
)

// op is a single unit of work for the session event loop: an operation
// applied to a set of CIDs.
type op struct {
	op   opType
	keys []cid.Cid
}

Jeromy's avatar
Jeromy committed
94 95
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
// info to, and who to request blocks from.
type Session struct {
	// dependencies
	ctx            context.Context
	wm             WantManager
	sprm           SessionPeerManager
	providerFinder ProviderFinder
	sim            *bssim.SessionInterestManager

	// sw tracks the session's wants; sws sends wants to peers
	sw  sessionWants
	sws sessionWantSender

	// latencyTrkr tracks average block-fetch latency (run loop only)
	latencyTrkr latencyTracker

	// channels feeding the run loop
	incoming      chan op
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
	idleTick            *time.Timer
	periodicSearchTimer *time.Timer
	baseTickDelay       time.Duration
	consecutiveTicks    int
	initialSearchDelay  time.Duration
	periodicSearchDelay delay.D
	// identifiers
	notif notifications.PubSub
	uuid  logging.Loggable
	id    uint64

	// self is this node's own peer ID (used for logging)
	self peer.ID
}

129
// New creates a new bitswap session whose lifetime is bounded by the
130
// given context.
131 132 133
func New(ctx context.Context,
	id uint64,
	wm WantManager,
dirkmc's avatar
dirkmc committed
134
	sprm SessionPeerManager,
135
	providerFinder ProviderFinder,
dirkmc's avatar
dirkmc committed
136
	sim *bssim.SessionInterestManager,
137
	pm PeerManager,
dirkmc's avatar
dirkmc committed
138
	bpm *bsbpm.BlockPresenceManager,
139
	notif notifications.PubSub,
Steven Allen's avatar
Steven Allen committed
140
	initialSearchDelay time.Duration,
dirkmc's avatar
dirkmc committed
141 142
	periodicSearchDelay delay.D,
	self peer.ID) *Session {
Jeromy's avatar
Jeromy committed
143
	s := &Session{
dirkmc's avatar
dirkmc committed
144
		sw:                  newSessionWants(),
Steven Allen's avatar
Steven Allen committed
145 146 147
		tickDelayReqs:       make(chan time.Duration),
		ctx:                 ctx,
		wm:                  wm,
dirkmc's avatar
dirkmc committed
148
		sprm:                sprm,
149
		providerFinder:      providerFinder,
dirkmc's avatar
dirkmc committed
150 151 152
		sim:                 sim,
		incoming:            make(chan op, 128),
		latencyTrkr:         latencyTracker{},
153
		notif:               notif,
Steven Allen's avatar
Steven Allen committed
154 155 156 157 158
		uuid:                loggables.Uuid("GetBlockRequest"),
		baseTickDelay:       time.Millisecond * 500,
		id:                  id,
		initialSearchDelay:  initialSearchDelay,
		periodicSearchDelay: periodicSearchDelay,
dirkmc's avatar
dirkmc committed
159
		self:                self,
Jeromy's avatar
Jeromy committed
160
	}
161
	s.sws = newSessionWantSender(ctx, id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted)
Jeromy's avatar
Jeromy committed
162 163 164 165 166 167

	go s.run(ctx)

	return s
}

dirkmc's avatar
dirkmc committed
168 169 170 171
// ID returns the session's unique identifier.
func (s *Session) ID() uint64 {
	return s.id
}

172
// ReceiveFrom receives incoming blocks from the given peer.
dirkmc's avatar
dirkmc committed
173
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
174 175 176
	// The SessionManager tells each Session about all keys that it may be
	// interested in. Here the Session filters the keys to the ones that this
	// particular Session is interested in.
dirkmc's avatar
dirkmc committed
177 178 179 180 181 182
	interestedRes := s.sim.FilterSessionInterested(s.id, ks, haves, dontHaves)
	ks = interestedRes[0]
	haves = interestedRes[1]
	dontHaves = interestedRes[2]
	// s.logReceiveFrom(from, ks, haves, dontHaves)

183 184
	// Inform the session want sender that a message has been received
	s.sws.Update(from, ks, haves, dontHaves)
dirkmc's avatar
dirkmc committed
185 186

	if len(ks) == 0 {
187 188 189
		return
	}

190
	// Inform the session that blocks have been received
191
	select {
192
	case s.incoming <- op{op: opReceive, keys: ks}:
193 194 195 196
	case <-s.ctx.Done():
	}
}

dirkmc's avatar
dirkmc committed
197 198 199 200
// func (s *Session) logReceiveFrom(from peer.ID, interestedKs []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
// 	// log.Infof("Ses%d<-%s: %d blocks, %d haves, %d dont haves\n",
// 	// 	s.id, from, len(interestedKs), len(wantedHaves), len(wantedDontHaves))
// 	for _, c := range interestedKs {
201
// 		log.Warnf("Ses%d %s<-%s: block %s\n", s.id, lu.P(s.self), lu.P(from), lu.C(c))
dirkmc's avatar
dirkmc committed
202 203
// 	}
// 	for _, c := range haves {
204
// 		log.Warnf("Ses%d %s<-%s: HAVE %s\n", s.id, lu.P(s.self), lu.P(from), lu.C(c))
dirkmc's avatar
dirkmc committed
205 206
// 	}
// 	for _, c := range dontHaves {
207
// 		log.Warnf("Ses%d %s<-%s: DONT_HAVE %s\n", s.id, lu.P(s.self), lu.P(from), lu.C(c))
dirkmc's avatar
dirkmc committed
208 209 210
// 	}
// }

211 212 213 214 215 216 217 218 219 220
// GetBlock fetches a single block.
// It is a synchronous convenience wrapper around GetBlocks.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
221 222

	return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
223 224
		func(ctx context.Context, keys []cid.Cid) {
			select {
225
			case s.incoming <- op{op: opWant, keys: keys}:
226 227 228 229 230 231
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
232
			case s.incoming <- op{op: opCancel, keys: keys}:
233 234 235 236
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
237 238
}

239
// SetBaseTickDelay changes the rate at which ticks happen.
240 241 242 243 244
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
245 246
}

247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
// onWantsSent is called when wants are sent to a peer by the session wants
// sender.
func (s *Session) onWantsSent(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	// The three-index slice caps capacity so append copies into fresh
	// storage instead of mutating the caller's backing array.
	all := append(wantBlocks[:len(wantBlocks):len(wantBlocks)], wantHaves...)
	s.nonBlockingEnqueue(op{op: opWantsSent, keys: all})
}

// onPeersExhausted is called when all available peers have sent DONT_HAVE
// for a set of cids (or all peers become unavailable).
func (s *Session) onPeersExhausted(ks []cid.Cid) {
	// Trigger a broadcast for the exhausted keys.
	s.nonBlockingEnqueue(op{op: opBroadcast, keys: ks})
}

// nonBlockingEnqueue queues an op for the run loop without ever blocking the
// caller (the sessionWantSender). If the incoming channel is full, the send
// is retried from a spawned goroutine that also watches for session shutdown.
func (s *Session) nonBlockingEnqueue(o op) {
	// Fast path: channel has room.
	select {
	case s.incoming <- o:
		return
	default:
	}

	// Slow path: hand the send off so the caller isn't blocked.
	go func() {
		select {
		case s.incoming <- o:
		case <-s.ctx.Done():
		}
	}()
}

dirkmc's avatar
dirkmc committed
275 276
// Session run loop -- everything in this function should not be called
// outside of this loop
Jeromy's avatar
Jeromy committed
277
func (s *Session) run(ctx context.Context) {
dirkmc's avatar
dirkmc committed
278 279
	go s.sws.Run()

Steven Allen's avatar
Steven Allen committed
280 281
	s.idleTick = time.NewTimer(s.initialSearchDelay)
	s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
Jeromy's avatar
Jeromy committed
282 283
	for {
		select {
284 285 286
		case oper := <-s.incoming:
			switch oper.op {
			case opReceive:
287
				// Received blocks
dirkmc's avatar
dirkmc committed
288
				s.handleReceive(oper.keys)
289
			case opWant:
290
				// Client wants blocks
291 292
				s.wantBlocks(ctx, oper.keys)
			case opCancel:
293
				// Wants were cancelled
294
				s.sw.CancelPending(oper.keys)
295 296 297
			case opWantsSent:
				// Wants were sent to a peer
				s.sw.WantsSent(oper.keys)
dirkmc's avatar
dirkmc committed
298
			case opBroadcast:
299
				// Broadcast want-haves to all peers
300
				s.broadcastWantHaves(ctx, oper.keys)
301 302 303
			default:
				panic("unhandled operation")
			}
Steven Allen's avatar
Steven Allen committed
304
		case <-s.idleTick.C:
305
			// The session hasn't received blocks for a while, broadcast
306
			s.broadcastWantHaves(ctx, nil)
Steven Allen's avatar
Steven Allen committed
307
		case <-s.periodicSearchTimer.C:
308
			// Periodically search for a random live want
Steven Allen's avatar
Steven Allen committed
309
			s.handlePeriodicSearch(ctx)
310
		case baseTickDelay := <-s.tickDelayReqs:
311
			// Set the base tick delay
312
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
313
		case <-ctx.Done():
314
			// Shutdown
315
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
316 317 318 319 320
			return
		}
	}
}

321 322 323 324 325 326 327 328 329 330
// Called when the session hasn't received any blocks for some time, or when
// all peers in the session have sent DONT_HAVE for a particular set of CIDs.
// Send want-haves to all connected peers, and search for new peers with the CID.
func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
	// If this broadcast is because of an idle timeout (we haven't received
	// any blocks for a while) then broadcast all pending wants
	if wants == nil {
		wants = s.sw.PrepareBroadcast()
	}

331
	// log.Warnf("\n\n\n\n\nSes%d: broadcast %d keys\n\n\n\n\n", s.id, len(live))
dirkmc's avatar
dirkmc committed
332
	// log.Infof("Ses%d: broadcast %d keys\n", s.id, len(live))
333

dirkmc's avatar
dirkmc committed
334
	// Broadcast a want-have for the live wants to everyone we're connected to
335
	s.wm.BroadcastWantHaves(ctx, s.id, wants)
336

dirkmc's avatar
dirkmc committed
337
	// do not find providers on consecutive ticks
Steven Allen's avatar
Steven Allen committed
338
	// -- just rely on periodic search widening
339 340 341 342
	if len(wants) > 0 && (s.consecutiveTicks == 0) {
		// Search for providers who have the first want in the list.
		// Typically if the provider has the first block they will have
		// the rest of the blocks also.
343
		log.Warnf("Ses%d: FindMorePeers with want %s (1st of %d wants)", s.id, lu.C(wants[0]), len(wants))
344
		s.findMorePeers(ctx, wants[0])
345
	}
Steven Allen's avatar
Steven Allen committed
346
	s.resetIdleTick()
347

348
	// If we have live wants record a consecutive tick
349
	if s.sw.HasLiveWants() {
350 351
		s.consecutiveTicks++
	}
352 353
}

354 355
// handlePeriodicSearch is called periodically to search for providers of a
// randomly chosen CID in the sesssion.
Steven Allen's avatar
Steven Allen committed
356
func (s *Session) handlePeriodicSearch(ctx context.Context) {
357
	randomWant := s.sw.RandomLiveWant()
358
	if !randomWant.Defined() {
359 360 361 362 363
		return
	}

	// TODO: come up with a better strategy for determining when to search
	// for new providers for blocks.
364
	s.findMorePeers(ctx, randomWant)
dirkmc's avatar
dirkmc committed
365 366

	s.wm.BroadcastWantHaves(ctx, s.id, []cid.Cid{randomWant})
367

Steven Allen's avatar
Steven Allen committed
368
	s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
369 370
}

371 372 373 374 375 376 377 378 379 380 381 382 383
// findMorePeers attempts to find more peers for a session by searching for
// providers of the given CID. The search runs on its own goroutine.
func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
		for p := range s.providerFinder.FindProvidersAsync(ctx, k) {
			// A provider advertising the cid is treated the same as that
			// peer having sent a HAVE for it.
			s.sws.Update(p, nil, []cid.Cid{k}, nil)
		}
	}(c)
}

// handleShutdown is called when the session shuts down
384
func (s *Session) handleShutdown() {
385
	// Stop the idle timer
Steven Allen's avatar
Steven Allen committed
386
	s.idleTick.Stop()
387 388 389
	// Shut down the session peer manager
	s.sprm.Shutdown()
	// Remove the session from the want manager
dirkmc's avatar
dirkmc committed
390
	s.wm.RemoveSession(s.ctx, s.id)
391 392
}

393
// handleReceive is called when the session receives blocks from a peer
dirkmc's avatar
dirkmc committed
394
func (s *Session) handleReceive(ks []cid.Cid) {
395 396 397 398 399 400 401 402 403 404 405 406 407 408
	// Record which blocks have been received and figure out the total latency
	// for fetching the blocks
	wanted, totalLatency := s.sw.BlocksReceived(ks)
	if len(wanted) == 0 {
		return
	}

	// Record latency
	s.latencyTrkr.receiveUpdate(len(wanted), totalLatency)

	// Inform the SessionInterestManager that this session is no longer
	// expecting to receive the wanted keys
	s.sim.RemoveSessionWants(s.id, wanted)

409 410 411 412 413 414
	s.idleTick.Stop()

	// We've received new wanted blocks, so reset the number of ticks
	// that have occurred since the last new block
	s.consecutiveTicks = 0

dirkmc's avatar
dirkmc committed
415
	s.resetIdleTick()
416 417
}

418
// wantBlocks is called when blocks are requested by the client
419
func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
dirkmc's avatar
dirkmc committed
420
	if len(newks) > 0 {
421
		// Inform the SessionInterestManager that this session is interested in the keys
dirkmc's avatar
dirkmc committed
422
		s.sim.RecordSessionInterest(s.id, newks)
423
		// Tell the sessionWants tracker that that the wants have been requested
dirkmc's avatar
dirkmc committed
424
		s.sw.BlocksRequested(newks)
425
		// Tell the sessionWantSender that the blocks have been requested
dirkmc's avatar
dirkmc committed
426
		s.sws.Add(newks)
Jeromy's avatar
Jeromy committed
427
	}
428

429
	// If we have discovered peers already, the sessionWantSender will
dirkmc's avatar
dirkmc committed
430
	// send wants to them
431
	if s.sprm.PeersDiscovered() {
dirkmc's avatar
dirkmc committed
432
		return
433
	}
Jeromy's avatar
Jeromy committed
434

dirkmc's avatar
dirkmc committed
435 436 437 438 439 440
	// No peers discovered yet, broadcast some want-haves
	ks := s.sw.GetNextWants(broadcastLiveWantsLimit)
	if len(ks) > 0 {
		log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks))
		s.wm.BroadcastWantHaves(ctx, s.id, ks)
	}
Jeromy's avatar
Jeromy committed
441
}
442

443 444 445 446 447 448 449
// The session will broadcast if it has outstanding wants and doesn't receive
// any blocks for some time.
// The length of time is calculated
// - initially
//   as a fixed delay
// - once some blocks are received
//   from a base delay and average latency, with a backoff
Steven Allen's avatar
Steven Allen committed
450
func (s *Session) resetIdleTick() {
451
	var tickDelay time.Duration
dirkmc's avatar
dirkmc committed
452
	if !s.latencyTrkr.hasLatency() {
Steven Allen's avatar
Steven Allen committed
453
		tickDelay = s.initialSearchDelay
454
	} else {
dirkmc's avatar
dirkmc committed
455
		avLat := s.latencyTrkr.averageLatency()
456
		// log.Warnf("averageLatency %s", avLat)
457
		tickDelay = s.baseTickDelay + (3 * avLat)
Jeromy's avatar
Jeromy committed
458
	}
459
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
Steven Allen's avatar
Steven Allen committed
460
	s.idleTick.Reset(tickDelay)
Jeromy's avatar
Jeromy committed
461
}
462

463 464
// latencyTracker keeps track of the average latency between sending a want
// and receiving the corresponding block.
type latencyTracker struct {
	totalLatency time.Duration
	count        int
}

// hasLatency reports whether any latency data has been recorded yet.
func (lt *latencyTracker) hasLatency() bool {
	return lt.count > 0 && lt.totalLatency > 0
}

// averageLatency returns the mean latency per received block.
// Callers must check hasLatency first to avoid division by zero.
func (lt *latencyTracker) averageLatency() time.Duration {
	return lt.totalLatency / time.Duration(lt.count)
}

// receiveUpdate folds a batch of count blocks with the given cumulative
// latency into the running totals.
func (lt *latencyTracker) receiveUpdate(count int, totalLatency time.Duration) {
	lt.count += count
	lt.totalLatency += totalLatency
}
}