session.go 11.4 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4

import (
	"context"
5
	"math/rand"
Jeromy's avatar
Jeromy committed
6 7
	"time"

Jeromy's avatar
Jeromy committed
8
	lru "github.com/hashicorp/golang-lru"
9 10
	bsgetter "github.com/ipfs/go-bitswap/getter"
	notifications "github.com/ipfs/go-bitswap/notifications"
Jeromy's avatar
Jeromy committed
11 12
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
13
	delay "github.com/ipfs/go-ipfs-delay"
Jeromy's avatar
Jeromy committed
14
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
15
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
16
	loggables "github.com/libp2p/go-libp2p-loggables"
17 18

	bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
Jeromy's avatar
Jeromy committed
19 20
)

21
// Limits on how many wants a session keeps live at once. wantBudget picks
// between them depending on whether the PeerManager reports optimized peers.
const (
	// broadcastLiveWantsLimit applies while no optimized peers are known.
	broadcastLiveWantsLimit = 4
	// targetedLiveWantsLimit applies once optimized peers are available.
	targetedLiveWantsLimit = 32
)
Jeromy's avatar
Jeromy committed
25

26
// WantManager is an interface that can be used to request blocks
27
// from given peers.
28
type WantManager interface {
29 30 31 32
	WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
	CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}

33 34
// PeerManager provides an interface for tracking and optimize peers, and
// requesting more when neccesary.
35 36 37 38 39 40 41
type PeerManager interface {
	FindMorePeers(context.Context, cid.Cid)
	GetOptimizedPeers() []peer.ID
	RecordPeerRequests([]peer.ID, []cid.Cid)
	RecordPeerResponse(peer.ID, cid.Cid)
}

42 43 44 45 46 47 48 49
// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
	// SplitRequest partitions the given Cids among the given peers. The
	// session sends each returned partial request's Keys to its Peers.
	SplitRequest([]peer.ID, []cid.Cid) []*bssrs.PartialRequest
	// RecordDuplicateBlock is called by the session when a counter message
	// arrives for a Cid that is already in its past wants.
	RecordDuplicateBlock()
	// RecordUniqueBlock is called by the session when a block it still
	// wanted arrives.
	RecordUniqueBlock()
}

50 51 52 53 54 55
// interestReq is sent to the run loop to ask whether the session currently
// wants c. The run loop writes the answer to resp.
type interestReq struct {
	c    cid.Cid
	resp chan bool
}

type blkRecv struct {
56 57 58
	from           peer.ID
	blk            blocks.Block
	counterMessage bool
59 60
}

Jeromy's avatar
Jeromy committed
61 62
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
63
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
64
type Session struct {
65
	// dependencies
66 67 68
	ctx context.Context
	wm  WantManager
	pm  PeerManager
69
	srs RequestSplitter
70 71 72 73 74 75 76 77 78 79

	// channels
	incoming      chan blkRecv
	newReqs       chan []cid.Cid
	cancelKeys    chan []cid.Cid
	interestReqs  chan interestReq
	latencyReqs   chan chan time.Duration
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
80 81 82 83 84 85 86 87 88 89
	tofetch          *cidQueue
	interest         *lru.Cache
	pastWants        *cidQueue
	liveWants        map[cid.Cid]time.Time
	tick             *time.Timer
	rebroadcast      *time.Timer
	baseTickDelay    time.Duration
	latTotal         time.Duration
	fetchcnt         int
	consecutiveTicks int
90
	// identifiers
Jeromy's avatar
Jeromy committed
91
	notif notifications.PubSub
92 93
	uuid  logging.Loggable
	id    uint64
Jeromy's avatar
Jeromy committed
94 95
}

96
// New creates a new bitswap session whose lifetime is bounded by the
97
// given context.
98
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs RequestSplitter) *Session {
Jeromy's avatar
Jeromy committed
99
	s := &Session{
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
		liveWants:     make(map[cid.Cid]time.Time),
		newReqs:       make(chan []cid.Cid),
		cancelKeys:    make(chan []cid.Cid),
		tofetch:       newCidQueue(),
		pastWants:     newCidQueue(),
		interestReqs:  make(chan interestReq),
		latencyReqs:   make(chan chan time.Duration),
		tickDelayReqs: make(chan time.Duration),
		ctx:           ctx,
		wm:            wm,
		pm:            pm,
		srs:           srs,
		incoming:      make(chan blkRecv),
		notif:         notifications.New(),
		uuid:          loggables.Uuid("GetBlockRequest"),
		baseTickDelay: time.Millisecond * 500,
		id:            id,
Jeromy's avatar
Jeromy committed
117 118 119 120 121 122 123 124 125 126
	}

	cache, _ := lru.New(2048)
	s.interest = cache

	go s.run(ctx)

	return s
}

127 128 129
// ReceiveBlockFrom receives an incoming block from the given peer.
func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
	select {
130
	case s.incoming <- blkRecv{from: from, blk: blk, counterMessage: false}:
131
	case <-s.ctx.Done():
132
	}
133 134 135
	ks := []cid.Cid{blk.Cid()}
	s.wm.CancelWants(s.ctx, ks, nil, s.id)

136
}
137

138 139 140 141 142 143 144 145 146
// UpdateReceiveCounters forwards blk to the run loop as a counter-only
// message, so a possibly duplicate block can be accounted for and the split
// factor adjusted. The call gives up silently once the session context is
// done.
func (s *Session) UpdateReceiveCounters(blk blocks.Block) {
	msg := blkRecv{
		counterMessage: true,
		blk:            blk,
		from:           "",
	}

	select {
	case <-s.ctx.Done():
	case s.incoming <- msg:
	}
}

147 148
// InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
	if s.interest.Contains(c) {
		return true
	}
	// TODO: PERF: this is using a channel to guard a map access against race
	// conditions. This is definitely much slower than a mutex, though its unclear
	// if it will actually induce any noticeable slowness. This is implemented this
	// way to avoid adding a more complex set of mutexes around the liveWants map.
	// note that in the average case (where this session *is* interested in the
	// block we received) this function will not be called, as the cid will likely
	// still be in the interest cache.
	resp := make(chan bool, 1)
	select {
	case s.interestReqs <- interestReq{
		c:    c,
		resp: resp,
	}:
	case <-s.ctx.Done():
		return false
	}

	select {
	case want := <-resp:
		return want
	case <-s.ctx.Done():
		return false
	}
Jeromy's avatar
Jeromy committed
175 176
}

177 178 179 180 181 182 183 184 185 186
// GetBlock fetches a single block.
//
// It delegates to bsgetter.SyncGetBlock, passing s.GetBlocks as the
// underlying fetch function.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
	return bsgetter.AsyncGetBlocks(ctx, keys, s.notif,
		func(ctx context.Context, keys []cid.Cid) {
			select {
			case s.newReqs <- keys:
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
			case s.cancelKeys <- keys:
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
202 203
}

204
// GetAverageLatency returns the average latency for block requests.
205 206
func (s *Session) GetAverageLatency() time.Duration {
	resp := make(chan time.Duration)
Jeromy's avatar
Jeromy committed
207
	select {
208 209 210 211 212 213 214 215
	case s.latencyReqs <- resp:
	case <-s.ctx.Done():
		return -1 * time.Millisecond
	}

	select {
	case latency := <-resp:
		return latency
Jeromy's avatar
Jeromy committed
216
	case <-s.ctx.Done():
217
		return -1 * time.Millisecond
Jeromy's avatar
Jeromy committed
218
	}
Jeromy's avatar
Jeromy committed
219 220
}

221
// SetBaseTickDelay changes the rate at which ticks happen.
222 223 224 225 226
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
227 228
}

229
var provSearchDelay = time.Second
230
var rebroadcastDelay = delay.Fixed(time.Minute)
231 232 233 234 235

// SetProviderSearchDelay overwrites the global provider search delay
//
// NOTE(review): this is a plain assignment to a package-level variable that
// session run loops also read; no synchronization is visible here, so it is
// presumably meant to be called before sessions are created — confirm.
func SetProviderSearchDelay(newProvSearchDelay time.Duration) {
	provSearchDelay = newProvSearchDelay
}
236

237 238 239 240 241
// SetRebroadcastDelay overwrites the global provider rebroadcast delay
//
// NOTE(review): this is a plain assignment to a package-level variable that
// session run loops also read; no synchronization is visible here, so it is
// presumably meant to be called before sessions are created — confirm.
func SetRebroadcastDelay(newRebroadcastDelay delay.D) {
	rebroadcastDelay = newRebroadcastDelay
}

242 243
// Session run loop -- everything function below here should not be called
// of this loop
Jeromy's avatar
Jeromy committed
244 245
func (s *Session) run(ctx context.Context) {
	s.tick = time.NewTimer(provSearchDelay)
246
	s.rebroadcast = time.NewTimer(rebroadcastDelay.Get())
Jeromy's avatar
Jeromy committed
247 248 249
	for {
		select {
		case blk := <-s.incoming:
250
			if blk.counterMessage {
251
				s.updateReceiveCounters(ctx, blk)
252 253 254
			} else {
				s.handleIncomingBlock(ctx, blk)
			}
Jeromy's avatar
Jeromy committed
255
		case keys := <-s.newReqs:
256
			s.handleNewRequest(ctx, keys)
Jeromy's avatar
Jeromy committed
257
		case keys := <-s.cancelKeys:
258
			s.handleCancel(keys)
Jeromy's avatar
Jeromy committed
259
		case <-s.tick.C:
260
			s.handleTick(ctx)
261 262
		case <-s.rebroadcast.C:
			s.handleRebroadcast(ctx)
Jeromy's avatar
Jeromy committed
263
		case lwchk := <-s.interestReqs:
264
			lwchk.resp <- s.cidIsWanted(lwchk.c)
265 266 267 268
		case resp := <-s.latencyReqs:
			resp <- s.averageLatency()
		case baseTickDelay := <-s.tickDelayReqs:
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
269
		case <-ctx.Done():
270
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
271 272 273 274 275
			return
		}
	}
}

276 277 278 279
func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
	s.tick.Stop()

	if blk.from != "" {
280
		s.pm.RecordPeerResponse(blk.from, blk.blk.Cid())
281 282 283 284 285 286 287 288 289 290 291
	}

	s.receiveBlock(ctx, blk.blk)

	s.resetTick()
}

func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
	for _, k := range keys {
		s.interest.Add(k, nil)
	}
292
	if toadd := s.wantBudget(); toadd > 0 {
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
		if toadd > len(keys) {
			toadd = len(keys)
		}

		now := keys[:toadd]
		keys = keys[toadd:]

		s.wantBlocks(ctx, now)
	}
	for _, k := range keys {
		s.tofetch.Push(k)
	}
}

// handleCancel drops each of the given keys from the queue of blocks that
// are still waiting to be requested.
func (s *Session) handleCancel(keys []cid.Cid) {
	for i := range keys {
		s.tofetch.Remove(keys[i])
	}
}

313
func (s *Session) handleTick(ctx context.Context) {
314

315 316 317 318 319 320 321 322
	live := make([]cid.Cid, 0, len(s.liveWants))
	now := time.Now()
	for c := range s.liveWants {
		live = append(live, c)
		s.liveWants[c] = now
	}

	// Broadcast these keys to everyone we're connected to
323
	s.pm.RecordPeerRequests(nil, live)
324 325
	s.wm.WantBlocks(ctx, live, nil, s.id)

326 327 328
	// do no find providers on consecutive ticks
	// -- just rely on periodic rebroadcast
	if len(live) > 0 && (s.consecutiveTicks == 0) {
329
		s.pm.FindMorePeers(ctx, live[0])
330 331
	}
	s.resetTick()
332 333 334 335

	if len(s.liveWants) > 0 {
		s.consecutiveTicks++
	}
336 337
}

338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
// handleRebroadcast runs when the rebroadcast timer fires. If the session
// has live wants it asks the PeerManager to find more peers for a randomly
// chosen one.
//
// The timer is re-armed unconditionally. It is a one-shot timer, so skipping
// the reset while the session happens to be idle would disable periodic
// rebroadcast for the rest of the session's life.
func (s *Session) handleRebroadcast(ctx context.Context) {
	if len(s.liveWants) > 0 {
		// TODO: come up with a better strategy for determining when to search
		// for new providers for blocks.
		s.pm.FindMorePeers(ctx, s.randomLiveWant())
	}

	s.rebroadcast.Reset(rebroadcastDelay.Get())
}

// randomLiveWant returns a randomly chosen Cid from the live wants, or the
// zero Cid when there are none.
func (s *Session) randomLiveWant() cid.Cid {
	// rand.Intn panics for a non-positive argument, so an empty map must be
	// handled before drawing an index.
	if len(s.liveWants) == 0 {
		return cid.Cid{}
	}
	i := rand.Intn(len(s.liveWants))
	// picking a random live want
	for k := range s.liveWants {
		if i == 0 {
			return k
		}
		i--
	}
	return cid.Cid{}
}
362 363 364 365 366 367 368 369 370 371 372
// handleShutdown releases the session's resources when its context ends: it
// stops both timers, shuts down the notification pubsub, and cancels every
// want that is still live.
func (s *Session) handleShutdown() {
	s.tick.Stop()
	// The rebroadcast timer is no longer serviced once the run loop exits,
	// so stop it instead of leaving it pending.
	s.rebroadcast.Stop()
	s.notif.Shutdown()

	live := make([]cid.Cid, 0, len(s.liveWants))
	for c := range s.liveWants {
		live = append(live, c)
	}
	s.wm.CancelWants(s.ctx, live, nil, s.id)
}

373
func (s *Session) cidIsWanted(c cid.Cid) bool {
Steven Allen's avatar
Steven Allen committed
374
	_, ok := s.liveWants[c]
375 376 377 378 379 380
	if !ok {
		ok = s.tofetch.Has(c)
	}
	return ok
}

Jeromy's avatar
Jeromy committed
381
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
382 383
	c := blk.Cid()
	if s.cidIsWanted(c) {
384
		s.srs.RecordUniqueBlock()
Steven Allen's avatar
Steven Allen committed
385
		tval, ok := s.liveWants[c]
386 387
		if ok {
			s.latTotal += time.Since(tval)
Steven Allen's avatar
Steven Allen committed
388
			delete(s.liveWants, c)
389 390 391
		} else {
			s.tofetch.Remove(c)
		}
Jeromy's avatar
Jeromy committed
392
		s.fetchcnt++
393 394
		// we've received new wanted blocks, so future ticks are not consecutive
		s.consecutiveTicks = 0
Jeromy's avatar
Jeromy committed
395 396
		s.notif.Publish(blk)

397 398 399 400 401 402 403 404 405 406
		toAdd := s.wantBudget()
		if toAdd > s.tofetch.Len() {
			toAdd = s.tofetch.Len()
		}
		if toAdd > 0 {
			var keys []cid.Cid
			for i := 0; i < toAdd; i++ {
				keys = append(keys, s.tofetch.Pop())
			}
			s.wantBlocks(ctx, keys)
Jeromy's avatar
Jeromy committed
407
		}
408 409 410 411 412

		s.pastWants.Push(c)
	}
}

413 414 415 416
func (s *Session) updateReceiveCounters(ctx context.Context, blk blkRecv) {
	ks := blk.blk.Cid()
	if s.pastWants.Has(ks) {
		s.srs.RecordDuplicateBlock()
Jeromy's avatar
Jeromy committed
417 418 419
	}
}

420
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
421
	now := time.Now()
Jeromy's avatar
Jeromy committed
422
	for _, c := range ks {
Steven Allen's avatar
Steven Allen committed
423
		s.liveWants[c] = now
Jeromy's avatar
Jeromy committed
424
	}
425
	peers := s.pm.GetOptimizedPeers()
426
	if len(peers) > 0 {
427 428 429 430
		splitRequests := s.srs.SplitRequest(peers, ks)
		for _, splitRequest := range splitRequests {
			s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
			s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
431 432 433 434 435
		}
	} else {
		s.pm.RecordPeerRequests(nil, ks)
		s.wm.WantBlocks(ctx, ks, nil, s.id)
	}
Jeromy's avatar
Jeromy committed
436 437
}

438 439
func (s *Session) averageLatency() time.Duration {
	return s.latTotal / time.Duration(s.fetchcnt)
Jeromy's avatar
Jeromy committed
440
}
441

442
func (s *Session) resetTick() {
443
	var tickDelay time.Duration
444
	if s.latTotal == 0 {
445
		tickDelay = provSearchDelay
446 447
	} else {
		avLat := s.averageLatency()
448
		tickDelay = s.baseTickDelay + (3 * avLat)
Jeromy's avatar
Jeromy committed
449
	}
450 451
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
	s.tick.Reset(tickDelay)
Jeromy's avatar
Jeromy committed
452
}
453 454 455 456 457 458 459 460 461 462 463 464 465 466

func (s *Session) wantBudget() int {
	live := len(s.liveWants)
	var budget int
	if len(s.pm.GetOptimizedPeers()) > 0 {
		budget = targetedLiveWantsLimit - live
	} else {
		budget = broadcastLiveWantsLimit - live
	}
	if budget < 0 {
		budget = 0
	}
	return budget
}