session.go 11.9 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4

import (
	"context"
dirkmc's avatar
dirkmc committed
5
	"sync"
Jeromy's avatar
Jeromy committed
6 7
	"time"

8 9 10 11 12 13
	// lu "github.com/ipfs/go-bitswap/internal/logutil"
	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
	bsgetter "github.com/ipfs/go-bitswap/internal/getter"
	notifications "github.com/ipfs/go-bitswap/internal/notifications"
	bspm "github.com/ipfs/go-bitswap/internal/peermanager"
	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
Jeromy's avatar
Jeromy committed
14 15
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
16
	delay "github.com/ipfs/go-ipfs-delay"
Jeromy's avatar
Jeromy committed
17
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
18
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
19
	loggables "github.com/libp2p/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
20 21
)

dirkmc's avatar
dirkmc committed
22 23
var log = logging.Logger("bs:sess")

24
const (
dirkmc's avatar
dirkmc committed
25
	broadcastLiveWantsLimit = 64
26
)
Jeromy's avatar
Jeromy committed
27

28
// WantManager is an interface that can be used to request blocks
29
// from given peers.
30
type WantManager interface {
dirkmc's avatar
dirkmc committed
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
	// BroadcastWantHaves sends want-haves to all connected peers (used for
	// session discovery)
	BroadcastWantHaves(context.Context, uint64, []cid.Cid)
	// RemoveSession removes the session from the WantManager (when the
	// session shuts down)
	RemoveSession(context.Context, uint64)
}

// PeerManager keeps track of which sessions are interested in which peers
// and takes care of sending wants for the sessions
type PeerManager interface {
	// RegisterSession tells the PeerManager that the session is interested
	// in a peer's connection state
	RegisterSession(peer.ID, bspm.Session) bool
	// UnregisterSession tells the PeerManager that the session is no longer
	// interested in a peer's connection state
	UnregisterSession(uint64)
	// SendWants tells the PeerManager to send wants to the given peer
	SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
50 51
}

52 53
// PeerManager provides an interface for tracking and optimize peers, and
// requesting more when neccesary.
dirkmc's avatar
dirkmc committed
54 55 56 57 58 59 60
type SessionPeerManager interface {
	// ReceiveFrom is called when blocks and HAVEs are received from a peer.
	// It returns a boolean indicating if the peer is new to the session.
	ReceiveFrom(peerId peer.ID, blks []cid.Cid, haves []cid.Cid) bool
	// Peers returns the set of peers in the session.
	Peers() *peer.Set
	// FindMorePeers queries Content Routing to discover providers of the given cid
61
	FindMorePeers(context.Context, cid.Cid)
dirkmc's avatar
dirkmc committed
62
	// RecordPeerRequests records the time that a cid was requested from a peer
63
	RecordPeerRequests([]peer.ID, []cid.Cid)
dirkmc's avatar
dirkmc committed
64 65
	// RecordPeerResponse records the time that a response for a cid arrived
	// from a peer
66
	RecordPeerResponse(peer.ID, []cid.Cid)
dirkmc's avatar
dirkmc committed
67
	// RecordCancels records that cancels were sent for the given cids
68
	RecordCancels([]cid.Cid)
69 70
}

dirkmc's avatar
dirkmc committed
71
// opType identifies which kind of operation the session event loop is being
// asked to process.
type opType int

// Operations understood by the event loop, numbered sequentially from zero.
const (
	// opReceive: blocks were received.
	opReceive opType = iota
	// opWant: blocks are wanted.
	opWant
	// opCancel: wants are being cancelled.
	opCancel
	// opBroadcast: want-haves should be broadcast.
	opBroadcast
)

type op struct {
	op   opType
	keys []cid.Cid
88 89
}

Jeromy's avatar
Jeromy committed
90 91
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
92
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
93
type Session struct {
94
	// dependencies
dirkmc's avatar
dirkmc committed
95 96 97 98 99 100 101
	ctx  context.Context
	wm   WantManager
	sprm SessionPeerManager
	sim  *bssim.SessionInterestManager

	sw  sessionWants
	sws sessionWantSender
102

dirkmc's avatar
dirkmc committed
103
	latencyTrkr latencyTracker
104

105
	// channels
106
	incoming      chan op
107 108 109
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
Steven Allen's avatar
Steven Allen committed
110 111 112 113 114 115
	idleTick            *time.Timer
	periodicSearchTimer *time.Timer
	baseTickDelay       time.Duration
	consecutiveTicks    int
	initialSearchDelay  time.Duration
	periodicSearchDelay delay.D
116
	// identifiers
Jeromy's avatar
Jeromy committed
117
	notif notifications.PubSub
118 119
	uuid  logging.Loggable
	id    uint64
dirkmc's avatar
dirkmc committed
120 121

	self peer.ID
Jeromy's avatar
Jeromy committed
122 123
}

124
// New creates a new bitswap session whose lifetime is bounded by the
125
// given context.
126 127 128
func New(ctx context.Context,
	id uint64,
	wm WantManager,
dirkmc's avatar
dirkmc committed
129 130
	sprm SessionPeerManager,
	sim *bssim.SessionInterestManager,
131
	pm PeerManager,
dirkmc's avatar
dirkmc committed
132
	bpm *bsbpm.BlockPresenceManager,
133
	notif notifications.PubSub,
Steven Allen's avatar
Steven Allen committed
134
	initialSearchDelay time.Duration,
dirkmc's avatar
dirkmc committed
135 136
	periodicSearchDelay delay.D,
	self peer.ID) *Session {
Jeromy's avatar
Jeromy committed
137
	s := &Session{
dirkmc's avatar
dirkmc committed
138
		sw:                  newSessionWants(),
Steven Allen's avatar
Steven Allen committed
139 140 141
		tickDelayReqs:       make(chan time.Duration),
		ctx:                 ctx,
		wm:                  wm,
dirkmc's avatar
dirkmc committed
142 143 144 145
		sprm:                sprm,
		sim:                 sim,
		incoming:            make(chan op, 128),
		latencyTrkr:         latencyTracker{},
146
		notif:               notif,
Steven Allen's avatar
Steven Allen committed
147 148 149 150 151
		uuid:                loggables.Uuid("GetBlockRequest"),
		baseTickDelay:       time.Millisecond * 500,
		id:                  id,
		initialSearchDelay:  initialSearchDelay,
		periodicSearchDelay: periodicSearchDelay,
dirkmc's avatar
dirkmc committed
152
		self:                self,
Jeromy's avatar
Jeromy committed
153
	}
dirkmc's avatar
dirkmc committed
154
	s.sws = newSessionWantSender(ctx, id, pm, bpm, s.onWantsSent, s.onPeersExhausted)
Jeromy's avatar
Jeromy committed
155 156 157 158 159 160

	go s.run(ctx)

	return s
}

dirkmc's avatar
dirkmc committed
161 162 163 164
// ID returns the numeric identifier the session was created with.
func (s *Session) ID() uint64 { return s.id }

165
// ReceiveFrom receives incoming blocks from the given peer.
dirkmc's avatar
dirkmc committed
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
	interestedRes := s.sim.FilterSessionInterested(s.id, ks, haves, dontHaves)
	ks = interestedRes[0]
	haves = interestedRes[1]
	dontHaves = interestedRes[2]
	// s.logReceiveFrom(from, ks, haves, dontHaves)

	// Add any newly discovered peers that have blocks we're interested in to
	// the peer set
	isNewPeer := s.sprm.ReceiveFrom(from, ks, haves)

	// Record response timing only if the blocks came from the network
	// (blocks can also be received from the local node)
	if len(ks) > 0 && from != "" {
		s.sprm.RecordPeerResponse(from, ks)
	}

	// Update want potential
	s.sws.Update(from, ks, haves, dontHaves, isNewPeer)

	if len(ks) == 0 {
187 188 189
		return
	}

dirkmc's avatar
dirkmc committed
190 191 192 193 194 195 196 197 198 199 200 201 202
	// Record which blocks have been received and figure out the total latency
	// for fetching the blocks
	wanted, totalLatency := s.sw.BlocksReceived(ks)
	s.latencyTrkr.receiveUpdate(len(wanted), totalLatency)

	if len(wanted) == 0 {
		return
	}

	// Inform the SessionInterestManager that this session is no longer
	// expecting to receive the wanted keys
	s.sim.RemoveSessionWants(s.id, wanted)

203
	select {
dirkmc's avatar
dirkmc committed
204
	case s.incoming <- op{op: opReceive, keys: wanted}:
205 206 207 208
	case <-s.ctx.Done():
	}
}

dirkmc's avatar
dirkmc committed
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
// func (s *Session) logReceiveFrom(from peer.ID, interestedKs []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
// 	// log.Infof("Ses%d<-%s: %d blocks, %d haves, %d dont haves\n",
// 	// 	s.id, from, len(interestedKs), len(wantedHaves), len(wantedDontHaves))
// 	for _, c := range interestedKs {
// 		log.Warningf("Ses%d %s<-%s: block %s\n", s.id, lu.P(s.self), lu.P(from), lu.C(c))
// 	}
// 	for _, c := range haves {
// 		log.Warningf("Ses%d %s<-%s: HAVE %s\n", s.id, lu.P(s.self), lu.P(from), lu.C(c))
// 	}
// 	for _, c := range dontHaves {
// 		log.Warningf("Ses%d %s<-%s: DONT_HAVE %s\n", s.id, lu.P(s.self), lu.P(from), lu.C(c))
// 	}
// }

// onWantsSent is the callback passed to newSessionWantSender in New. It
// records the want-blocks and want-haves that were sent to peer p.
func (s *Session) onWantsSent(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	// Cap the capacity so the append below cannot write into spare capacity
	// of the caller's wantBlocks slice.
	capped := wantBlocks[:len(wantBlocks):len(wantBlocks)]
	sent := append(capped, wantHaves...)

	s.sw.WantsSent(sent)
	s.sprm.RecordPeerRequests([]peer.ID{p}, sent)
}

func (s *Session) onPeersExhausted(ks []cid.Cid) {
	select {
	case s.incoming <- op{op: opBroadcast, keys: ks}:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
234 235
}

236 237 238 239 240 241 242 243 244 245
// GetBlock fetches the single block identified by k by delegating to
// GetBlocks through bsgetter.SyncGetBlock.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	blk, err := bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
	return blk, err
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
246 247

	return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
248 249
		func(ctx context.Context, keys []cid.Cid) {
			select {
250
			case s.incoming <- op{op: opWant, keys: keys}:
251 252 253 254 255 256
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
257
			case s.incoming <- op{op: opCancel, keys: keys}:
258 259 260 261
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
262 263
}

264
// SetBaseTickDelay changes the rate at which ticks happen.
265 266 267 268 269
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
270 271
}

dirkmc's avatar
dirkmc committed
272 273
// Session run loop -- everything in this function should not be called
// outside of this loop
Jeromy's avatar
Jeromy committed
274
func (s *Session) run(ctx context.Context) {
dirkmc's avatar
dirkmc committed
275 276
	go s.sws.Run()

Steven Allen's avatar
Steven Allen committed
277 278
	s.idleTick = time.NewTimer(s.initialSearchDelay)
	s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
Jeromy's avatar
Jeromy committed
279 280
	for {
		select {
281 282 283
		case oper := <-s.incoming:
			switch oper.op {
			case opReceive:
dirkmc's avatar
dirkmc committed
284
				s.handleReceive(oper.keys)
285 286 287 288
			case opWant:
				s.wantBlocks(ctx, oper.keys)
			case opCancel:
				s.sw.CancelPending(oper.keys)
dirkmc's avatar
dirkmc committed
289 290
			case opBroadcast:
				s.handleIdleTick(ctx)
291 292 293
			default:
				panic("unhandled operation")
			}
Steven Allen's avatar
Steven Allen committed
294 295 296 297
		case <-s.idleTick.C:
			s.handleIdleTick(ctx)
		case <-s.periodicSearchTimer.C:
			s.handlePeriodicSearch(ctx)
298 299
		case baseTickDelay := <-s.tickDelayReqs:
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
300
		case <-ctx.Done():
301
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
302 303 304 305 306
			return
		}
	}
}

Steven Allen's avatar
Steven Allen committed
307
func (s *Session) handleIdleTick(ctx context.Context) {
308
	live := s.sw.PrepareBroadcast()
dirkmc's avatar
dirkmc committed
309 310 311
	// log.Warningf("\n\n\n\n\nSes%d: broadcast %d keys\n\n\n\n\n", s.id, len(live))
	// log.Infof("Ses%d: broadcast %d keys\n", s.id, len(live))
	log.Warningf("Ses%d: broadcast %d keys", s.id, len(live))
312

dirkmc's avatar
dirkmc committed
313 314 315
	// Broadcast a want-have for the live wants to everyone we're connected to
	s.sprm.RecordPeerRequests(nil, live)
	s.wm.BroadcastWantHaves(ctx, s.id, live)
316

dirkmc's avatar
dirkmc committed
317
	// do not find providers on consecutive ticks
Steven Allen's avatar
Steven Allen committed
318
	// -- just rely on periodic search widening
319
	if len(live) > 0 && (s.consecutiveTicks == 0) {
dirkmc's avatar
dirkmc committed
320
		s.sprm.FindMorePeers(ctx, live[0])
321
	}
Steven Allen's avatar
Steven Allen committed
322
	s.resetIdleTick()
323

dirkmc's avatar
dirkmc committed
324
	// If we have live wants
325
	if s.sw.HasLiveWants() {
326 327
		s.consecutiveTicks++
	}
328 329
}

Steven Allen's avatar
Steven Allen committed
330
func (s *Session) handlePeriodicSearch(ctx context.Context) {
331
	randomWant := s.sw.RandomLiveWant()
332
	if !randomWant.Defined() {
333 334 335 336 337
		return
	}

	// TODO: come up with a better strategy for determining when to search
	// for new providers for blocks.
dirkmc's avatar
dirkmc committed
338 339 340
	s.sprm.FindMorePeers(ctx, randomWant)

	s.wm.BroadcastWantHaves(ctx, s.id, []cid.Cid{randomWant})
341

Steven Allen's avatar
Steven Allen committed
342
	s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
343 344
}

345
func (s *Session) handleShutdown() {
Steven Allen's avatar
Steven Allen committed
346
	s.idleTick.Stop()
dirkmc's avatar
dirkmc committed
347
	s.wm.RemoveSession(s.ctx, s.id)
348 349
}

dirkmc's avatar
dirkmc committed
350
func (s *Session) handleReceive(ks []cid.Cid) {
351 352 353 354 355 356
	s.idleTick.Stop()

	// We've received new wanted blocks, so reset the number of ticks
	// that have occurred since the last new block
	s.consecutiveTicks = 0

dirkmc's avatar
dirkmc committed
357 358 359
	s.sprm.RecordCancels(ks)

	s.resetIdleTick()
360 361 362
}

func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
dirkmc's avatar
dirkmc committed
363 364 365 366
	if len(newks) > 0 {
		s.sim.RecordSessionInterest(s.id, newks)
		s.sw.BlocksRequested(newks)
		s.sws.Add(newks)
Jeromy's avatar
Jeromy committed
367
	}
368

dirkmc's avatar
dirkmc committed
369 370 371 372
	// If we have discovered peers already, the SessionPotentialManager will
	// send wants to them
	if s.sprm.Peers().Size() > 0 {
		return
373
	}
Jeromy's avatar
Jeromy committed
374

dirkmc's avatar
dirkmc committed
375 376 377 378 379 380 381
	// No peers discovered yet, broadcast some want-haves
	ks := s.sw.GetNextWants(broadcastLiveWantsLimit)
	if len(ks) > 0 {
		log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks))
		s.sprm.RecordPeerRequests(nil, ks)
		s.wm.BroadcastWantHaves(ctx, s.id, ks)
	}
Jeromy's avatar
Jeromy committed
382
}
383

Steven Allen's avatar
Steven Allen committed
384
func (s *Session) resetIdleTick() {
385
	var tickDelay time.Duration
dirkmc's avatar
dirkmc committed
386
	if !s.latencyTrkr.hasLatency() {
Steven Allen's avatar
Steven Allen committed
387
		tickDelay = s.initialSearchDelay
388
	} else {
dirkmc's avatar
dirkmc committed
389 390
		avLat := s.latencyTrkr.averageLatency()
		// log.Warningf("averageLatency %s", avLat)
391
		tickDelay = s.baseTickDelay + (3 * avLat)
Jeromy's avatar
Jeromy committed
392
	}
393
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
Steven Allen's avatar
Steven Allen committed
394
	s.idleTick.Reset(tickDelay)
Jeromy's avatar
Jeromy committed
395
}
396

dirkmc's avatar
dirkmc committed
397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
// latencyTracker accumulates a total latency and a count so that an average
// latency can be derived. Every method takes the embedded lock.
type latencyTracker struct {
	sync.RWMutex
	totalLatency time.Duration
	count        int
}

// hasLatency reports whether both the accumulated latency and the count are
// greater than zero.
func (lt *latencyTracker) hasLatency() bool {
	lt.RLock()
	defer lt.RUnlock()

	return lt.totalLatency > 0 && lt.count > 0
}

// averageLatency returns the accumulated latency divided by the count. It
// returns 0 when nothing has been counted yet instead of panicking on a
// division by zero.
func (lt *latencyTracker) averageLatency() time.Duration {
	lt.RLock()
	defer lt.RUnlock()

	if lt.count <= 0 {
		return 0
	}
	return lt.totalLatency / time.Duration(lt.count)
}

// receiveUpdate adds count and totalLatency to the running totals.
func (lt *latencyTracker) receiveUpdate(count int, totalLatency time.Duration) {
	lt.Lock()
	defer lt.Unlock()

	lt.totalLatency += totalLatency
	lt.count += count
}