session.go 11.6 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4

import (
	"context"
5
	"math/rand"
Jeromy's avatar
Jeromy committed
6 7
	"time"

Jeromy's avatar
Jeromy committed
8
	lru "github.com/hashicorp/golang-lru"
9 10
	bsgetter "github.com/ipfs/go-bitswap/getter"
	notifications "github.com/ipfs/go-bitswap/notifications"
11
	bssd "github.com/ipfs/go-bitswap/sessiondata"
Jeromy's avatar
Jeromy committed
12 13
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
14
	delay "github.com/ipfs/go-ipfs-delay"
Jeromy's avatar
Jeromy committed
15
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
16
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
17
	loggables "github.com/libp2p/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
18 19
)

20
// Limits on the number of wants a session keeps live (requested but not yet
// received) at any one time. They are applied by wantBudget.
const (
	// broadcastLiveWantsLimit is the cap used while the peer manager reports
	// no optimized peers, i.e. while wants are sent without a peer list.
	broadcastLiveWantsLimit = 4

	// targetedLiveWantsLimit is the cap used once the peer manager reports
	// at least one optimized peer.
	targetedLiveWantsLimit = 32
)
Jeromy's avatar
Jeromy committed
24

25
// WantManager is an interface that can be used to request blocks
26
// from given peers.
27
type WantManager interface {
28 29 30 31
	WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
	CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}

32 33
// PeerManager provides an interface for tracking and optimize peers, and
// requesting more when neccesary.
34 35
type PeerManager interface {
	FindMorePeers(context.Context, cid.Cid)
36
	GetOptimizedPeers() []bssd.OptimizedPeer
37 38
	RecordPeerRequests([]peer.ID, []cid.Cid)
	RecordPeerResponse(peer.ID, cid.Cid)
39
	RecordCancel(cid.Cid)
40 41
}

42 43 44
// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
45
	SplitRequest([]bssd.OptimizedPeer, []cid.Cid) []bssd.PartialRequest
46 47 48 49
	RecordDuplicateBlock()
	RecordUniqueBlock()
}

50 51 52 53 54 55
// interestReq is the message InterestedIn sends to the run loop to ask
// whether the session still wants a Cid. c is the Cid in question; resp is
// the channel on which the run loop sends back the answer.
type interestReq struct {
	c    cid.Cid
	resp chan bool
}

type blkRecv struct {
56 57 58
	from           peer.ID
	blk            blocks.Block
	counterMessage bool
59 60
}

Jeromy's avatar
Jeromy committed
61 62
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
63
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
64
type Session struct {
65
	// dependencies
66 67 68
	ctx context.Context
	wm  WantManager
	pm  PeerManager
69
	srs RequestSplitter
70 71 72 73 74 75 76 77 78 79

	// channels
	incoming      chan blkRecv
	newReqs       chan []cid.Cid
	cancelKeys    chan []cid.Cid
	interestReqs  chan interestReq
	latencyReqs   chan chan time.Duration
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
Steven Allen's avatar
Steven Allen committed
80 81 82 83 84 85 86 87 88 89 90 91
	tofetch             *cidQueue
	interest            *lru.Cache
	pastWants           *cidQueue
	liveWants           map[cid.Cid]time.Time
	idleTick            *time.Timer
	periodicSearchTimer *time.Timer
	baseTickDelay       time.Duration
	latTotal            time.Duration
	fetchcnt            int
	consecutiveTicks    int
	initialSearchDelay  time.Duration
	periodicSearchDelay delay.D
92
	// identifiers
Jeromy's avatar
Jeromy committed
93
	notif notifications.PubSub
94 95
	uuid  logging.Loggable
	id    uint64
Jeromy's avatar
Jeromy committed
96 97
}

98
// New creates a new bitswap session whose lifetime is bounded by the
99
// given context.
100 101 102 103 104
func New(ctx context.Context,
	id uint64,
	wm WantManager,
	pm PeerManager,
	srs RequestSplitter,
Steven Allen's avatar
Steven Allen committed
105 106
	initialSearchDelay time.Duration,
	periodicSearchDelay delay.D) *Session {
Jeromy's avatar
Jeromy committed
107
	s := &Session{
Steven Allen's avatar
Steven Allen committed
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
		liveWants:           make(map[cid.Cid]time.Time),
		newReqs:             make(chan []cid.Cid),
		cancelKeys:          make(chan []cid.Cid),
		tofetch:             newCidQueue(),
		pastWants:           newCidQueue(),
		interestReqs:        make(chan interestReq),
		latencyReqs:         make(chan chan time.Duration),
		tickDelayReqs:       make(chan time.Duration),
		ctx:                 ctx,
		wm:                  wm,
		pm:                  pm,
		srs:                 srs,
		incoming:            make(chan blkRecv),
		notif:               notifications.New(),
		uuid:                loggables.Uuid("GetBlockRequest"),
		baseTickDelay:       time.Millisecond * 500,
		id:                  id,
		initialSearchDelay:  initialSearchDelay,
		periodicSearchDelay: periodicSearchDelay,
Jeromy's avatar
Jeromy committed
127 128 129 130 131 132 133 134 135 136
	}

	cache, _ := lru.New(2048)
	s.interest = cache

	go s.run(ctx)

	return s
}

137 138 139
// ReceiveBlockFrom receives an incoming block from the given peer.
func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
	select {
140
	case s.incoming <- blkRecv{from: from, blk: blk, counterMessage: false}:
141
	case <-s.ctx.Done():
142
	}
143
	ks := []cid.Cid{blk.Cid()}
144
	s.pm.RecordCancel(blk.Cid())
145
	s.wm.CancelWants(s.ctx, ks, nil, s.id)
146
}
147

148 149 150 151 152 153 154 155 156
// UpdateReceiveCounters tells the run loop about a block purely for
// accounting purposes: the block may be a duplicate, in which case the
// request splitter is informed so it can adjust the split factor.
// The call is dropped if the session context ends before the run loop
// accepts the message.
func (s *Session) UpdateReceiveCounters(blk blocks.Block) {
	msg := blkRecv{from: "", blk: blk, counterMessage: true}
	select {
	case <-s.ctx.Done():
	case s.incoming <- msg:
	}
}

157 158
// InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
	if s.interest.Contains(c) {
		return true
	}
	// TODO: PERF: this is using a channel to guard a map access against race
	// conditions. This is definitely much slower than a mutex, though its unclear
	// if it will actually induce any noticeable slowness. This is implemented this
	// way to avoid adding a more complex set of mutexes around the liveWants map.
	// note that in the average case (where this session *is* interested in the
	// block we received) this function will not be called, as the cid will likely
	// still be in the interest cache.
	resp := make(chan bool, 1)
	select {
	case s.interestReqs <- interestReq{
		c:    c,
		resp: resp,
	}:
	case <-s.ctx.Done():
		return false
	}

	select {
	case want := <-resp:
		return want
	case <-s.ctx.Done():
		return false
	}
Jeromy's avatar
Jeromy committed
185 186
}

187 188 189 190 191 192 193 194 195 196
// GetBlock fetches the single block identified by k, delegating to
// bsgetter.SyncGetBlock with this session's GetBlocks as the fetcher.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	fetch := s.GetBlocks
	return bsgetter.SyncGetBlock(parent, k, fetch)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
	return bsgetter.AsyncGetBlocks(ctx, keys, s.notif,
		func(ctx context.Context, keys []cid.Cid) {
			select {
			case s.newReqs <- keys:
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
			case s.cancelKeys <- keys:
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
212 213
}

214
// GetAverageLatency returns the average latency for block requests.
215 216
func (s *Session) GetAverageLatency() time.Duration {
	resp := make(chan time.Duration)
Jeromy's avatar
Jeromy committed
217
	select {
218 219 220 221 222 223 224 225
	case s.latencyReqs <- resp:
	case <-s.ctx.Done():
		return -1 * time.Millisecond
	}

	select {
	case latency := <-resp:
		return latency
Jeromy's avatar
Jeromy committed
226
	case <-s.ctx.Done():
227
		return -1 * time.Millisecond
Jeromy's avatar
Jeromy committed
228
	}
Jeromy's avatar
Jeromy committed
229 230
}

231
// SetBaseTickDelay changes the rate at which ticks happen.
232 233 234 235 236
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
237 238
}

239 240
// Session run loop -- every function below here should not be called
// outside of this loop
Jeromy's avatar
Jeromy committed
241
func (s *Session) run(ctx context.Context) {
Steven Allen's avatar
Steven Allen committed
242 243
	s.idleTick = time.NewTimer(s.initialSearchDelay)
	s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
Jeromy's avatar
Jeromy committed
244 245 246
	for {
		select {
		case blk := <-s.incoming:
247
			if blk.counterMessage {
248
				s.updateReceiveCounters(ctx, blk)
249 250 251
			} else {
				s.handleIncomingBlock(ctx, blk)
			}
Jeromy's avatar
Jeromy committed
252
		case keys := <-s.newReqs:
253
			s.handleNewRequest(ctx, keys)
Jeromy's avatar
Jeromy committed
254
		case keys := <-s.cancelKeys:
255
			s.handleCancel(keys)
Steven Allen's avatar
Steven Allen committed
256 257 258 259
		case <-s.idleTick.C:
			s.handleIdleTick(ctx)
		case <-s.periodicSearchTimer.C:
			s.handlePeriodicSearch(ctx)
Jeromy's avatar
Jeromy committed
260
		case lwchk := <-s.interestReqs:
261
			lwchk.resp <- s.cidIsWanted(lwchk.c)
262 263 264 265
		case resp := <-s.latencyReqs:
			resp <- s.averageLatency()
		case baseTickDelay := <-s.tickDelayReqs:
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
266
		case <-ctx.Done():
267
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
268 269 270 271 272
			return
		}
	}
}

273
func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
Steven Allen's avatar
Steven Allen committed
274
	s.idleTick.Stop()
275 276

	if blk.from != "" {
277
		s.pm.RecordPeerResponse(blk.from, blk.blk.Cid())
278 279 280 281
	}

	s.receiveBlock(ctx, blk.blk)

Steven Allen's avatar
Steven Allen committed
282
	s.resetIdleTick()
283 284 285 286 287 288
}

func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
	for _, k := range keys {
		s.interest.Add(k, nil)
	}
289
	if toadd := s.wantBudget(); toadd > 0 {
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
		if toadd > len(keys) {
			toadd = len(keys)
		}

		now := keys[:toadd]
		keys = keys[toadd:]

		s.wantBlocks(ctx, now)
	}
	for _, k := range keys {
		s.tofetch.Push(k)
	}
}

// handleCancel removes each of the given keys from the tofetch queue.
func (s *Session) handleCancel(keys []cid.Cid) {
	for i := range keys {
		s.tofetch.Remove(keys[i])
	}
}

Steven Allen's avatar
Steven Allen committed
310
func (s *Session) handleIdleTick(ctx context.Context) {
311

312 313 314 315 316 317 318 319
	live := make([]cid.Cid, 0, len(s.liveWants))
	now := time.Now()
	for c := range s.liveWants {
		live = append(live, c)
		s.liveWants[c] = now
	}

	// Broadcast these keys to everyone we're connected to
320
	s.pm.RecordPeerRequests(nil, live)
321 322
	s.wm.WantBlocks(ctx, live, nil, s.id)

323
	// do no find providers on consecutive ticks
Steven Allen's avatar
Steven Allen committed
324
	// -- just rely on periodic search widening
325
	if len(live) > 0 && (s.consecutiveTicks == 0) {
326
		s.pm.FindMorePeers(ctx, live[0])
327
	}
Steven Allen's avatar
Steven Allen committed
328
	s.resetIdleTick()
329 330 331 332

	if len(s.liveWants) > 0 {
		s.consecutiveTicks++
	}
333 334
}

Steven Allen's avatar
Steven Allen committed
335
func (s *Session) handlePeriodicSearch(ctx context.Context) {
336 337
	randomWant := s.randomLiveWant()
	if !randomWant.Defined() {
338 339 340 341 342
		return
	}

	// TODO: come up with a better strategy for determining when to search
	// for new providers for blocks.
343 344
	s.pm.FindMorePeers(ctx, randomWant)
	s.wm.WantBlocks(ctx, []cid.Cid{randomWant}, nil, s.id)
345

Steven Allen's avatar
Steven Allen committed
346
	s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
347 348 349
}

func (s *Session) randomLiveWant() cid.Cid {
Steven Allen's avatar
Steven Allen committed
350 351 352
	if len(s.liveWants) == 0 {
		return cid.Cid{}
	}
353 354 355 356 357 358 359 360 361 362
	i := rand.Intn(len(s.liveWants))
	// picking a random live want
	for k := range s.liveWants {
		if i == 0 {
			return k
		}
		i--
	}
	return cid.Cid{}
}
363
func (s *Session) handleShutdown() {
Steven Allen's avatar
Steven Allen committed
364
	s.idleTick.Stop()
365 366 367 368 369 370 371 372 373
	s.notif.Shutdown()

	live := make([]cid.Cid, 0, len(s.liveWants))
	for c := range s.liveWants {
		live = append(live, c)
	}
	s.wm.CancelWants(s.ctx, live, nil, s.id)
}

374
func (s *Session) cidIsWanted(c cid.Cid) bool {
Steven Allen's avatar
Steven Allen committed
375
	_, ok := s.liveWants[c]
376 377 378 379 380 381
	if !ok {
		ok = s.tofetch.Has(c)
	}
	return ok
}

Jeromy's avatar
Jeromy committed
382
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
383 384
	c := blk.Cid()
	if s.cidIsWanted(c) {
385
		s.srs.RecordUniqueBlock()
Steven Allen's avatar
Steven Allen committed
386
		tval, ok := s.liveWants[c]
387 388
		if ok {
			s.latTotal += time.Since(tval)
Steven Allen's avatar
Steven Allen committed
389
			delete(s.liveWants, c)
390 391 392
		} else {
			s.tofetch.Remove(c)
		}
Jeromy's avatar
Jeromy committed
393
		s.fetchcnt++
394 395
		// we've received new wanted blocks, so future ticks are not consecutive
		s.consecutiveTicks = 0
Jeromy's avatar
Jeromy committed
396 397
		s.notif.Publish(blk)

398 399 400 401 402 403 404 405 406 407
		toAdd := s.wantBudget()
		if toAdd > s.tofetch.Len() {
			toAdd = s.tofetch.Len()
		}
		if toAdd > 0 {
			var keys []cid.Cid
			for i := 0; i < toAdd; i++ {
				keys = append(keys, s.tofetch.Pop())
			}
			s.wantBlocks(ctx, keys)
Jeromy's avatar
Jeromy committed
408
		}
409 410 411 412 413

		s.pastWants.Push(c)
	}
}

414 415 416 417
func (s *Session) updateReceiveCounters(ctx context.Context, blk blkRecv) {
	ks := blk.blk.Cid()
	if s.pastWants.Has(ks) {
		s.srs.RecordDuplicateBlock()
Jeromy's avatar
Jeromy committed
418 419 420
	}
}

421
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
422
	now := time.Now()
Jeromy's avatar
Jeromy committed
423
	for _, c := range ks {
Steven Allen's avatar
Steven Allen committed
424
		s.liveWants[c] = now
Jeromy's avatar
Jeromy committed
425
	}
426
	peers := s.pm.GetOptimizedPeers()
427
	if len(peers) > 0 {
428 429 430 431
		splitRequests := s.srs.SplitRequest(peers, ks)
		for _, splitRequest := range splitRequests {
			s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
			s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
432 433 434 435 436
		}
	} else {
		s.pm.RecordPeerRequests(nil, ks)
		s.wm.WantBlocks(ctx, ks, nil, s.id)
	}
Jeromy's avatar
Jeromy committed
437 438
}

439 440
func (s *Session) averageLatency() time.Duration {
	return s.latTotal / time.Duration(s.fetchcnt)
Jeromy's avatar
Jeromy committed
441
}
442

Steven Allen's avatar
Steven Allen committed
443
func (s *Session) resetIdleTick() {
444
	var tickDelay time.Duration
445
	if s.latTotal == 0 {
Steven Allen's avatar
Steven Allen committed
446
		tickDelay = s.initialSearchDelay
447 448
	} else {
		avLat := s.averageLatency()
449
		tickDelay = s.baseTickDelay + (3 * avLat)
Jeromy's avatar
Jeromy committed
450
	}
451
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
Steven Allen's avatar
Steven Allen committed
452
	s.idleTick.Reset(tickDelay)
Jeromy's avatar
Jeromy committed
453
}
454 455 456 457 458 459 460 461 462 463 464 465 466 467

// wantBudget returns how many more wants may be made live right now: the
// applicable limit (targeted when the peer manager has optimized peers,
// broadcast otherwise) minus the current number of live wants, never less
// than zero.
func (s *Session) wantBudget() int {
	limit := broadcastLiveWantsLimit
	if len(s.pm.GetOptimizedPeers()) > 0 {
		limit = targetedLiveWantsLimit
	}

	if remaining := limit - len(s.liveWants); remaining > 0 {
		return remaining
	}
	return 0
}