session.go 11.1 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4

import (
	"context"
5
	"math/rand"
Jeromy's avatar
Jeromy committed
6 7
	"time"

Jeromy's avatar
Jeromy committed
8
	lru "github.com/hashicorp/golang-lru"
9 10
	bsgetter "github.com/ipfs/go-bitswap/getter"
	notifications "github.com/ipfs/go-bitswap/notifications"
Jeromy's avatar
Jeromy committed
11 12 13 14 15
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	loggables "github.com/libp2p/go-libp2p-loggables"
	peer "github.com/libp2p/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
16 17
)

18 19 20 21 22 23 24 25 26
const (
	minReceivedToSplit       = 2
	maxSplit                 = 32
	maxAcceptableDupes       = 0.4
	minDuplesToTryLessSplits = 0.2
	initialSplit             = 2
	broadcastLiveWantsLimit  = 4
	targetedLiveWantsLimit   = 32
)
Jeromy's avatar
Jeromy committed
27

28
// WantManager is an interface that can be used to request blocks
29
// from given peers.
30
type WantManager interface {
31 32 33 34
	WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
	CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}

35 36
// PeerManager provides an interface for tracking and optimize peers, and
// requesting more when neccesary.
37 38 39 40 41 42 43
type PeerManager interface {
	FindMorePeers(context.Context, cid.Cid)
	GetOptimizedPeers() []peer.ID
	RecordPeerRequests([]peer.ID, []cid.Cid)
	RecordPeerResponse(peer.ID, cid.Cid)
}

44 45 46 47 48 49
type interestReq struct {
	c    cid.Cid
	resp chan bool
}

type blkRecv struct {
50 51 52
	from           peer.ID
	blk            blocks.Block
	counterMessage bool
53 54
}

Jeromy's avatar
Jeromy committed
55 56
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
57
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
58
type Session struct {
59
	// dependencies
60 61 62
	ctx context.Context
	wm  WantManager
	pm  PeerManager
63 64 65 66 67 68 69 70 71 72

	// channels
	incoming      chan blkRecv
	newReqs       chan []cid.Cid
	cancelKeys    chan []cid.Cid
	interestReqs  chan interestReq
	latencyReqs   chan chan time.Duration
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
73 74 75 76 77 78 79 80 81 82 83
	tofetch                *cidQueue
	interest               *lru.Cache
	pastWants              *cidQueue
	liveWants              map[cid.Cid]time.Time
	tick                   *time.Timer
	baseTickDelay          time.Duration
	latTotal               time.Duration
	fetchcnt               int
	receivedCount          int
	split                  int
	duplicateReceivedCount int
84
	// identifiers
Jeromy's avatar
Jeromy committed
85
	notif notifications.PubSub
86 87
	uuid  logging.Loggable
	id    uint64
Jeromy's avatar
Jeromy committed
88 89
}

90
// New creates a new bitswap session whose lifetime is bounded by the
91
// given context.
92
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Session {
Jeromy's avatar
Jeromy committed
93
	s := &Session{
Steven Allen's avatar
Steven Allen committed
94
		liveWants:     make(map[cid.Cid]time.Time),
95 96
		newReqs:       make(chan []cid.Cid),
		cancelKeys:    make(chan []cid.Cid),
97
		tofetch:       newCidQueue(),
98
		pastWants:     newCidQueue(),
Jeromy's avatar
Jeromy committed
99
		interestReqs:  make(chan interestReq),
100 101
		latencyReqs:   make(chan chan time.Duration),
		tickDelayReqs: make(chan time.Duration),
Jeromy's avatar
Jeromy committed
102
		ctx:           ctx,
103
		wm:            wm,
104
		pm:            pm,
105
		split:         initialSplit,
Jeromy's avatar
Jeromy committed
106 107 108 109
		incoming:      make(chan blkRecv),
		notif:         notifications.New(),
		uuid:          loggables.Uuid("GetBlockRequest"),
		baseTickDelay: time.Millisecond * 500,
110
		id:            id,
Jeromy's avatar
Jeromy committed
111 112 113 114 115 116 117 118 119 120
	}

	cache, _ := lru.New(2048)
	s.interest = cache

	go s.run(ctx)

	return s
}

121 122 123
// ReceiveBlockFrom receives an incoming block from the given peer.
func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
	select {
124
	case s.incoming <- blkRecv{from: from, blk: blk, counterMessage: false}:
125
	case <-s.ctx.Done():
126
	}
127 128 129
	ks := []cid.Cid{blk.Cid()}
	s.wm.CancelWants(s.ctx, ks, nil, s.id)

130
}
131

132 133 134 135 136 137 138 139 140
// UpdateReceiveCounters updates receive counters for a block,
// which may be a duplicate and adjusts the split factor based on that.
func (s *Session) UpdateReceiveCounters(blk blocks.Block) {
	select {
	case s.incoming <- blkRecv{from: "", blk: blk, counterMessage: true}:
	case <-s.ctx.Done():
	}
}

141 142
// InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
	if s.interest.Contains(c) {
		return true
	}
	// TODO: PERF: this is using a channel to guard a map access against race
	// conditions. This is definitely much slower than a mutex, though its unclear
	// if it will actually induce any noticeable slowness. This is implemented this
	// way to avoid adding a more complex set of mutexes around the liveWants map.
	// note that in the average case (where this session *is* interested in the
	// block we received) this function will not be called, as the cid will likely
	// still be in the interest cache.
	resp := make(chan bool, 1)
	select {
	case s.interestReqs <- interestReq{
		c:    c,
		resp: resp,
	}:
	case <-s.ctx.Done():
		return false
	}

	select {
	case want := <-resp:
		return want
	case <-s.ctx.Done():
		return false
	}
Jeromy's avatar
Jeromy committed
169 170
}

171 172 173 174 175 176 177 178 179 180
// GetBlock fetches a single block.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
	return bsgetter.AsyncGetBlocks(ctx, keys, s.notif,
		func(ctx context.Context, keys []cid.Cid) {
			select {
			case s.newReqs <- keys:
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
			case s.cancelKeys <- keys:
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
196 197
}

198
// GetAverageLatency returns the average latency for block requests.
199 200
func (s *Session) GetAverageLatency() time.Duration {
	resp := make(chan time.Duration)
Jeromy's avatar
Jeromy committed
201
	select {
202 203 204 205 206 207 208 209
	case s.latencyReqs <- resp:
	case <-s.ctx.Done():
		return -1 * time.Millisecond
	}

	select {
	case latency := <-resp:
		return latency
Jeromy's avatar
Jeromy committed
210
	case <-s.ctx.Done():
211
		return -1 * time.Millisecond
Jeromy's avatar
Jeromy committed
212
	}
Jeromy's avatar
Jeromy committed
213 214
}

215
// SetBaseTickDelay changes the rate at which ticks happen.
216 217 218 219 220
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
221 222
}

223 224 225 226
const provSearchDelay = time.Second * 10

// Session run loop -- everything function below here should not be called
// of this loop
Jeromy's avatar
Jeromy committed
227 228 229 230 231
func (s *Session) run(ctx context.Context) {
	s.tick = time.NewTimer(provSearchDelay)
	for {
		select {
		case blk := <-s.incoming:
232 233 234 235 236
			if blk.counterMessage {
				s.updateReceiveCounters(ctx, blk.blk)
			} else {
				s.handleIncomingBlock(ctx, blk)
			}
Jeromy's avatar
Jeromy committed
237
		case keys := <-s.newReqs:
238
			s.handleNewRequest(ctx, keys)
Jeromy's avatar
Jeromy committed
239
		case keys := <-s.cancelKeys:
240
			s.handleCancel(keys)
Jeromy's avatar
Jeromy committed
241
		case <-s.tick.C:
242
			s.handleTick(ctx)
Jeromy's avatar
Jeromy committed
243
		case lwchk := <-s.interestReqs:
244
			lwchk.resp <- s.cidIsWanted(lwchk.c)
245 246 247 248
		case resp := <-s.latencyReqs:
			resp <- s.averageLatency()
		case baseTickDelay := <-s.tickDelayReqs:
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
249
		case <-ctx.Done():
250
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
251 252 253 254 255
			return
		}
	}
}

256 257 258 259
func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
	s.tick.Stop()

	if blk.from != "" {
260
		s.pm.RecordPeerResponse(blk.from, blk.blk.Cid())
261 262 263 264 265 266 267 268 269 270 271
	}

	s.receiveBlock(ctx, blk.blk)

	s.resetTick()
}

func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
	for _, k := range keys {
		s.interest.Add(k, nil)
	}
272
	if toadd := s.wantBudget(); toadd > 0 {
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
		if toadd > len(keys) {
			toadd = len(keys)
		}

		now := keys[:toadd]
		keys = keys[toadd:]

		s.wantBlocks(ctx, now)
	}
	for _, k := range keys {
		s.tofetch.Push(k)
	}
}

func (s *Session) handleCancel(keys []cid.Cid) {
	for _, c := range keys {
		s.tofetch.Remove(c)
	}
}

293
func (s *Session) handleTick(ctx context.Context) {
294

295 296 297 298 299 300 301 302
	live := make([]cid.Cid, 0, len(s.liveWants))
	now := time.Now()
	for c := range s.liveWants {
		live = append(live, c)
		s.liveWants[c] = now
	}

	// Broadcast these keys to everyone we're connected to
303
	s.pm.RecordPeerRequests(nil, live)
304 305 306
	s.wm.WantBlocks(ctx, live, nil, s.id)

	if len(live) > 0 {
307
		s.pm.FindMorePeers(ctx, live[0])
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
	}
	s.resetTick()
}

func (s *Session) handleShutdown() {
	s.tick.Stop()
	s.notif.Shutdown()

	live := make([]cid.Cid, 0, len(s.liveWants))
	for c := range s.liveWants {
		live = append(live, c)
	}
	s.wm.CancelWants(s.ctx, live, nil, s.id)
}

323
func (s *Session) cidIsWanted(c cid.Cid) bool {
Steven Allen's avatar
Steven Allen committed
324
	_, ok := s.liveWants[c]
325 326 327 328 329 330
	if !ok {
		ok = s.tofetch.Has(c)
	}
	return ok
}

Jeromy's avatar
Jeromy committed
331
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
332 333
	c := blk.Cid()
	if s.cidIsWanted(c) {
Steven Allen's avatar
Steven Allen committed
334
		tval, ok := s.liveWants[c]
335 336
		if ok {
			s.latTotal += time.Since(tval)
Steven Allen's avatar
Steven Allen committed
337
			delete(s.liveWants, c)
338 339 340
		} else {
			s.tofetch.Remove(c)
		}
Jeromy's avatar
Jeromy committed
341 342 343
		s.fetchcnt++
		s.notif.Publish(blk)

344 345 346 347 348 349 350 351 352 353
		toAdd := s.wantBudget()
		if toAdd > s.tofetch.Len() {
			toAdd = s.tofetch.Len()
		}
		if toAdd > 0 {
			var keys []cid.Cid
			for i := 0; i < toAdd; i++ {
				keys = append(keys, s.tofetch.Pop())
			}
			s.wantBlocks(ctx, keys)
Jeromy's avatar
Jeromy committed
354
		}
355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376

		s.pastWants.Push(c)
	}
}

func (s *Session) duplicateRatio() float64 {
	return float64(s.duplicateReceivedCount) / float64(s.receivedCount)
}
func (s *Session) updateReceiveCounters(ctx context.Context, blk blocks.Block) {
	if s.pastWants.Has(blk.Cid()) {
		s.receivedCount++
		s.duplicateReceivedCount++
		if (s.receivedCount > minReceivedToSplit) && (s.duplicateRatio() > maxAcceptableDupes) && (s.split < maxSplit) {
			s.split++
		}
	} else {
		if s.cidIsWanted(blk.Cid()) {
			s.receivedCount++
			if (s.split > 1) && (s.duplicateRatio() < minDuplesToTryLessSplits) {
				s.split--
			}
		}
Jeromy's avatar
Jeromy committed
377 378 379
	}
}

380
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
381
	now := time.Now()
Jeromy's avatar
Jeromy committed
382
	for _, c := range ks {
Steven Allen's avatar
Steven Allen committed
383
		s.liveWants[c] = now
Jeromy's avatar
Jeromy committed
384
	}
385
	peers := s.pm.GetOptimizedPeers()
386 387 388 389 390 391 392 393 394 395 396 397
	if len(peers) > 0 {
		splitRequests := split(ks, peers, s.split)
		for i, currentKeys := range splitRequests.ks {
			currentPeers := splitRequests.peers[i]
			// right now we're requesting each block from every peer, but soon, maybe not
			s.pm.RecordPeerRequests(currentPeers, currentKeys)
			s.wm.WantBlocks(ctx, currentKeys, currentPeers, s.id)
		}
	} else {
		s.pm.RecordPeerRequests(nil, ks)
		s.wm.WantBlocks(ctx, ks, nil, s.id)
	}
Jeromy's avatar
Jeromy committed
398 399
}

400 401
func (s *Session) averageLatency() time.Duration {
	return s.latTotal / time.Duration(s.fetchcnt)
Jeromy's avatar
Jeromy committed
402
}
403

404 405 406 407 408 409
func (s *Session) resetTick() {
	if s.latTotal == 0 {
		s.tick.Reset(provSearchDelay)
	} else {
		avLat := s.averageLatency()
		s.tick.Reset(s.baseTickDelay + (3 * avLat))
Jeromy's avatar
Jeromy committed
410 411
	}
}
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458

type splitRec struct {
	ks    [][]cid.Cid
	peers [][]peer.ID
}

func split(ks []cid.Cid, peers []peer.ID, split int) *splitRec {
	peerSplit := split
	if len(peers) < peerSplit {
		peerSplit = len(peers)
	}
	keySplit := split
	if len(ks) < keySplit {
		keySplit = len(ks)
	}
	if keySplit > peerSplit {
		keySplit = peerSplit
	}
	out := &splitRec{
		ks:    make([][]cid.Cid, keySplit),
		peers: make([][]peer.ID, peerSplit),
	}
	for i, c := range ks {
		pos := i % keySplit
		out.ks[pos] = append(out.ks[pos], c)
	}
	peerOrder := rand.Perm(len(peers))
	for i, po := range peerOrder {
		pos := i % peerSplit
		out.peers[pos] = append(out.peers[pos], peers[po])
	}
	return out
}

func (s *Session) wantBudget() int {
	live := len(s.liveWants)
	var budget int
	if len(s.pm.GetOptimizedPeers()) > 0 {
		budget = targetedLiveWantsLimit - live
	} else {
		budget = broadcastLiveWantsLimit - live
	}
	if budget < 0 {
		budget = 0
	}
	return budget
}