session.go 12.1 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4

import (
	"context"
5
	"math/rand"
Jeromy's avatar
Jeromy committed
6 7
	"time"

Jeromy's avatar
Jeromy committed
8
	lru "github.com/hashicorp/golang-lru"
9 10
	bsgetter "github.com/ipfs/go-bitswap/getter"
	notifications "github.com/ipfs/go-bitswap/notifications"
11
	bssd "github.com/ipfs/go-bitswap/sessiondata"
Jeromy's avatar
Jeromy committed
12 13
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
14
	delay "github.com/ipfs/go-ipfs-delay"
Jeromy's avatar
Jeromy committed
15
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
16
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
17
	loggables "github.com/libp2p/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
18 19
)

20
// Caps on how many wants the session keeps live (sent and still awaiting a
// block) at once. wantBudget picks between them.
const (
	// broadcastLiveWantsLimit is the cap used while the peer manager reports
	// no optimized peers.
	broadcastLiveWantsLimit = 4

	// targetedLiveWantsLimit is the cap used once the peer manager reports at
	// least one optimized peer.
	targetedLiveWantsLimit = 32
)
Jeromy's avatar
Jeromy committed
24

25
// WantManager is an interface that can be used to request blocks
26
// from given peers.
27
type WantManager interface {
28 29 30 31
	WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
	CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}

32 33
// PeerManager provides an interface for tracking and optimize peers, and
// requesting more when neccesary.
34 35
type PeerManager interface {
	FindMorePeers(context.Context, cid.Cid)
36
	GetOptimizedPeers() []bssd.OptimizedPeer
37
	RecordPeerRequests([]peer.ID, []cid.Cid)
38 39
	RecordPeerResponse(peer.ID, []cid.Cid)
	RecordCancels([]cid.Cid)
40 41
}

42 43 44
// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
45
	SplitRequest([]bssd.OptimizedPeer, []cid.Cid) []bssd.PartialRequest
46 47 48 49
	RecordDuplicateBlock()
	RecordUniqueBlock()
}

50 51 52 53 54
// interestReq is a query handed to the run loop asking whether the session
// currently wants a CID. The run loop answers on resp.
type interestReq struct {
	// c is the CID being asked about.
	c cid.Cid
	// resp receives the run loop's answer.
	resp chan bool
}

55
type rcvFrom struct {
56
	from peer.ID
57
	ks   []cid.Cid
58 59
}

Jeromy's avatar
Jeromy committed
60 61
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
62
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
63
type Session struct {
64
	// dependencies
65 66 67
	ctx context.Context
	wm  WantManager
	pm  PeerManager
68
	srs RequestSplitter
69 70

	// channels
71
	incoming      chan rcvFrom
72 73 74 75 76 77 78
	newReqs       chan []cid.Cid
	cancelKeys    chan []cid.Cid
	interestReqs  chan interestReq
	latencyReqs   chan chan time.Duration
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
Steven Allen's avatar
Steven Allen committed
79 80 81 82 83 84 85 86 87 88 89 90
	tofetch             *cidQueue
	interest            *lru.Cache
	pastWants           *cidQueue
	liveWants           map[cid.Cid]time.Time
	idleTick            *time.Timer
	periodicSearchTimer *time.Timer
	baseTickDelay       time.Duration
	latTotal            time.Duration
	fetchcnt            int
	consecutiveTicks    int
	initialSearchDelay  time.Duration
	periodicSearchDelay delay.D
91
	// identifiers
Jeromy's avatar
Jeromy committed
92
	notif notifications.PubSub
93 94
	uuid  logging.Loggable
	id    uint64
Jeromy's avatar
Jeromy committed
95 96
}

97
// New creates a new bitswap session whose lifetime is bounded by the
98
// given context.
99 100 101 102 103
func New(ctx context.Context,
	id uint64,
	wm WantManager,
	pm PeerManager,
	srs RequestSplitter,
104
	notif notifications.PubSub,
Steven Allen's avatar
Steven Allen committed
105 106
	initialSearchDelay time.Duration,
	periodicSearchDelay delay.D) *Session {
Jeromy's avatar
Jeromy committed
107
	s := &Session{
Steven Allen's avatar
Steven Allen committed
108 109 110 111 112 113 114 115 116 117 118 119
		liveWants:           make(map[cid.Cid]time.Time),
		newReqs:             make(chan []cid.Cid),
		cancelKeys:          make(chan []cid.Cid),
		tofetch:             newCidQueue(),
		pastWants:           newCidQueue(),
		interestReqs:        make(chan interestReq),
		latencyReqs:         make(chan chan time.Duration),
		tickDelayReqs:       make(chan time.Duration),
		ctx:                 ctx,
		wm:                  wm,
		pm:                  pm,
		srs:                 srs,
120
		incoming:            make(chan rcvFrom),
121
		notif:               notif,
Steven Allen's avatar
Steven Allen committed
122 123 124 125 126
		uuid:                loggables.Uuid("GetBlockRequest"),
		baseTickDelay:       time.Millisecond * 500,
		id:                  id,
		initialSearchDelay:  initialSearchDelay,
		periodicSearchDelay: periodicSearchDelay,
Jeromy's avatar
Jeromy committed
127 128 129 130 131 132 133 134 135 136
	}

	cache, _ := lru.New(2048)
	s.interest = cache

	go s.run(ctx)

	return s
}

137 138
// ReceiveFrom receives incoming blocks from the given peer.
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid) {
139
	select {
140
	case s.incoming <- rcvFrom{from: from, ks: ks}:
141 142 143 144
	case <-s.ctx.Done():
	}
}

145 146
// InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
	if s.interest.Contains(c) {
		return true
	}
	// TODO: PERF: this is using a channel to guard a map access against race
	// conditions. This is definitely much slower than a mutex, though its unclear
	// if it will actually induce any noticeable slowness. This is implemented this
	// way to avoid adding a more complex set of mutexes around the liveWants map.
	// note that in the average case (where this session *is* interested in the
	// block we received) this function will not be called, as the cid will likely
	// still be in the interest cache.
	resp := make(chan bool, 1)
	select {
	case s.interestReqs <- interestReq{
		c:    c,
		resp: resp,
	}:
	case <-s.ctx.Done():
		return false
	}

	select {
	case want := <-resp:
		return want
	case <-s.ctx.Done():
		return false
	}
Jeromy's avatar
Jeromy committed
173 174
}

175 176 177 178 179 180 181 182 183 184
// GetBlock fetches a single block. It delegates to bsgetter.SyncGetBlock,
// passing this session's GetBlocks as the underlying fetch function.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	fetch := s.GetBlocks
	return bsgetter.SyncGetBlock(parent, k, fetch)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
	return bsgetter.AsyncGetBlocks(ctx, keys, s.notif,
		func(ctx context.Context, keys []cid.Cid) {
			select {
			case s.newReqs <- keys:
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
			case s.cancelKeys <- keys:
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
200 201
}

202
// GetAverageLatency returns the average latency for block requests.
203 204
func (s *Session) GetAverageLatency() time.Duration {
	resp := make(chan time.Duration)
Jeromy's avatar
Jeromy committed
205
	select {
206 207 208 209 210 211 212 213
	case s.latencyReqs <- resp:
	case <-s.ctx.Done():
		return -1 * time.Millisecond
	}

	select {
	case latency := <-resp:
		return latency
Jeromy's avatar
Jeromy committed
214
	case <-s.ctx.Done():
215
		return -1 * time.Millisecond
Jeromy's avatar
Jeromy committed
216
	}
Jeromy's avatar
Jeromy committed
217 218
}

219
// SetBaseTickDelay changes the rate at which ticks happen.
220 221 222 223 224
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
225 226
}

227 228
// Session run loop -- everything function below here should not be called
// of this loop
Jeromy's avatar
Jeromy committed
229
func (s *Session) run(ctx context.Context) {
Steven Allen's avatar
Steven Allen committed
230 231
	s.idleTick = time.NewTimer(s.initialSearchDelay)
	s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
Jeromy's avatar
Jeromy committed
232 233
	for {
		select {
234
		case rcv := <-s.incoming:
235
			s.cancelIncoming(ctx, rcv)
236 237 238 239
			// Record statistics only if the blocks came from the network
			// (blocks can also be received from the local node)
			if rcv.from != "" {
				s.updateReceiveCounters(ctx, rcv)
240
			}
241
			s.handleIncoming(ctx, rcv)
Jeromy's avatar
Jeromy committed
242
		case keys := <-s.newReqs:
243
			s.handleNewRequest(ctx, keys)
Jeromy's avatar
Jeromy committed
244
		case keys := <-s.cancelKeys:
245
			s.handleCancel(keys)
Steven Allen's avatar
Steven Allen committed
246 247 248 249
		case <-s.idleTick.C:
			s.handleIdleTick(ctx)
		case <-s.periodicSearchTimer.C:
			s.handlePeriodicSearch(ctx)
Jeromy's avatar
Jeromy committed
250
		case lwchk := <-s.interestReqs:
251
			lwchk.resp <- s.cidIsWanted(lwchk.c)
252 253 254 255
		case resp := <-s.latencyReqs:
			resp <- s.averageLatency()
		case baseTickDelay := <-s.tickDelayReqs:
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
256
		case <-ctx.Done():
257
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
258 259 260 261 262
			return
		}
	}
}

263
func (s *Session) cancelIncoming(ctx context.Context, rcv rcvFrom) {
264
	// We've received the blocks so we can cancel any outstanding wants for them
265 266 267 268
	wanted := make([]cid.Cid, 0, len(rcv.ks))
	for _, k := range rcv.ks {
		if s.cidIsWanted(k) {
			wanted = append(wanted, k)
269
		}
270
	}
271 272
	s.pm.RecordCancels(wanted)
	s.wm.CancelWants(s.ctx, wanted, nil, s.id)
273 274
}

275
func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
276
	s.idleTick.Stop()
277

278
	// Process the received blocks
279
	s.processIncoming(ctx, rcv.ks)
280

Steven Allen's avatar
Steven Allen committed
281
	s.resetIdleTick()
282 283 284 285 286 287
}

func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
	for _, k := range keys {
		s.interest.Add(k, nil)
	}
288
	if toadd := s.wantBudget(); toadd > 0 {
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
		if toadd > len(keys) {
			toadd = len(keys)
		}

		now := keys[:toadd]
		keys = keys[toadd:]

		s.wantBlocks(ctx, now)
	}
	for _, k := range keys {
		s.tofetch.Push(k)
	}
}

// handleCancel drops the given CIDs from the tofetch queue. Wants that are
// already live are not touched here.
func (s *Session) handleCancel(keys []cid.Cid) {
	for i := range keys {
		s.tofetch.Remove(keys[i])
	}
}

Steven Allen's avatar
Steven Allen committed
309
func (s *Session) handleIdleTick(ctx context.Context) {
310 311 312 313 314 315 316 317
	live := make([]cid.Cid, 0, len(s.liveWants))
	now := time.Now()
	for c := range s.liveWants {
		live = append(live, c)
		s.liveWants[c] = now
	}

	// Broadcast these keys to everyone we're connected to
318
	s.pm.RecordPeerRequests(nil, live)
319 320
	s.wm.WantBlocks(ctx, live, nil, s.id)

321
	// do no find providers on consecutive ticks
Steven Allen's avatar
Steven Allen committed
322
	// -- just rely on periodic search widening
323
	if len(live) > 0 && (s.consecutiveTicks == 0) {
324
		s.pm.FindMorePeers(ctx, live[0])
325
	}
Steven Allen's avatar
Steven Allen committed
326
	s.resetIdleTick()
327 328 329 330

	if len(s.liveWants) > 0 {
		s.consecutiveTicks++
	}
331 332
}

Steven Allen's avatar
Steven Allen committed
333
func (s *Session) handlePeriodicSearch(ctx context.Context) {
334 335
	randomWant := s.randomLiveWant()
	if !randomWant.Defined() {
336 337 338 339 340
		return
	}

	// TODO: come up with a better strategy for determining when to search
	// for new providers for blocks.
341 342
	s.pm.FindMorePeers(ctx, randomWant)
	s.wm.WantBlocks(ctx, []cid.Cid{randomWant}, nil, s.id)
343

Steven Allen's avatar
Steven Allen committed
344
	s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
345 346 347
}

func (s *Session) randomLiveWant() cid.Cid {
Steven Allen's avatar
Steven Allen committed
348 349 350
	if len(s.liveWants) == 0 {
		return cid.Cid{}
	}
351 352 353 354 355 356 357 358 359 360
	i := rand.Intn(len(s.liveWants))
	// picking a random live want
	for k := range s.liveWants {
		if i == 0 {
			return k
		}
		i--
	}
	return cid.Cid{}
}
361
func (s *Session) handleShutdown() {
Steven Allen's avatar
Steven Allen committed
362
	s.idleTick.Stop()
363 364 365 366 367 368 369 370

	live := make([]cid.Cid, 0, len(s.liveWants))
	for c := range s.liveWants {
		live = append(live, c)
	}
	s.wm.CancelWants(s.ctx, live, nil, s.id)
}

371
func (s *Session) cidIsWanted(c cid.Cid) bool {
Steven Allen's avatar
Steven Allen committed
372
	_, ok := s.liveWants[c]
373 374 375 376 377 378
	if !ok {
		ok = s.tofetch.Has(c)
	}
	return ok
}

379
func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid) {
380
	for _, c := range ks {
381 382 383 384 385 386 387 388 389
		if s.cidIsWanted(c) {
			// If the block CID was in the live wants queue, remove it
			tval, ok := s.liveWants[c]
			if ok {
				s.latTotal += time.Since(tval)
				delete(s.liveWants, c)
			} else {
				// Otherwise remove it from the tofetch queue, if it was there
				s.tofetch.Remove(c)
390
			}
391 392 393 394 395 396 397 398
			s.fetchcnt++

			// We've received new wanted blocks, so reset the number of ticks
			// that have occurred since the last new block
			s.consecutiveTicks = 0

			// Keep track of CIDs we've successfully fetched
			s.pastWants.Push(c)
Jeromy's avatar
Jeromy committed
399
		}
400
	}
401

402 403 404 405 406 407 408 409 410 411 412 413
	// Transfer as many CIDs as possible from the tofetch queue into the
	// live wants queue
	toAdd := s.wantBudget()
	if toAdd > s.tofetch.Len() {
		toAdd = s.tofetch.Len()
	}
	if toAdd > 0 {
		var keys []cid.Cid
		for i := 0; i < toAdd; i++ {
			keys = append(keys, s.tofetch.Pop())
		}
		s.wantBlocks(ctx, keys)
414 415 416
	}
}

417
func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) {
418
	for _, k := range rcv.ks {
419
		// Inform the request splitter of unique / duplicate blocks
420
		if s.cidIsWanted(k) {
421
			s.srs.RecordUniqueBlock()
422
		} else if s.pastWants.Has(k) {
423
			s.srs.RecordDuplicateBlock()
424
		}
425 426 427
	}

	// Record response (to be able to time latency)
428 429
	if len(rcv.ks) > 0 {
		s.pm.RecordPeerResponse(rcv.from, rcv.ks)
Jeromy's avatar
Jeromy committed
430 431 432
	}
}

433
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
434
	now := time.Now()
Jeromy's avatar
Jeromy committed
435
	for _, c := range ks {
Steven Allen's avatar
Steven Allen committed
436
		s.liveWants[c] = now
Jeromy's avatar
Jeromy committed
437
	}
438
	peers := s.pm.GetOptimizedPeers()
439
	if len(peers) > 0 {
440 441 442 443
		splitRequests := s.srs.SplitRequest(peers, ks)
		for _, splitRequest := range splitRequests {
			s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
			s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
444 445 446 447 448
		}
	} else {
		s.pm.RecordPeerRequests(nil, ks)
		s.wm.WantBlocks(ctx, ks, nil, s.id)
	}
Jeromy's avatar
Jeromy committed
449 450
}

451 452
func (s *Session) averageLatency() time.Duration {
	return s.latTotal / time.Duration(s.fetchcnt)
Jeromy's avatar
Jeromy committed
453
}
454

Steven Allen's avatar
Steven Allen committed
455
func (s *Session) resetIdleTick() {
456
	var tickDelay time.Duration
457
	if s.latTotal == 0 {
Steven Allen's avatar
Steven Allen committed
458
		tickDelay = s.initialSearchDelay
459 460
	} else {
		avLat := s.averageLatency()
461
		tickDelay = s.baseTickDelay + (3 * avLat)
Jeromy's avatar
Jeromy committed
462
	}
463
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
Steven Allen's avatar
Steven Allen committed
464
	s.idleTick.Reset(tickDelay)
Jeromy's avatar
Jeromy committed
465
}
466 467 468 469 470 471 472 473 474 475 476 477 478 479

// wantBudget returns how many more wants may be made live right now. The
// cap is targetedLiveWantsLimit when the peer manager reports optimized
// peers and broadcastLiveWantsLimit otherwise. The result is never negative.
func (s *Session) wantBudget() int {
	limit := broadcastLiveWantsLimit
	if len(s.pm.GetOptimizedPeers()) > 0 {
		limit = targetedLiveWantsLimit
	}

	if remaining := limit - len(s.liveWants); remaining > 0 {
		return remaining
	}
	return 0
}