session.go 11.7 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4

import (
	"context"
5
	"math/rand"
6
	"sync"
Jeromy's avatar
Jeromy committed
7 8
	"time"

9 10
	bsgetter "github.com/ipfs/go-bitswap/getter"
	notifications "github.com/ipfs/go-bitswap/notifications"
11
	bssd "github.com/ipfs/go-bitswap/sessiondata"
Jeromy's avatar
Jeromy committed
12 13
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
14
	delay "github.com/ipfs/go-ipfs-delay"
Jeromy's avatar
Jeromy committed
15
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
16
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
17
	loggables "github.com/libp2p/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
18 19
)

20
const (
21 22
	broadcastLiveWantsLimit = 4
	targetedLiveWantsLimit  = 32
23
)
Jeromy's avatar
Jeromy committed
24

25
// WantManager is an interface that can be used to request blocks
26
// from given peers.
27
type WantManager interface {
28 29 30 31
	WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
	CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}

32 33
// PeerManager provides an interface for tracking and optimize peers, and
// requesting more when neccesary.
34 35
type PeerManager interface {
	FindMorePeers(context.Context, cid.Cid)
36
	GetOptimizedPeers() []bssd.OptimizedPeer
37
	RecordPeerRequests([]peer.ID, []cid.Cid)
38 39
	RecordPeerResponse(peer.ID, []cid.Cid)
	RecordCancels([]cid.Cid)
40 41
}

42 43 44
// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
45
	SplitRequest([]bssd.OptimizedPeer, []cid.Cid) []bssd.PartialRequest
46 47 48 49
	RecordDuplicateBlock()
	RecordUniqueBlock()
}

50
type rcvFrom struct {
51
	from peer.ID
52
	ks   []cid.Cid
53 54
}

55 56 57 58 59 60 61
type sessionWants struct {
	sync.RWMutex
	toFetch   *cidQueue
	liveWants map[cid.Cid]time.Time
	pastWants *cid.Set
}

Jeromy's avatar
Jeromy committed
62 63
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
64
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
65
type Session struct {
66
	// dependencies
67 68 69
	ctx context.Context
	wm  WantManager
	pm  PeerManager
70
	srs RequestSplitter
71

72 73
	sw sessionWants

74
	// channels
75
	incoming      chan rcvFrom
76 77 78 79 80 81
	newReqs       chan []cid.Cid
	cancelKeys    chan []cid.Cid
	latencyReqs   chan chan time.Duration
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
Steven Allen's avatar
Steven Allen committed
82 83 84 85 86 87 88 89
	idleTick            *time.Timer
	periodicSearchTimer *time.Timer
	baseTickDelay       time.Duration
	latTotal            time.Duration
	fetchcnt            int
	consecutiveTicks    int
	initialSearchDelay  time.Duration
	periodicSearchDelay delay.D
90
	// identifiers
Jeromy's avatar
Jeromy committed
91
	notif notifications.PubSub
92 93
	uuid  logging.Loggable
	id    uint64
Jeromy's avatar
Jeromy committed
94 95
}

96
// New creates a new bitswap session whose lifetime is bounded by the
97
// given context.
98 99 100 101 102
func New(ctx context.Context,
	id uint64,
	wm WantManager,
	pm PeerManager,
	srs RequestSplitter,
103
	notif notifications.PubSub,
Steven Allen's avatar
Steven Allen committed
104 105
	initialSearchDelay time.Duration,
	periodicSearchDelay delay.D) *Session {
Jeromy's avatar
Jeromy committed
106
	s := &Session{
107 108 109 110 111
		sw: sessionWants{
			toFetch:   newCidQueue(),
			liveWants: make(map[cid.Cid]time.Time),
			pastWants: cid.NewSet(),
		},
Steven Allen's avatar
Steven Allen committed
112 113 114 115 116 117 118 119
		newReqs:             make(chan []cid.Cid),
		cancelKeys:          make(chan []cid.Cid),
		latencyReqs:         make(chan chan time.Duration),
		tickDelayReqs:       make(chan time.Duration),
		ctx:                 ctx,
		wm:                  wm,
		pm:                  pm,
		srs:                 srs,
120
		incoming:            make(chan rcvFrom),
121
		notif:               notif,
Steven Allen's avatar
Steven Allen committed
122 123 124 125 126
		uuid:                loggables.Uuid("GetBlockRequest"),
		baseTickDelay:       time.Millisecond * 500,
		id:                  id,
		initialSearchDelay:  initialSearchDelay,
		periodicSearchDelay: periodicSearchDelay,
Jeromy's avatar
Jeromy committed
127 128 129 130 131 132 133
	}

	go s.run(ctx)

	return s
}

134 135
// ReceiveFrom receives incoming blocks from the given peer.
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid) {
136
	select {
137
	case s.incoming <- rcvFrom{from: from, ks: ks}:
138 139 140 141
	case <-s.ctx.Done():
	}
}

142 143 144 145 146 147 148 149 150
// IsWanted returns true if this session is waiting to receive the given Cid.
func (s *Session) IsWanted(c cid.Cid) bool {
	s.sw.RLock()
	defer s.sw.RUnlock()

	return s.unlockedIsWanted(c)
}

// InterestedIn returns true if this session has ever requested the given Cid.
151
func (s *Session) InterestedIn(c cid.Cid) bool {
152 153
	s.sw.RLock()
	defer s.sw.RUnlock()
154

155
	return s.unlockedIsWanted(c) || s.sw.pastWants.Has(c)
Jeromy's avatar
Jeromy committed
156 157
}

158 159 160 161 162 163 164 165 166 167
// GetBlock fetches a single block.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
168 169

	return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
170 171 172 173 174 175 176 177 178 179 180 181 182 183
		func(ctx context.Context, keys []cid.Cid) {
			select {
			case s.newReqs <- keys:
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
			case s.cancelKeys <- keys:
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
184 185
}

186
// GetAverageLatency returns the average latency for block requests.
187 188
func (s *Session) GetAverageLatency() time.Duration {
	resp := make(chan time.Duration)
Jeromy's avatar
Jeromy committed
189
	select {
190 191 192 193 194 195 196 197
	case s.latencyReqs <- resp:
	case <-s.ctx.Done():
		return -1 * time.Millisecond
	}

	select {
	case latency := <-resp:
		return latency
Jeromy's avatar
Jeromy committed
198
	case <-s.ctx.Done():
199
		return -1 * time.Millisecond
Jeromy's avatar
Jeromy committed
200
	}
Jeromy's avatar
Jeromy committed
201 202
}

203
// SetBaseTickDelay changes the rate at which ticks happen.
204 205 206 207 208
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
209 210
}

211 212
// Session run loop -- everything function below here should not be called
// of this loop
Jeromy's avatar
Jeromy committed
213
func (s *Session) run(ctx context.Context) {
Steven Allen's avatar
Steven Allen committed
214 215
	s.idleTick = time.NewTimer(s.initialSearchDelay)
	s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
Jeromy's avatar
Jeromy committed
216 217
	for {
		select {
218
		case rcv := <-s.incoming:
219
			s.handleIncoming(ctx, rcv)
Jeromy's avatar
Jeromy committed
220
		case keys := <-s.newReqs:
221
			s.wantBlocks(ctx, keys)
Jeromy's avatar
Jeromy committed
222
		case keys := <-s.cancelKeys:
223
			s.handleCancel(keys)
Steven Allen's avatar
Steven Allen committed
224 225 226 227
		case <-s.idleTick.C:
			s.handleIdleTick(ctx)
		case <-s.periodicSearchTimer.C:
			s.handlePeriodicSearch(ctx)
228 229 230 231
		case resp := <-s.latencyReqs:
			resp <- s.averageLatency()
		case baseTickDelay := <-s.tickDelayReqs:
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
232
		case <-ctx.Done():
233
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
234 235 236 237 238
			return
		}
	}
}

239 240 241
func (s *Session) handleCancel(keys []cid.Cid) {
	s.sw.Lock()
	defer s.sw.Unlock()
242 243

	for _, k := range keys {
244
		s.sw.toFetch.Remove(k)
245 246 247
	}
}

Steven Allen's avatar
Steven Allen committed
248
func (s *Session) handleIdleTick(ctx context.Context) {
249
	live := s.prepareBroadcast()
250 251

	// Broadcast these keys to everyone we're connected to
252
	s.pm.RecordPeerRequests(nil, live)
253 254
	s.wm.WantBlocks(ctx, live, nil, s.id)

255
	// do no find providers on consecutive ticks
Steven Allen's avatar
Steven Allen committed
256
	// -- just rely on periodic search widening
257
	if len(live) > 0 && (s.consecutiveTicks == 0) {
258
		s.pm.FindMorePeers(ctx, live[0])
259
	}
Steven Allen's avatar
Steven Allen committed
260
	s.resetIdleTick()
261

262 263 264 265
	s.sw.RLock()
	defer s.sw.RUnlock()

	if len(s.sw.liveWants) > 0 {
266 267
		s.consecutiveTicks++
	}
268 269
}

270 271 272 273 274 275 276 277 278 279 280 281 282
func (s *Session) prepareBroadcast() []cid.Cid {
	s.sw.Lock()
	defer s.sw.Unlock()

	live := make([]cid.Cid, 0, len(s.sw.liveWants))
	now := time.Now()
	for c := range s.sw.liveWants {
		live = append(live, c)
		s.sw.liveWants[c] = now
	}
	return live
}

Steven Allen's avatar
Steven Allen committed
283
func (s *Session) handlePeriodicSearch(ctx context.Context) {
284 285
	randomWant := s.randomLiveWant()
	if !randomWant.Defined() {
286 287 288 289 290
		return
	}

	// TODO: come up with a better strategy for determining when to search
	// for new providers for blocks.
291 292
	s.pm.FindMorePeers(ctx, randomWant)
	s.wm.WantBlocks(ctx, []cid.Cid{randomWant}, nil, s.id)
293

Steven Allen's avatar
Steven Allen committed
294
	s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
295 296 297
}

func (s *Session) randomLiveWant() cid.Cid {
298 299 300 301
	s.sw.RLock()
	defer s.sw.RUnlock()

	if len(s.sw.liveWants) == 0 {
Steven Allen's avatar
Steven Allen committed
302 303
		return cid.Cid{}
	}
304
	i := rand.Intn(len(s.sw.liveWants))
305
	// picking a random live want
306
	for k := range s.sw.liveWants {
307 308 309 310 311 312 313
		if i == 0 {
			return k
		}
		i--
	}
	return cid.Cid{}
}
314

315
func (s *Session) handleShutdown() {
Steven Allen's avatar
Steven Allen committed
316
	s.idleTick.Stop()
317

318 319 320 321 322 323 324 325 326 327
	live := s.liveWants()
	s.wm.CancelWants(s.ctx, live, nil, s.id)
}

func (s *Session) liveWants() []cid.Cid {
	s.sw.RLock()
	defer s.sw.RUnlock()

	live := make([]cid.Cid, 0, len(s.sw.liveWants))
	for c := range s.sw.liveWants {
328 329
		live = append(live, c)
	}
330
	return live
331 332
}

333 334
func (s *Session) unlockedIsWanted(c cid.Cid) bool {
	_, ok := s.sw.liveWants[c]
335
	if !ok {
336
		ok = s.sw.toFetch.Has(c)
337 338 339 340
	}
	return ok
}

341 342 343 344 345
func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
	// Record statistics only if the blocks came from the network
	// (blocks can also be received from the local node)
	if rcv.from != "" {
		s.updateReceiveCounters(ctx, rcv)
346
	}
347

348 349 350 351
	// Update the want list
	wanted, totalLatency := s.blocksReceived(rcv.ks)
	if len(wanted) == 0 {
		return
352
	}
353 354 355 356 357 358 359 360 361 362

	// We've received the blocks so we can cancel any outstanding wants for them
	s.cancelIncoming(ctx, wanted)

	s.idleTick.Stop()

	// Process the received blocks
	s.processIncoming(ctx, wanted, totalLatency)

	s.resetIdleTick()
363 364
}

365
func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) {
366 367 368 369
	s.sw.RLock()

	for _, c := range rcv.ks {
		if s.unlockedIsWanted(c) {
370
			s.srs.RecordUniqueBlock()
371
		} else if s.sw.pastWants.Has(c) {
372
			s.srs.RecordDuplicateBlock()
373
		}
374 375
	}

376 377
	s.sw.RUnlock()

378
	// Record response (to be able to time latency)
379 380
	if len(rcv.ks) > 0 {
		s.pm.RecordPeerResponse(rcv.from, rcv.ks)
Jeromy's avatar
Jeromy committed
381 382 383
	}
}

384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432
func (s *Session) blocksReceived(cids []cid.Cid) ([]cid.Cid, time.Duration) {
	s.sw.Lock()
	defer s.sw.Unlock()

	totalLatency := time.Duration(0)
	wanted := make([]cid.Cid, 0, len(cids))
	for _, c := range cids {
		if s.unlockedIsWanted(c) {
			wanted = append(wanted, c)

			// If the block CID was in the live wants queue, remove it
			tval, ok := s.sw.liveWants[c]
			if ok {
				totalLatency += time.Since(tval)
				delete(s.sw.liveWants, c)
			} else {
				// Otherwise remove it from the toFetch queue, if it was there
				s.sw.toFetch.Remove(c)
			}

			// Keep track of CIDs we've successfully fetched
			s.sw.pastWants.Add(c)
		}
	}

	return wanted, totalLatency
}

func (s *Session) cancelIncoming(ctx context.Context, ks []cid.Cid) {
	s.pm.RecordCancels(ks)
	s.wm.CancelWants(s.ctx, ks, nil, s.id)
}

func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid, totalLatency time.Duration) {
	// Keep track of the total number of blocks received and total latency
	s.fetchcnt += len(ks)
	s.latTotal += totalLatency

	// We've received new wanted blocks, so reset the number of ticks
	// that have occurred since the last new block
	s.consecutiveTicks = 0

	s.wantBlocks(ctx, nil)
}

func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
	ks := s.getNextWants(s.wantLimit(), newks)
	if len(ks) == 0 {
		return
Jeromy's avatar
Jeromy committed
433
	}
434

435
	peers := s.pm.GetOptimizedPeers()
436
	if len(peers) > 0 {
437 438 439 440
		splitRequests := s.srs.SplitRequest(peers, ks)
		for _, splitRequest := range splitRequests {
			s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
			s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
441 442 443 444 445
		}
	} else {
		s.pm.RecordPeerRequests(nil, ks)
		s.wm.WantBlocks(ctx, ks, nil, s.id)
	}
Jeromy's avatar
Jeromy committed
446 447
}

448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470
func (s *Session) getNextWants(limit int, newWants []cid.Cid) []cid.Cid {
	s.sw.Lock()
	defer s.sw.Unlock()

	now := time.Now()

	for _, k := range newWants {
		s.sw.toFetch.Push(k)
	}

	currentLiveCount := len(s.sw.liveWants)
	toAdd := limit - currentLiveCount

	var live []cid.Cid
	for ; toAdd > 0 && s.sw.toFetch.Len() > 0; toAdd-- {
		c := s.sw.toFetch.Pop()
		live = append(live, c)
		s.sw.liveWants[c] = now
	}

	return live
}

471 472
func (s *Session) averageLatency() time.Duration {
	return s.latTotal / time.Duration(s.fetchcnt)
Jeromy's avatar
Jeromy committed
473
}
474

Steven Allen's avatar
Steven Allen committed
475
func (s *Session) resetIdleTick() {
476
	var tickDelay time.Duration
477
	if s.latTotal == 0 {
Steven Allen's avatar
Steven Allen committed
478
		tickDelay = s.initialSearchDelay
479 480
	} else {
		avLat := s.averageLatency()
481
		tickDelay = s.baseTickDelay + (3 * avLat)
Jeromy's avatar
Jeromy committed
482
	}
483
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
Steven Allen's avatar
Steven Allen committed
484
	s.idleTick.Reset(tickDelay)
Jeromy's avatar
Jeromy committed
485
}
486

487
func (s *Session) wantLimit() int {
488
	if len(s.pm.GetOptimizedPeers()) > 0 {
489
		return targetedLiveWantsLimit
490
	}
491
	return broadcastLiveWantsLimit
492
}