session.go 9.35 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4 5 6

import (
	"context"
	"time"

7 8
	bsgetter "github.com/ipfs/go-bitswap/getter"
	notifications "github.com/ipfs/go-bitswap/notifications"
9
	bssd "github.com/ipfs/go-bitswap/sessiondata"
Jeromy's avatar
Jeromy committed
10 11
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
12
	delay "github.com/ipfs/go-ipfs-delay"
Jeromy's avatar
Jeromy committed
13
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
14
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
15
	loggables "github.com/libp2p/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
16 17
)

18
const (
	// broadcastLiveWantsLimit is the want limit wantLimit returns when the
	// peer manager reports no optimized peers.
	broadcastLiveWantsLimit = 4
	// targetedLiveWantsLimit is the want limit wantLimit returns when the
	// peer manager reports at least one optimized peer.
	targetedLiveWantsLimit = 32
)
Jeromy's avatar
Jeromy committed
22

23
// WantManager is an interface that can be used to request blocks
24
// from given peers.
25
type WantManager interface {
26 27 28 29
	WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
	CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}

30 31
// PeerManager provides an interface for tracking and optimize peers, and
// requesting more when neccesary.
32 33
type PeerManager interface {
	FindMorePeers(context.Context, cid.Cid)
34
	GetOptimizedPeers() []bssd.OptimizedPeer
35
	RecordPeerRequests([]peer.ID, []cid.Cid)
36 37
	RecordPeerResponse(peer.ID, []cid.Cid)
	RecordCancels([]cid.Cid)
38 39
}

40 41 42
// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
43
	SplitRequest([]bssd.OptimizedPeer, []cid.Cid) []bssd.PartialRequest
44 45 46 47
	RecordDuplicateBlock()
	RecordUniqueBlock()
}

48
type rcvFrom struct {
49
	from peer.ID
50
	ks   []cid.Cid
51 52
}

Jeromy's avatar
Jeromy committed
53 54
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
55
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
56
type Session struct {
57
	// dependencies
58 59 60
	ctx context.Context
	wm  WantManager
	pm  PeerManager
61
	srs RequestSplitter
62

63 64
	sw sessionWants

65
	// channels
66
	incoming      chan rcvFrom
67 68 69 70 71 72
	newReqs       chan []cid.Cid
	cancelKeys    chan []cid.Cid
	latencyReqs   chan chan time.Duration
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
Steven Allen's avatar
Steven Allen committed
73 74 75 76 77 78 79 80
	idleTick            *time.Timer
	periodicSearchTimer *time.Timer
	baseTickDelay       time.Duration
	latTotal            time.Duration
	fetchcnt            int
	consecutiveTicks    int
	initialSearchDelay  time.Duration
	periodicSearchDelay delay.D
81
	// identifiers
Jeromy's avatar
Jeromy committed
82
	notif notifications.PubSub
83 84
	uuid  logging.Loggable
	id    uint64
Jeromy's avatar
Jeromy committed
85 86
}

87
// New creates a new bitswap session whose lifetime is bounded by the
88
// given context.
89 90 91 92 93
func New(ctx context.Context,
	id uint64,
	wm WantManager,
	pm PeerManager,
	srs RequestSplitter,
94
	notif notifications.PubSub,
Steven Allen's avatar
Steven Allen committed
95 96
	initialSearchDelay time.Duration,
	periodicSearchDelay delay.D) *Session {
Jeromy's avatar
Jeromy committed
97
	s := &Session{
98 99 100 101 102
		sw: sessionWants{
			toFetch:   newCidQueue(),
			liveWants: make(map[cid.Cid]time.Time),
			pastWants: cid.NewSet(),
		},
Steven Allen's avatar
Steven Allen committed
103 104 105 106 107 108 109 110
		newReqs:             make(chan []cid.Cid),
		cancelKeys:          make(chan []cid.Cid),
		latencyReqs:         make(chan chan time.Duration),
		tickDelayReqs:       make(chan time.Duration),
		ctx:                 ctx,
		wm:                  wm,
		pm:                  pm,
		srs:                 srs,
111
		incoming:            make(chan rcvFrom),
112
		notif:               notif,
Steven Allen's avatar
Steven Allen committed
113 114 115 116 117
		uuid:                loggables.Uuid("GetBlockRequest"),
		baseTickDelay:       time.Millisecond * 500,
		id:                  id,
		initialSearchDelay:  initialSearchDelay,
		periodicSearchDelay: periodicSearchDelay,
Jeromy's avatar
Jeromy committed
118 119 120 121 122 123 124
	}

	go s.run(ctx)

	return s
}

125 126
// ReceiveFrom receives incoming blocks from the given peer.
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid) {
127 128 129 130 131
	interested := s.sw.FilterInteresting(ks)
	if len(interested) == 0 {
		return
	}

132
	select {
133
	case s.incoming <- rcvFrom{from: from, ks: interested}:
134 135 136 137
	case <-s.ctx.Done():
	}
}

138 139
// IsWanted returns true if this session is waiting to receive the given Cid.
func (s *Session) IsWanted(c cid.Cid) bool {
140
	return s.sw.IsWanted(c)
Jeromy's avatar
Jeromy committed
141 142
}

143 144 145 146 147 148 149 150 151 152
// GetBlock fetches the single block k, delegating to the session's GetBlocks
// through bsgetter.SyncGetBlock.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	fetch := s.GetBlocks
	return bsgetter.SyncGetBlock(parent, k, fetch)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
153 154

	return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
155 156 157 158 159 160 161 162 163 164 165 166 167 168
		func(ctx context.Context, keys []cid.Cid) {
			select {
			case s.newReqs <- keys:
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
			case s.cancelKeys <- keys:
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
169 170
}

171
// GetAverageLatency returns the average latency for block requests.
172 173
func (s *Session) GetAverageLatency() time.Duration {
	resp := make(chan time.Duration)
Jeromy's avatar
Jeromy committed
174
	select {
175 176 177 178 179 180 181 182
	case s.latencyReqs <- resp:
	case <-s.ctx.Done():
		return -1 * time.Millisecond
	}

	select {
	case latency := <-resp:
		return latency
Jeromy's avatar
Jeromy committed
183
	case <-s.ctx.Done():
184
		return -1 * time.Millisecond
Jeromy's avatar
Jeromy committed
185
	}
Jeromy's avatar
Jeromy committed
186 187
}

188
// SetBaseTickDelay changes the rate at which ticks happen.
189 190 191 192 193
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
194 195
}

196 197
// Session run loop -- every function below here should not be called
// outside of this loop
Jeromy's avatar
Jeromy committed
198
func (s *Session) run(ctx context.Context) {
Steven Allen's avatar
Steven Allen committed
199 200
	s.idleTick = time.NewTimer(s.initialSearchDelay)
	s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
Jeromy's avatar
Jeromy committed
201 202
	for {
		select {
203
		case rcv := <-s.incoming:
204
			s.handleIncoming(ctx, rcv)
Jeromy's avatar
Jeromy committed
205
		case keys := <-s.newReqs:
206
			s.wantBlocks(ctx, keys)
Jeromy's avatar
Jeromy committed
207
		case keys := <-s.cancelKeys:
208
			s.sw.CancelPending(keys)
Steven Allen's avatar
Steven Allen committed
209 210 211 212
		case <-s.idleTick.C:
			s.handleIdleTick(ctx)
		case <-s.periodicSearchTimer.C:
			s.handlePeriodicSearch(ctx)
213 214 215 216
		case resp := <-s.latencyReqs:
			resp <- s.averageLatency()
		case baseTickDelay := <-s.tickDelayReqs:
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
217
		case <-ctx.Done():
218
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
219 220 221 222 223
			return
		}
	}
}

Steven Allen's avatar
Steven Allen committed
224
func (s *Session) handleIdleTick(ctx context.Context) {
225
	live := s.sw.PrepareBroadcast()
226 227

	// Broadcast these keys to everyone we're connected to
228
	s.pm.RecordPeerRequests(nil, live)
229 230
	s.wm.WantBlocks(ctx, live, nil, s.id)

231
	// do no find providers on consecutive ticks
Steven Allen's avatar
Steven Allen committed
232
	// -- just rely on periodic search widening
233
	if len(live) > 0 && (s.consecutiveTicks == 0) {
234
		s.pm.FindMorePeers(ctx, live[0])
235
	}
Steven Allen's avatar
Steven Allen committed
236
	s.resetIdleTick()
237

238
	if s.sw.HasLiveWants() {
239 240
		s.consecutiveTicks++
	}
241 242
}

Steven Allen's avatar
Steven Allen committed
243
func (s *Session) handlePeriodicSearch(ctx context.Context) {
244
	randomWant := s.sw.RandomLiveWant()
245
	if !randomWant.Defined() {
246 247 248 249 250
		return
	}

	// TODO: come up with a better strategy for determining when to search
	// for new providers for blocks.
251 252
	s.pm.FindMorePeers(ctx, randomWant)
	s.wm.WantBlocks(ctx, []cid.Cid{randomWant}, nil, s.id)
253

Steven Allen's avatar
Steven Allen committed
254
	s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
255 256
}

257
func (s *Session) handleShutdown() {
Steven Allen's avatar
Steven Allen committed
258
	s.idleTick.Stop()
259

260
	live := s.sw.LiveWants()
261 262 263 264 265 266 267 268
	s.wm.CancelWants(s.ctx, live, nil, s.id)
}

func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
	// Record statistics only if the blocks came from the network
	// (blocks can also be received from the local node)
	if rcv.from != "" {
		s.updateReceiveCounters(ctx, rcv)
269
	}
270

271
	// Update the want list
272
	wanted, totalLatency := s.sw.BlocksReceived(rcv.ks)
273 274
	if len(wanted) == 0 {
		return
275
	}
276 277 278 279 280 281 282 283 284 285

	// We've received the blocks so we can cancel any outstanding wants for them
	s.cancelIncoming(ctx, wanted)

	s.idleTick.Stop()

	// Process the received blocks
	s.processIncoming(ctx, wanted, totalLatency)

	s.resetIdleTick()
286 287
}

288
func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) {
289 290
	// Record unique vs duplicate blocks
	s.sw.ForEachUniqDup(rcv.ks, s.srs.RecordUniqueBlock, s.srs.RecordDuplicateBlock)
291

292
	// Record response (to be able to time latency)
293 294
	if len(rcv.ks) > 0 {
		s.pm.RecordPeerResponse(rcv.from, rcv.ks)
Jeromy's avatar
Jeromy committed
295 296 297
	}
}

298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
// cancelIncoming reports the cancels for ks to the peer manager and then
// cancels the wants for ks through the want manager, using the session
// context and nil peers.
func (s *Session) cancelIncoming(ctx context.Context, ks []cid.Cid) {
	pm, wm := s.pm, s.wm

	pm.RecordCancels(ks)
	wm.CancelWants(s.ctx, ks, nil, s.id)
}

// processIncoming updates the latency statistics for newly received wanted
// blocks, clears the idle tick counter and requests further blocks.
func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid, totalLatency time.Duration) {
	// Accumulate total latency and the number of blocks received.
	s.latTotal += totalLatency
	s.fetchcnt += len(ks)

	// New wanted blocks arrived, so no ticks have elapsed since the last one.
	s.consecutiveTicks = 0

	// No new keys to add; just send out whatever wants now fit the limit.
	s.wantBlocks(ctx, nil)
}

func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
316 317 318
	// Given the want limit and any newly received blocks, get as many wants as
	// we can to send out
	ks := s.sw.GetNextWants(s.wantLimit(), newks)
319 320
	if len(ks) == 0 {
		return
Jeromy's avatar
Jeromy committed
321
	}
322

323
	peers := s.pm.GetOptimizedPeers()
324
	if len(peers) > 0 {
325 326 327 328
		splitRequests := s.srs.SplitRequest(peers, ks)
		for _, splitRequest := range splitRequests {
			s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
			s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
329 330 331 332 333
		}
	} else {
		s.pm.RecordPeerRequests(nil, ks)
		s.wm.WantBlocks(ctx, ks, nil, s.id)
	}
Jeromy's avatar
Jeromy committed
334 335
}

336 337
func (s *Session) averageLatency() time.Duration {
	return s.latTotal / time.Duration(s.fetchcnt)
Jeromy's avatar
Jeromy committed
338
}
339

Steven Allen's avatar
Steven Allen committed
340
func (s *Session) resetIdleTick() {
341
	var tickDelay time.Duration
342
	if s.latTotal == 0 {
Steven Allen's avatar
Steven Allen committed
343
		tickDelay = s.initialSearchDelay
344 345
	} else {
		avLat := s.averageLatency()
346
		tickDelay = s.baseTickDelay + (3 * avLat)
Jeromy's avatar
Jeromy committed
347
	}
348
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
Steven Allen's avatar
Steven Allen committed
349
	s.idleTick.Reset(tickDelay)
Jeromy's avatar
Jeromy committed
350
}
351

352
// wantLimit returns the want limit to use: the broadcast limit when the
// peer manager has no optimized peers, and the targeted limit otherwise.
func (s *Session) wantLimit() int {
	if peers := s.pm.GetOptimizedPeers(); len(peers) == 0 {
		return broadcastLiveWantsLimit
	}
	return targetedLiveWantsLimit
}