session.go 9.44 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4 5 6

import (
	"context"
	"time"

7 8
	bsgetter "github.com/ipfs/go-bitswap/getter"
	notifications "github.com/ipfs/go-bitswap/notifications"
9
	bssd "github.com/ipfs/go-bitswap/sessiondata"
Jeromy's avatar
Jeromy committed
10 11
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
12
	delay "github.com/ipfs/go-ipfs-delay"
Jeromy's avatar
Jeromy committed
13
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
14
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
15
	loggables "github.com/libp2p/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
16 17
)

18
const (
19 20
	broadcastLiveWantsLimit = 4
	targetedLiveWantsLimit  = 32
21
)
Jeromy's avatar
Jeromy committed
22

23
// WantManager is an interface that can be used to request blocks
24
// from given peers.
25
type WantManager interface {
26 27 28 29
	WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
	CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}

30 31
// PeerManager provides an interface for tracking and optimize peers, and
// requesting more when neccesary.
32 33
type PeerManager interface {
	FindMorePeers(context.Context, cid.Cid)
34
	GetOptimizedPeers() []bssd.OptimizedPeer
35
	RecordPeerRequests([]peer.ID, []cid.Cid)
36 37
	RecordPeerResponse(peer.ID, []cid.Cid)
	RecordCancels([]cid.Cid)
38 39
}

40 41 42
// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
43
	SplitRequest([]bssd.OptimizedPeer, []cid.Cid) []bssd.PartialRequest
44 45 46 47
	RecordDuplicateBlock()
	RecordUniqueBlock()
}

48 49 50 51 52 53 54 55 56 57
type opType int

const (
	opReceive opType = iota
	opWant
	opCancel
)

type op struct {
	op   opType
58
	from peer.ID
59
	keys []cid.Cid
60 61
}

Jeromy's avatar
Jeromy committed
62 63
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
64
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
65
type Session struct {
66
	// dependencies
67 68 69
	ctx context.Context
	wm  WantManager
	pm  PeerManager
70
	srs RequestSplitter
71

72 73
	sw sessionWants

74
	// channels
75
	incoming      chan op
76 77 78 79
	latencyReqs   chan chan time.Duration
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
Steven Allen's avatar
Steven Allen committed
80 81 82 83 84 85 86 87
	idleTick            *time.Timer
	periodicSearchTimer *time.Timer
	baseTickDelay       time.Duration
	latTotal            time.Duration
	fetchcnt            int
	consecutiveTicks    int
	initialSearchDelay  time.Duration
	periodicSearchDelay delay.D
88
	// identifiers
Jeromy's avatar
Jeromy committed
89
	notif notifications.PubSub
90 91
	uuid  logging.Loggable
	id    uint64
Jeromy's avatar
Jeromy committed
92 93
}

94
// New creates a new bitswap session whose lifetime is bounded by the
95
// given context.
96 97 98 99 100
func New(ctx context.Context,
	id uint64,
	wm WantManager,
	pm PeerManager,
	srs RequestSplitter,
101
	notif notifications.PubSub,
Steven Allen's avatar
Steven Allen committed
102 103
	initialSearchDelay time.Duration,
	periodicSearchDelay delay.D) *Session {
Jeromy's avatar
Jeromy committed
104
	s := &Session{
105 106 107 108 109
		sw: sessionWants{
			toFetch:   newCidQueue(),
			liveWants: make(map[cid.Cid]time.Time),
			pastWants: cid.NewSet(),
		},
Steven Allen's avatar
Steven Allen committed
110 111 112 113 114 115
		latencyReqs:         make(chan chan time.Duration),
		tickDelayReqs:       make(chan time.Duration),
		ctx:                 ctx,
		wm:                  wm,
		pm:                  pm,
		srs:                 srs,
116
		incoming:            make(chan op, 16),
117
		notif:               notif,
Steven Allen's avatar
Steven Allen committed
118 119 120 121 122
		uuid:                loggables.Uuid("GetBlockRequest"),
		baseTickDelay:       time.Millisecond * 500,
		id:                  id,
		initialSearchDelay:  initialSearchDelay,
		periodicSearchDelay: periodicSearchDelay,
Jeromy's avatar
Jeromy committed
123 124 125 126 127 128 129
	}

	go s.run(ctx)

	return s
}

130 131
// ReceiveFrom receives incoming blocks from the given peer.
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid) {
132 133 134 135 136
	interested := s.sw.FilterInteresting(ks)
	if len(interested) == 0 {
		return
	}

137
	select {
138
	case s.incoming <- op{op: opReceive, from: from, keys: interested}:
139 140 141 142
	case <-s.ctx.Done():
	}
}

143 144
// IsWanted returns true if this session is waiting to receive the given Cid.
func (s *Session) IsWanted(c cid.Cid) bool {
145
	return s.sw.IsWanted(c)
Jeromy's avatar
Jeromy committed
146 147
}

148 149 150 151 152 153 154 155 156 157
// GetBlock fetches a single block.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
158 159

	return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
160 161
		func(ctx context.Context, keys []cid.Cid) {
			select {
162
			case s.incoming <- op{op: opWant, keys: keys}:
163 164 165 166 167 168
			case <-ctx.Done():
			case <-s.ctx.Done():
			}
		},
		func(keys []cid.Cid) {
			select {
169
			case s.incoming <- op{op: opCancel, keys: keys}:
170 171 172 173
			case <-s.ctx.Done():
			}
		},
	)
Jeromy's avatar
Jeromy committed
174 175
}

176
// GetAverageLatency returns the average latency for block requests.
177 178
func (s *Session) GetAverageLatency() time.Duration {
	resp := make(chan time.Duration)
Jeromy's avatar
Jeromy committed
179
	select {
180 181 182 183 184 185 186 187
	case s.latencyReqs <- resp:
	case <-s.ctx.Done():
		return -1 * time.Millisecond
	}

	select {
	case latency := <-resp:
		return latency
Jeromy's avatar
Jeromy committed
188
	case <-s.ctx.Done():
189
		return -1 * time.Millisecond
Jeromy's avatar
Jeromy committed
190
	}
Jeromy's avatar
Jeromy committed
191 192
}

193
// SetBaseTickDelay changes the rate at which ticks happen.
194 195 196 197 198
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
199 200
}

201 202
// Session run loop -- everything function below here should not be called
// of this loop
Jeromy's avatar
Jeromy committed
203
func (s *Session) run(ctx context.Context) {
Steven Allen's avatar
Steven Allen committed
204 205
	s.idleTick = time.NewTimer(s.initialSearchDelay)
	s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
Jeromy's avatar
Jeromy committed
206 207
	for {
		select {
208 209 210 211 212 213 214 215 216 217 218
		case oper := <-s.incoming:
			switch oper.op {
			case opReceive:
				s.handleReceive(ctx, oper.from, oper.keys)
			case opWant:
				s.wantBlocks(ctx, oper.keys)
			case opCancel:
				s.sw.CancelPending(oper.keys)
			default:
				panic("unhandled operation")
			}
Steven Allen's avatar
Steven Allen committed
219 220 221 222
		case <-s.idleTick.C:
			s.handleIdleTick(ctx)
		case <-s.periodicSearchTimer.C:
			s.handlePeriodicSearch(ctx)
223 224 225 226
		case resp := <-s.latencyReqs:
			resp <- s.averageLatency()
		case baseTickDelay := <-s.tickDelayReqs:
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
227
		case <-ctx.Done():
228
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
229 230 231 232 233
			return
		}
	}
}

Steven Allen's avatar
Steven Allen committed
234
func (s *Session) handleIdleTick(ctx context.Context) {
235
	live := s.sw.PrepareBroadcast()
236 237

	// Broadcast these keys to everyone we're connected to
238
	s.pm.RecordPeerRequests(nil, live)
239 240
	s.wm.WantBlocks(ctx, live, nil, s.id)

241
	// do no find providers on consecutive ticks
Steven Allen's avatar
Steven Allen committed
242
	// -- just rely on periodic search widening
243
	if len(live) > 0 && (s.consecutiveTicks == 0) {
244
		s.pm.FindMorePeers(ctx, live[0])
245
	}
Steven Allen's avatar
Steven Allen committed
246
	s.resetIdleTick()
247

248
	if s.sw.HasLiveWants() {
249 250
		s.consecutiveTicks++
	}
251 252
}

Steven Allen's avatar
Steven Allen committed
253
func (s *Session) handlePeriodicSearch(ctx context.Context) {
254
	randomWant := s.sw.RandomLiveWant()
255
	if !randomWant.Defined() {
256 257 258 259 260
		return
	}

	// TODO: come up with a better strategy for determining when to search
	// for new providers for blocks.
261 262
	s.pm.FindMorePeers(ctx, randomWant)
	s.wm.WantBlocks(ctx, []cid.Cid{randomWant}, nil, s.id)
263

Steven Allen's avatar
Steven Allen committed
264
	s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
265 266
}

267
func (s *Session) handleShutdown() {
Steven Allen's avatar
Steven Allen committed
268
	s.idleTick.Stop()
269

270
	live := s.sw.LiveWants()
271 272 273
	s.wm.CancelWants(s.ctx, live, nil, s.id)
}

274
func (s *Session) handleReceive(ctx context.Context, from peer.ID, keys []cid.Cid) {
275 276
	// Record statistics only if the blocks came from the network
	// (blocks can also be received from the local node)
277 278
	if from != "" {
		s.updateReceiveCounters(ctx, from, keys)
279
	}
280

281
	// Update the want list
282
	wanted, totalLatency := s.sw.BlocksReceived(keys)
283 284
	if len(wanted) == 0 {
		return
285
	}
286 287 288 289 290 291 292

	// We've received the blocks so we can cancel any outstanding wants for them
	s.cancelIncoming(ctx, wanted)

	s.idleTick.Stop()

	// Process the received blocks
293
	s.processReceive(ctx, wanted, totalLatency)
294 295

	s.resetIdleTick()
296 297
}

298
func (s *Session) updateReceiveCounters(ctx context.Context, from peer.ID, keys []cid.Cid) {
299
	// Record unique vs duplicate blocks
300
	s.sw.ForEachUniqDup(keys, s.srs.RecordUniqueBlock, s.srs.RecordDuplicateBlock)
301

302
	// Record response (to be able to time latency)
303 304
	if len(keys) > 0 {
		s.pm.RecordPeerResponse(from, keys)
Jeromy's avatar
Jeromy committed
305 306 307
	}
}

308 309 310 311 312
func (s *Session) cancelIncoming(ctx context.Context, ks []cid.Cid) {
	s.pm.RecordCancels(ks)
	s.wm.CancelWants(s.ctx, ks, nil, s.id)
}

313
func (s *Session) processReceive(ctx context.Context, ks []cid.Cid, totalLatency time.Duration) {
314 315 316 317 318 319 320 321 322 323 324 325
	// Keep track of the total number of blocks received and total latency
	s.fetchcnt += len(ks)
	s.latTotal += totalLatency

	// We've received new wanted blocks, so reset the number of ticks
	// that have occurred since the last new block
	s.consecutiveTicks = 0

	s.wantBlocks(ctx, nil)
}

func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
326 327 328
	// Given the want limit and any newly received blocks, get as many wants as
	// we can to send out
	ks := s.sw.GetNextWants(s.wantLimit(), newks)
329 330
	if len(ks) == 0 {
		return
Jeromy's avatar
Jeromy committed
331
	}
332

333
	peers := s.pm.GetOptimizedPeers()
334
	if len(peers) > 0 {
335 336 337 338
		splitRequests := s.srs.SplitRequest(peers, ks)
		for _, splitRequest := range splitRequests {
			s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
			s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
339 340 341 342 343
		}
	} else {
		s.pm.RecordPeerRequests(nil, ks)
		s.wm.WantBlocks(ctx, ks, nil, s.id)
	}
Jeromy's avatar
Jeromy committed
344 345
}

346 347
func (s *Session) averageLatency() time.Duration {
	return s.latTotal / time.Duration(s.fetchcnt)
Jeromy's avatar
Jeromy committed
348
}
349

Steven Allen's avatar
Steven Allen committed
350
func (s *Session) resetIdleTick() {
351
	var tickDelay time.Duration
352
	if s.latTotal == 0 {
Steven Allen's avatar
Steven Allen committed
353
		tickDelay = s.initialSearchDelay
354 355
	} else {
		avLat := s.averageLatency()
356
		tickDelay = s.baseTickDelay + (3 * avLat)
Jeromy's avatar
Jeromy committed
357
	}
358
	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
Steven Allen's avatar
Steven Allen committed
359
	s.idleTick.Reset(tickDelay)
Jeromy's avatar
Jeromy committed
360
}
361

362
func (s *Session) wantLimit() int {
363
	if len(s.pm.GetOptimizedPeers()) > 0 {
364
		return targetedLiveWantsLimit
365
	}
366
	return broadcastLiveWantsLimit
367
}