session.go 11.5 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4

import (
	"context"
5
	"math/rand"
Jeromy's avatar
Jeromy committed
6 7
	"time"

Jeromy's avatar
Jeromy committed
8
	lru "github.com/hashicorp/golang-lru"
9 10
	bsgetter "github.com/ipfs/go-bitswap/getter"
	notifications "github.com/ipfs/go-bitswap/notifications"
Jeromy's avatar
Jeromy committed
11 12
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
13
	delay "github.com/ipfs/go-ipfs-delay"
Jeromy's avatar
Jeromy committed
14
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
15
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
16
	loggables "github.com/libp2p/go-libp2p-loggables"
17 18

	bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
Jeromy's avatar
Jeromy committed
19 20
)

21
const (
22 23
	broadcastLiveWantsLimit = 4
	targetedLiveWantsLimit  = 32
24
)
Jeromy's avatar
Jeromy committed
25

26
// WantManager is an interface that can be used to request blocks
27
// from given peers.
28
type WantManager interface {
29 30 31 32
	WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
	CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}

33 34
// PeerManager provides an interface for tracking and optimize peers, and
// requesting more when neccesary.
35 36 37 38 39 40 41
type PeerManager interface {
	FindMorePeers(context.Context, cid.Cid)
	GetOptimizedPeers() []peer.ID
	RecordPeerRequests([]peer.ID, []cid.Cid)
	RecordPeerResponse(peer.ID, cid.Cid)
}

42 43 44 45 46 47 48 49
// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
	SplitRequest([]peer.ID, []cid.Cid) []*bssrs.PartialRequest
	RecordDuplicateBlock()
	RecordUniqueBlock()
}

50 51 52 53 54 55
type interestReq struct {
	c    cid.Cid
	resp chan bool
}

type blkRecv struct {
56 57 58
	from           peer.ID
	blk            blocks.Block
	counterMessage bool
59 60
}

Jeromy's avatar
Jeromy committed
61 62
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
63
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
64
type Session struct {
65
	// dependencies
66 67 68
	ctx context.Context
	wm  WantManager
	pm  PeerManager
69
	srs RequestSplitter
70 71 72 73 74 75 76 77 78 79

	// channels
	incoming      chan blkRecv
	newReqs       chan []cid.Cid
	cancelKeys    chan []cid.Cid
	interestReqs  chan interestReq
	latencyReqs   chan chan time.Duration
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
80 81 82 83 84 85 86 87 88 89 90
	tofetch          *cidQueue
	interest         *lru.Cache
	pastWants        *cidQueue
	liveWants        map[cid.Cid]time.Time
	tick             *time.Timer
	rebroadcast      *time.Timer
	baseTickDelay    time.Duration
	latTotal         time.Duration
	fetchcnt         int
	consecutiveTicks int
	lastFetchCount   int
91
	// identifiers
Jeromy's avatar
Jeromy committed
92
	notif notifications.PubSub
93 94
	uuid  logging.Loggable
	id    uint64
Jeromy's avatar
Jeromy committed
95 96
}

97
// New creates a new bitswap session whose lifetime is bounded by the
98
// given context.
99
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs RequestSplitter) *Session {
Jeromy's avatar
Jeromy committed
100
	s := &Session{
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
		liveWants:      make(map[cid.Cid]time.Time),
		newReqs:        make(chan []cid.Cid),
		cancelKeys:     make(chan []cid.Cid),
		tofetch:        newCidQueue(),
		pastWants:      newCidQueue(),
		interestReqs:   make(chan interestReq),
		latencyReqs:    make(chan chan time.Duration),
		tickDelayReqs:  make(chan time.Duration),
		ctx:            ctx,
		wm:             wm,
		pm:             pm,
		srs:            srs,
		incoming:       make(chan blkRecv),
		notif:          notifications.New(),
		uuid:           loggables.Uuid("GetBlockRequest"),
		baseTickDelay:  time.Millisecond * 500,
		lastFetchCount: -1,
		id:             id,
Jeromy's avatar
Jeromy committed
119 120 121 122 123 124 125 126 127 128
	}

	cache, _ := lru.New(2048)
	s.interest = cache

	go s.run(ctx)

	return s
}

129 130 131
// ReceiveBlockFrom receives an incoming block from the given peer.
func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
	select {
132
	case s.incoming <- blkRecv{from: from, blk: blk, counterMessage: false}:
133
	case <-s.ctx.Done():
134
	}
135 136 137
	ks := []cid.Cid{blk.Cid()}
	s.wm.CancelWants(s.ctx, ks, nil, s.id)

138
}
139

140 141 142 143 144 145 146 147 148
// UpdateReceiveCounters updates receive counters for a block,
// which may be a duplicate and adjusts the split factor based on that.
func (s *Session) UpdateReceiveCounters(blk blocks.Block) {
	select {
	case s.incoming <- blkRecv{from: "", blk: blk, counterMessage: true}:
	case <-s.ctx.Done():
	}
}

149 150
// InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
	if s.interest.Contains(c) {
		return true
	}
	// TODO: PERF: this is using a channel to guard a map access against race
	// conditions. This is definitely much slower than a mutex, though its unclear
	// if it will actually induce any noticeable slowness. This is implemented this
	// way to avoid adding a more complex set of mutexes around the liveWants map.
	// note that in the average case (where this session *is* interested in the
	// block we received) this function will not be called, as the cid will likely
	// still be in the interest cache.
	resp := make(chan bool, 1)
	select {
	case s.interestReqs <- interestReq{
		c:    c,
		resp: resp,
	}:
	case <-s.ctx.Done():
		return false
	}

	select {
	case want := <-resp:
		return want
	case <-s.ctx.Done():
		return false
	}
Jeromy's avatar
Jeromy committed
177 178
}

179 180 181 182 183 184 185 186 187 188
// GetBlock fetches a single block.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
	return bsgetter.AsyncGetBlocks(ctx, keys, s.notif,
		func(ctx context.Context, keys []cid.Cid) {
			select {
			case s.newReqs <- keys:
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
			case s.cancelKeys <- keys:
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
204 205
}

206
// GetAverageLatency returns the average latency for block requests.
207 208
func (s *Session) GetAverageLatency() time.Duration {
	resp := make(chan time.Duration)
Jeromy's avatar
Jeromy committed
209
	select {
210 211 212 213 214 215 216 217
	case s.latencyReqs <- resp:
	case <-s.ctx.Done():
		return -1 * time.Millisecond
	}

	select {
	case latency := <-resp:
		return latency
Jeromy's avatar
Jeromy committed
218
	case <-s.ctx.Done():
219
		return -1 * time.Millisecond
Jeromy's avatar
Jeromy committed
220
	}
Jeromy's avatar
Jeromy committed
221 222
}

223
// SetBaseTickDelay changes the rate at which ticks happen.
224 225 226 227 228
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
229 230
}

231
var provSearchDelay = time.Second
232
var rebroadcastDelay = delay.Fixed(time.Minute)
233 234 235 236 237

// SetProviderSearchDelay overwrites the global provider search delay
func SetProviderSearchDelay(newProvSearchDelay time.Duration) {
	provSearchDelay = newProvSearchDelay
}
238

239 240 241 242 243
// SetRebroadcastDelay overwrites the global provider rebroadcast delay
func SetRebroadcastDelay(newRebroadcastDelay delay.D) {
	rebroadcastDelay = newRebroadcastDelay
}

244 245
// Session run loop -- everything function below here should not be called
// of this loop
Jeromy's avatar
Jeromy committed
246 247
func (s *Session) run(ctx context.Context) {
	s.tick = time.NewTimer(provSearchDelay)
248
	s.rebroadcast = time.NewTimer(rebroadcastDelay.Get())
Jeromy's avatar
Jeromy committed
249 250 251
	for {
		select {
		case blk := <-s.incoming:
252
			if blk.counterMessage {
253
				s.updateReceiveCounters(ctx, blk)
254 255 256
			} else {
				s.handleIncomingBlock(ctx, blk)
			}
Jeromy's avatar
Jeromy committed
257
		case keys := <-s.newReqs:
258
			s.handleNewRequest(ctx, keys)
Jeromy's avatar
Jeromy committed
259
		case keys := <-s.cancelKeys:
260
			s.handleCancel(keys)
Jeromy's avatar
Jeromy committed
261
		case <-s.tick.C:
262
			s.handleTick(ctx)
263 264
		case <-s.rebroadcast.C:
			s.handleRebroadcast(ctx)
Jeromy's avatar
Jeromy committed
265
		case lwchk := <-s.interestReqs:
266
			lwchk.resp <- s.cidIsWanted(lwchk.c)
267 268 269 270
		case resp := <-s.latencyReqs:
			resp <- s.averageLatency()
		case baseTickDelay := <-s.tickDelayReqs:
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
271
		case <-ctx.Done():
272
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
273 274 275 276 277
			return
		}
	}
}

278 279 280 281
func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
	s.tick.Stop()

	if blk.from != "" {
282
		s.pm.RecordPeerResponse(blk.from, blk.blk.Cid())
283 284 285 286 287 288 289 290 291 292 293
	}

	s.receiveBlock(ctx, blk.blk)

	s.resetTick()
}

func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
	for _, k := range keys {
		s.interest.Add(k, nil)
	}
294
	if toadd := s.wantBudget(); toadd > 0 {
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
		if toadd > len(keys) {
			toadd = len(keys)
		}

		now := keys[:toadd]
		keys = keys[toadd:]

		s.wantBlocks(ctx, now)
	}
	for _, k := range keys {
		s.tofetch.Push(k)
	}
}

func (s *Session) handleCancel(keys []cid.Cid) {
	for _, c := range keys {
		s.tofetch.Remove(c)
	}
}

315
func (s *Session) handleTick(ctx context.Context) {
316

317 318 319 320 321 322
	if len(s.liveWants) > 0 {
		if s.fetchcnt == s.lastFetchCount {
			s.consecutiveTicks++
		} else {
			s.lastFetchCount = s.fetchcnt
		}
323 324
	}

325 326 327 328 329 330 331 332
	live := make([]cid.Cid, 0, len(s.liveWants))
	now := time.Now()
	for c := range s.liveWants {
		live = append(live, c)
		s.liveWants[c] = now
	}

	// Broadcast these keys to everyone we're connected to
333
	s.pm.RecordPeerRequests(nil, live)
334 335
	s.wm.WantBlocks(ctx, live, nil, s.id)

336 337 338
	// do no find providers on consecutive ticks
	// -- just rely on periodic rebroadcast
	if len(live) > 0 && (s.consecutiveTicks == 0) {
339
		s.pm.FindMorePeers(ctx, live[0])
340 341 342 343
	}
	s.resetTick()
}

344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367
func (s *Session) handleRebroadcast(ctx context.Context) {

	if len(s.liveWants) == 0 {
		return
	}

	// TODO: come up with a better strategy for determining when to search
	// for new providers for blocks.
	s.pm.FindMorePeers(ctx, s.randomLiveWant())

	s.rebroadcast.Reset(rebroadcastDelay.Get())
}

func (s *Session) randomLiveWant() cid.Cid {
	i := rand.Intn(len(s.liveWants))
	// picking a random live want
	for k := range s.liveWants {
		if i == 0 {
			return k
		}
		i--
	}
	return cid.Cid{}
}
368 369 370 371 372 373 374 375 376 377 378
func (s *Session) handleShutdown() {
	s.tick.Stop()
	s.notif.Shutdown()

	live := make([]cid.Cid, 0, len(s.liveWants))
	for c := range s.liveWants {
		live = append(live, c)
	}
	s.wm.CancelWants(s.ctx, live, nil, s.id)
}

379
func (s *Session) cidIsWanted(c cid.Cid) bool {
Steven Allen's avatar
Steven Allen committed
380
	_, ok := s.liveWants[c]
381 382 383 384 385 386
	if !ok {
		ok = s.tofetch.Has(c)
	}
	return ok
}

Jeromy's avatar
Jeromy committed
387
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
388 389
	c := blk.Cid()
	if s.cidIsWanted(c) {
390
		s.srs.RecordUniqueBlock()
Steven Allen's avatar
Steven Allen committed
391
		tval, ok := s.liveWants[c]
392 393
		if ok {
			s.latTotal += time.Since(tval)
Steven Allen's avatar
Steven Allen committed
394
			delete(s.liveWants, c)
395 396 397
		} else {
			s.tofetch.Remove(c)
		}
Jeromy's avatar
Jeromy committed
398
		s.fetchcnt++
399 400
		// we've received new wanted blocks, so future ticks are not consecutive
		s.consecutiveTicks = 0
Jeromy's avatar
Jeromy committed
401 402
		s.notif.Publish(blk)

403 404 405 406 407 408 409 410 411 412
		toAdd := s.wantBudget()
		if toAdd > s.tofetch.Len() {
			toAdd = s.tofetch.Len()
		}
		if toAdd > 0 {
			var keys []cid.Cid
			for i := 0; i < toAdd; i++ {
				keys = append(keys, s.tofetch.Pop())
			}
			s.wantBlocks(ctx, keys)
Jeromy's avatar
Jeromy committed
413
		}
414 415 416 417 418

		s.pastWants.Push(c)
	}
}

419 420 421 422
func (s *Session) updateReceiveCounters(ctx context.Context, blk blkRecv) {
	ks := blk.blk.Cid()
	if s.pastWants.Has(ks) {
		s.srs.RecordDuplicateBlock()
Jeromy's avatar
Jeromy committed
423 424 425
	}
}

426
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
427
	now := time.Now()
Jeromy's avatar
Jeromy committed
428
	for _, c := range ks {
Steven Allen's avatar
Steven Allen committed
429
		s.liveWants[c] = now
Jeromy's avatar
Jeromy committed
430
	}
431
	peers := s.pm.GetOptimizedPeers()
432
	if len(peers) > 0 {
433 434 435 436
		splitRequests := s.srs.SplitRequest(peers, ks)
		for _, splitRequest := range splitRequests {
			s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
			s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
437 438 439 440 441
		}
	} else {
		s.pm.RecordPeerRequests(nil, ks)
		s.wm.WantBlocks(ctx, ks, nil, s.id)
	}
Jeromy's avatar
Jeromy committed
442 443
}

444 445
func (s *Session) averageLatency() time.Duration {
	return s.latTotal / time.Duration(s.fetchcnt)
Jeromy's avatar
Jeromy committed
446
}
447

448
func (s *Session) resetTick() {
449
	var tickDelay time.Duration
450
	if s.latTotal == 0 {
451
		tickDelay = provSearchDelay
452 453
	} else {
		avLat := s.averageLatency()
454
		tickDelay = s.baseTickDelay + (3 * avLat)
Jeromy's avatar
Jeromy committed
455
	}
456 457
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
	s.tick.Reset(tickDelay)
Jeromy's avatar
Jeromy committed
458
}
459 460 461 462 463 464 465 466 467 468 469 470 471 472

func (s *Session) wantBudget() int {
	live := len(s.liveWants)
	var budget int
	if len(s.pm.GetOptimizedPeers()) > 0 {
		budget = targetedLiveWantsLimit - live
	} else {
		budget = broadcastLiveWantsLimit - live
	}
	if budget < 0 {
		budget = 0
	}
	return budget
}