session.go 12.1 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4

import (
	"context"
5
	"math/rand"
Jeromy's avatar
Jeromy committed
6 7
	"time"

Jeromy's avatar
Jeromy committed
8
	lru "github.com/hashicorp/golang-lru"
9 10
	bsgetter "github.com/ipfs/go-bitswap/getter"
	notifications "github.com/ipfs/go-bitswap/notifications"
11
	bssd "github.com/ipfs/go-bitswap/sessiondata"
Jeromy's avatar
Jeromy committed
12 13
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
14
	delay "github.com/ipfs/go-ipfs-delay"
Jeromy's avatar
Jeromy committed
15
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
16
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
17
	loggables "github.com/libp2p/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
18 19
)

20
const (
21 22
	broadcastLiveWantsLimit = 4
	targetedLiveWantsLimit  = 32
23
)
Jeromy's avatar
Jeromy committed
24

25
// WantManager is an interface that can be used to request blocks
26
// from given peers.
27
type WantManager interface {
28 29 30 31
	WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
	CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}

32 33
// PeerManager provides an interface for tracking and optimize peers, and
// requesting more when neccesary.
34 35
type PeerManager interface {
	FindMorePeers(context.Context, cid.Cid)
36
	GetOptimizedPeers() []bssd.OptimizedPeer
37
	RecordPeerRequests([]peer.ID, []cid.Cid)
38 39
	RecordPeerResponse(peer.ID, []cid.Cid)
	RecordCancels([]cid.Cid)
40 41
}

42 43 44
// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
45
	SplitRequest([]bssd.OptimizedPeer, []cid.Cid) []bssd.PartialRequest
46 47 48 49
	RecordDuplicateBlock()
	RecordUniqueBlock()
}

50 51 52 53 54
type interestReq struct {
	c    cid.Cid
	resp chan bool
}

55
type rcvFrom struct {
56
	from peer.ID
57
	ks   []cid.Cid
58 59
}

Jeromy's avatar
Jeromy committed
60 61
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
62
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
63
type Session struct {
64
	// dependencies
65 66 67
	ctx context.Context
	wm  WantManager
	pm  PeerManager
68
	srs RequestSplitter
69 70

	// channels
71
	incoming      chan rcvFrom
72 73 74 75 76 77 78
	newReqs       chan []cid.Cid
	cancelKeys    chan []cid.Cid
	interestReqs  chan interestReq
	latencyReqs   chan chan time.Duration
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
Steven Allen's avatar
Steven Allen committed
79 80 81 82 83 84 85 86 87 88 89 90
	tofetch             *cidQueue
	interest            *lru.Cache
	pastWants           *cidQueue
	liveWants           map[cid.Cid]time.Time
	idleTick            *time.Timer
	periodicSearchTimer *time.Timer
	baseTickDelay       time.Duration
	latTotal            time.Duration
	fetchcnt            int
	consecutiveTicks    int
	initialSearchDelay  time.Duration
	periodicSearchDelay delay.D
91
	// identifiers
Jeromy's avatar
Jeromy committed
92
	notif notifications.PubSub
93 94
	uuid  logging.Loggable
	id    uint64
Jeromy's avatar
Jeromy committed
95 96
}

97
// New creates a new bitswap session whose lifetime is bounded by the
98
// given context.
99 100 101 102 103
func New(ctx context.Context,
	id uint64,
	wm WantManager,
	pm PeerManager,
	srs RequestSplitter,
104
	notif notifications.PubSub,
Steven Allen's avatar
Steven Allen committed
105 106
	initialSearchDelay time.Duration,
	periodicSearchDelay delay.D) *Session {
Jeromy's avatar
Jeromy committed
107
	s := &Session{
Steven Allen's avatar
Steven Allen committed
108 109 110 111 112 113 114 115 116 117 118 119
		liveWants:           make(map[cid.Cid]time.Time),
		newReqs:             make(chan []cid.Cid),
		cancelKeys:          make(chan []cid.Cid),
		tofetch:             newCidQueue(),
		pastWants:           newCidQueue(),
		interestReqs:        make(chan interestReq),
		latencyReqs:         make(chan chan time.Duration),
		tickDelayReqs:       make(chan time.Duration),
		ctx:                 ctx,
		wm:                  wm,
		pm:                  pm,
		srs:                 srs,
120
		incoming:            make(chan rcvFrom),
121
		notif:               notif,
Steven Allen's avatar
Steven Allen committed
122 123 124 125 126
		uuid:                loggables.Uuid("GetBlockRequest"),
		baseTickDelay:       time.Millisecond * 500,
		id:                  id,
		initialSearchDelay:  initialSearchDelay,
		periodicSearchDelay: periodicSearchDelay,
Jeromy's avatar
Jeromy committed
127 128 129 130 131 132 133 134 135 136
	}

	cache, _ := lru.New(2048)
	s.interest = cache

	go s.run(ctx)

	return s
}

137 138
// ReceiveFrom receives incoming blocks from the given peer.
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid) {
139
	select {
140
	case s.incoming <- rcvFrom{from: from, ks: ks}:
141 142 143 144
	case <-s.ctx.Done():
	}
}

145 146
// InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
	if s.interest.Contains(c) {
		return true
	}
	// TODO: PERF: this is using a channel to guard a map access against race
	// conditions. This is definitely much slower than a mutex, though its unclear
	// if it will actually induce any noticeable slowness. This is implemented this
	// way to avoid adding a more complex set of mutexes around the liveWants map.
	// note that in the average case (where this session *is* interested in the
	// block we received) this function will not be called, as the cid will likely
	// still be in the interest cache.
	resp := make(chan bool, 1)
	select {
	case s.interestReqs <- interestReq{
		c:    c,
		resp: resp,
	}:
	case <-s.ctx.Done():
		return false
	}

	select {
	case want := <-resp:
		return want
	case <-s.ctx.Done():
		return false
	}
Jeromy's avatar
Jeromy committed
173 174
}

175 176 177 178 179 180 181 182 183 184
// GetBlock fetches a single block.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
185 186

	return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
187 188 189 190 191 192 193 194 195 196 197 198 199 200
		func(ctx context.Context, keys []cid.Cid) {
			select {
			case s.newReqs <- keys:
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
			case s.cancelKeys <- keys:
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
201 202
}

203
// GetAverageLatency returns the average latency for block requests.
204 205
func (s *Session) GetAverageLatency() time.Duration {
	resp := make(chan time.Duration)
Jeromy's avatar
Jeromy committed
206
	select {
207 208 209 210 211 212 213 214
	case s.latencyReqs <- resp:
	case <-s.ctx.Done():
		return -1 * time.Millisecond
	}

	select {
	case latency := <-resp:
		return latency
Jeromy's avatar
Jeromy committed
215
	case <-s.ctx.Done():
216
		return -1 * time.Millisecond
Jeromy's avatar
Jeromy committed
217
	}
Jeromy's avatar
Jeromy committed
218 219
}

220
// SetBaseTickDelay changes the rate at which ticks happen.
221 222 223 224 225
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
226 227
}

228 229
// Session run loop -- everything function below here should not be called
// of this loop
Jeromy's avatar
Jeromy committed
230
func (s *Session) run(ctx context.Context) {
Steven Allen's avatar
Steven Allen committed
231 232
	s.idleTick = time.NewTimer(s.initialSearchDelay)
	s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
Jeromy's avatar
Jeromy committed
233 234
	for {
		select {
235
		case rcv := <-s.incoming:
236
			s.cancelIncoming(ctx, rcv)
237 238 239 240
			// Record statistics only if the blocks came from the network
			// (blocks can also be received from the local node)
			if rcv.from != "" {
				s.updateReceiveCounters(ctx, rcv)
241
			}
242
			s.handleIncoming(ctx, rcv)
Jeromy's avatar
Jeromy committed
243
		case keys := <-s.newReqs:
244
			s.handleNewRequest(ctx, keys)
Jeromy's avatar
Jeromy committed
245
		case keys := <-s.cancelKeys:
246
			s.handleCancel(keys)
Steven Allen's avatar
Steven Allen committed
247 248 249 250
		case <-s.idleTick.C:
			s.handleIdleTick(ctx)
		case <-s.periodicSearchTimer.C:
			s.handlePeriodicSearch(ctx)
Jeromy's avatar
Jeromy committed
251
		case lwchk := <-s.interestReqs:
252
			lwchk.resp <- s.cidIsWanted(lwchk.c)
253 254 255 256
		case resp := <-s.latencyReqs:
			resp <- s.averageLatency()
		case baseTickDelay := <-s.tickDelayReqs:
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
257
		case <-ctx.Done():
258
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
259 260 261 262 263
			return
		}
	}
}

264
func (s *Session) cancelIncoming(ctx context.Context, rcv rcvFrom) {
265
	// We've received the blocks so we can cancel any outstanding wants for them
266 267 268 269
	wanted := make([]cid.Cid, 0, len(rcv.ks))
	for _, k := range rcv.ks {
		if s.cidIsWanted(k) {
			wanted = append(wanted, k)
270
		}
271
	}
272 273
	s.pm.RecordCancels(wanted)
	s.wm.CancelWants(s.ctx, wanted, nil, s.id)
274 275
}

276
func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
277
	s.idleTick.Stop()
278

279
	// Process the received blocks
280
	s.processIncoming(ctx, rcv.ks)
281

Steven Allen's avatar
Steven Allen committed
282
	s.resetIdleTick()
283 284 285 286 287 288
}

func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
	for _, k := range keys {
		s.interest.Add(k, nil)
	}
289
	if toadd := s.wantBudget(); toadd > 0 {
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
		if toadd > len(keys) {
			toadd = len(keys)
		}

		now := keys[:toadd]
		keys = keys[toadd:]

		s.wantBlocks(ctx, now)
	}
	for _, k := range keys {
		s.tofetch.Push(k)
	}
}

func (s *Session) handleCancel(keys []cid.Cid) {
	for _, c := range keys {
		s.tofetch.Remove(c)
	}
}

Steven Allen's avatar
Steven Allen committed
310
func (s *Session) handleIdleTick(ctx context.Context) {
311 312 313 314 315 316 317 318
	live := make([]cid.Cid, 0, len(s.liveWants))
	now := time.Now()
	for c := range s.liveWants {
		live = append(live, c)
		s.liveWants[c] = now
	}

	// Broadcast these keys to everyone we're connected to
319
	s.pm.RecordPeerRequests(nil, live)
320 321
	s.wm.WantBlocks(ctx, live, nil, s.id)

322
	// do no find providers on consecutive ticks
Steven Allen's avatar
Steven Allen committed
323
	// -- just rely on periodic search widening
324
	if len(live) > 0 && (s.consecutiveTicks == 0) {
325
		s.pm.FindMorePeers(ctx, live[0])
326
	}
Steven Allen's avatar
Steven Allen committed
327
	s.resetIdleTick()
328 329 330 331

	if len(s.liveWants) > 0 {
		s.consecutiveTicks++
	}
332 333
}

Steven Allen's avatar
Steven Allen committed
334
func (s *Session) handlePeriodicSearch(ctx context.Context) {
335 336
	randomWant := s.randomLiveWant()
	if !randomWant.Defined() {
337 338 339 340 341
		return
	}

	// TODO: come up with a better strategy for determining when to search
	// for new providers for blocks.
342 343
	s.pm.FindMorePeers(ctx, randomWant)
	s.wm.WantBlocks(ctx, []cid.Cid{randomWant}, nil, s.id)
344

Steven Allen's avatar
Steven Allen committed
345
	s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
346 347 348
}

func (s *Session) randomLiveWant() cid.Cid {
Steven Allen's avatar
Steven Allen committed
349 350 351
	if len(s.liveWants) == 0 {
		return cid.Cid{}
	}
352 353 354 355 356 357 358 359 360 361
	i := rand.Intn(len(s.liveWants))
	// picking a random live want
	for k := range s.liveWants {
		if i == 0 {
			return k
		}
		i--
	}
	return cid.Cid{}
}
362
func (s *Session) handleShutdown() {
Steven Allen's avatar
Steven Allen committed
363
	s.idleTick.Stop()
364 365 366 367 368 369 370 371

	live := make([]cid.Cid, 0, len(s.liveWants))
	for c := range s.liveWants {
		live = append(live, c)
	}
	s.wm.CancelWants(s.ctx, live, nil, s.id)
}

372
func (s *Session) cidIsWanted(c cid.Cid) bool {
Steven Allen's avatar
Steven Allen committed
373
	_, ok := s.liveWants[c]
374 375 376 377 378 379
	if !ok {
		ok = s.tofetch.Has(c)
	}
	return ok
}

380
func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid) {
381
	for _, c := range ks {
382 383 384 385 386 387 388 389 390
		if s.cidIsWanted(c) {
			// If the block CID was in the live wants queue, remove it
			tval, ok := s.liveWants[c]
			if ok {
				s.latTotal += time.Since(tval)
				delete(s.liveWants, c)
			} else {
				// Otherwise remove it from the tofetch queue, if it was there
				s.tofetch.Remove(c)
391
			}
392 393 394 395 396 397 398 399
			s.fetchcnt++

			// We've received new wanted blocks, so reset the number of ticks
			// that have occurred since the last new block
			s.consecutiveTicks = 0

			// Keep track of CIDs we've successfully fetched
			s.pastWants.Push(c)
Jeromy's avatar
Jeromy committed
400
		}
401
	}
402

403 404 405 406 407 408 409 410 411 412 413 414
	// Transfer as many CIDs as possible from the tofetch queue into the
	// live wants queue
	toAdd := s.wantBudget()
	if toAdd > s.tofetch.Len() {
		toAdd = s.tofetch.Len()
	}
	if toAdd > 0 {
		var keys []cid.Cid
		for i := 0; i < toAdd; i++ {
			keys = append(keys, s.tofetch.Pop())
		}
		s.wantBlocks(ctx, keys)
415 416 417
	}
}

418
func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) {
419
	for _, k := range rcv.ks {
420
		// Inform the request splitter of unique / duplicate blocks
421
		if s.cidIsWanted(k) {
422
			s.srs.RecordUniqueBlock()
423
		} else if s.pastWants.Has(k) {
424
			s.srs.RecordDuplicateBlock()
425
		}
426 427 428
	}

	// Record response (to be able to time latency)
429 430
	if len(rcv.ks) > 0 {
		s.pm.RecordPeerResponse(rcv.from, rcv.ks)
Jeromy's avatar
Jeromy committed
431 432 433
	}
}

434
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
435
	now := time.Now()
Jeromy's avatar
Jeromy committed
436
	for _, c := range ks {
Steven Allen's avatar
Steven Allen committed
437
		s.liveWants[c] = now
Jeromy's avatar
Jeromy committed
438
	}
439
	peers := s.pm.GetOptimizedPeers()
440
	if len(peers) > 0 {
441 442 443 444
		splitRequests := s.srs.SplitRequest(peers, ks)
		for _, splitRequest := range splitRequests {
			s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
			s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
445 446 447 448 449
		}
	} else {
		s.pm.RecordPeerRequests(nil, ks)
		s.wm.WantBlocks(ctx, ks, nil, s.id)
	}
Jeromy's avatar
Jeromy committed
450 451
}

452 453
func (s *Session) averageLatency() time.Duration {
	return s.latTotal / time.Duration(s.fetchcnt)
Jeromy's avatar
Jeromy committed
454
}
455

Steven Allen's avatar
Steven Allen committed
456
func (s *Session) resetIdleTick() {
457
	var tickDelay time.Duration
458
	if s.latTotal == 0 {
Steven Allen's avatar
Steven Allen committed
459
		tickDelay = s.initialSearchDelay
460 461
	} else {
		avLat := s.averageLatency()
462
		tickDelay = s.baseTickDelay + (3 * avLat)
Jeromy's avatar
Jeromy committed
463
	}
464
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
Steven Allen's avatar
Steven Allen committed
465
	s.idleTick.Reset(tickDelay)
Jeromy's avatar
Jeromy committed
466
}
467 468 469 470 471 472 473 474 475 476 477 478 479 480

func (s *Session) wantBudget() int {
	live := len(s.liveWants)
	var budget int
	if len(s.pm.GetOptimizedPeers()) > 0 {
		budget = targetedLiveWantsLimit - live
	} else {
		budget = broadcastLiveWantsLimit - live
	}
	if budget < 0 {
		budget = 0
	}
	return budget
}