session.go 8.44 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4 5 6

import (
	"context"
	"time"

Jeromy's avatar
Jeromy committed
7
	lru "github.com/hashicorp/golang-lru"
8 9
	bsgetter "github.com/ipfs/go-bitswap/getter"
	notifications "github.com/ipfs/go-bitswap/notifications"
Jeromy's avatar
Jeromy committed
10 11 12 13 14
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	loggables "github.com/libp2p/go-libp2p-loggables"
	peer "github.com/libp2p/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
15 16 17 18
)

// activeWantsLimit caps how many wants handleNewRequest will make live at
// once; keys that do not fit under the cap are queued in tofetch instead.
const activeWantsLimit = 16

19
// WantManager is an interface that can be used to request blocks
20
// from given peers.
21
type WantManager interface {
22 23 24 25
	WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
	CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}

26 27
// PeerManager provides an interface for tracking and optimize peers, and
// requesting more when neccesary.
28 29 30 31 32 33 34
type PeerManager interface {
	FindMorePeers(context.Context, cid.Cid)
	GetOptimizedPeers() []peer.ID
	RecordPeerRequests([]peer.ID, []cid.Cid)
	RecordPeerResponse(peer.ID, cid.Cid)
}

35 36 37 38 39 40 41 42 43 44
// interestReq is a question put to the run loop: does the session still
// want this CID? The run loop writes its answer to resp.
type interestReq struct {
	// c is the CID being asked about.
	c cid.Cid
	// resp receives the run loop's answer.
	resp chan bool
}

// blkRecv pairs a received block with the peer it came from, so both can be
// handed to the run loop in a single channel send.
type blkRecv struct {
	// from is the sending peer; the run loop skips the peer bookkeeping
	// when it is the empty ID.
	from peer.ID
	// blk is the block that was received.
	blk blocks.Block
}

Jeromy's avatar
Jeromy committed
45 46
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
47
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
48
type Session struct {
49
	// dependencies
50 51 52
	ctx context.Context
	wm  WantManager
	pm  PeerManager
53 54 55 56 57 58 59 60 61 62

	// channels
	incoming      chan blkRecv
	newReqs       chan []cid.Cid
	cancelKeys    chan []cid.Cid
	interestReqs  chan interestReq
	latencyReqs   chan chan time.Duration
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
63 64 65 66 67 68 69
	tofetch       *cidQueue
	interest      *lru.Cache
	liveWants     map[cid.Cid]time.Time
	tick          *time.Timer
	baseTickDelay time.Duration
	latTotal      time.Duration
	fetchcnt      int
70 71

	// identifiers
Jeromy's avatar
Jeromy committed
72
	notif notifications.PubSub
73 74
	uuid  logging.Loggable
	id    uint64
Jeromy's avatar
Jeromy committed
75 76
}

77
// New creates a new bitswap session whose lifetime is bounded by the
78
// given context.
79
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Session {
Jeromy's avatar
Jeromy committed
80
	s := &Session{
Steven Allen's avatar
Steven Allen committed
81
		liveWants:     make(map[cid.Cid]time.Time),
82 83
		newReqs:       make(chan []cid.Cid),
		cancelKeys:    make(chan []cid.Cid),
84
		tofetch:       newCidQueue(),
Jeromy's avatar
Jeromy committed
85
		interestReqs:  make(chan interestReq),
86 87
		latencyReqs:   make(chan chan time.Duration),
		tickDelayReqs: make(chan time.Duration),
Jeromy's avatar
Jeromy committed
88
		ctx:           ctx,
89
		wm:            wm,
90
		pm:            pm,
Jeromy's avatar
Jeromy committed
91 92 93 94
		incoming:      make(chan blkRecv),
		notif:         notifications.New(),
		uuid:          loggables.Uuid("GetBlockRequest"),
		baseTickDelay: time.Millisecond * 500,
95
		id:            id,
Jeromy's avatar
Jeromy committed
96 97 98 99 100 101 102 103 104 105
	}

	cache, _ := lru.New(2048)
	s.interest = cache

	go s.run(ctx)

	return s
}

106 107 108 109 110
// ReceiveBlockFrom receives an incoming block from the given peer.
func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
	select {
	case s.incoming <- blkRecv{from: from, blk: blk}:
	case <-s.ctx.Done():
111
	}
112 113 114
	ks := []cid.Cid{blk.Cid()}
	s.wm.CancelWants(s.ctx, ks, nil, s.id)

115
}
116

117 118 119
// InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
	return s.interest.Contains(c) || s.isLiveWant(c)
Jeromy's avatar
Jeromy committed
120 121
}

122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
// GetBlock fetches a single block. It is a synchronous wrapper that runs
// the request through this session's GetBlocks.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	getter := s.GetBlocks
	return bsgetter.SyncGetBlock(parent, k, getter)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel on which found blocks are delivered. No ordering of the
// returned blocks is guaranteed. The session's uuid is attached to ctx
// before the request is made.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	tagged := logging.ContextWithLoggable(ctx, s.uuid)
	return bsgetter.AsyncGetBlocks(tagged, keys, s.notif, s.fetch, s.cancel)
}

// ID returns the sessions identifier.
func (s *Session) ID() uint64 {
	return s.id
Jeromy's avatar
Jeromy committed
138 139
}

140
// GetAverageLatency returns the average latency for block requests.
141 142
func (s *Session) GetAverageLatency() time.Duration {
	resp := make(chan time.Duration)
Jeromy's avatar
Jeromy committed
143
	select {
144 145 146 147 148 149 150 151
	case s.latencyReqs <- resp:
	case <-s.ctx.Done():
		return -1 * time.Millisecond
	}

	select {
	case latency := <-resp:
		return latency
Jeromy's avatar
Jeromy committed
152
	case <-s.ctx.Done():
153
		return -1 * time.Millisecond
Jeromy's avatar
Jeromy committed
154
	}
Jeromy's avatar
Jeromy committed
155 156
}

157
// SetBaseTickDelay changes the rate at which ticks happen.
158 159 160 161 162
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
163 164 165 166 167 168 169 170 171
}

// TODO: PERF: this is using a channel to guard a map access against race
// conditions. This is definitely much slower than a mutex, though its unclear
// if it will actually induce any noticeable slowness. This is implemented this
// way to avoid adding a more complex set of mutexes around the liveWants map.
// note that in the average case (where this session *is* interested in the
// block we received) this function will not be called, as the cid will likely
// still be in the interest cache.
172
func (s *Session) isLiveWant(c cid.Cid) bool {
173
	resp := make(chan bool, 1)
174 175
	select {
	case s.interestReqs <- interestReq{
Jeromy's avatar
Jeromy committed
176 177
		c:    c,
		resp: resp,
178 179 180
	}:
	case <-s.ctx.Done():
		return false
Jeromy's avatar
Jeromy committed
181
	}
Jeromy's avatar
Jeromy committed
182 183 184 185 186 187 188

	select {
	case want := <-resp:
		return want
	case <-s.ctx.Done():
		return false
	}
Jeromy's avatar
Jeromy committed
189 190
}

191 192 193 194 195
func (s *Session) fetch(ctx context.Context, keys []cid.Cid) {
	select {
	case s.newReqs <- keys:
	case <-ctx.Done():
	case <-s.ctx.Done():
Jeromy's avatar
Jeromy committed
196 197 198
	}
}

199 200 201 202
func (s *Session) cancel(keys []cid.Cid) {
	select {
	case s.cancelKeys <- keys:
	case <-s.ctx.Done():
Jeromy's avatar
Jeromy committed
203 204 205
	}
}

206 207 208 209
// provSearchDelay is the tick interval used while no latency has been
// measured: run arms the timer with it, and resetTick falls back to it
// for as long as latTotal is zero.
const provSearchDelay = 10 * time.Second

// Session run loop -- every function below here should not be called
// outside of this loop
Jeromy's avatar
Jeromy committed
210 211 212 213 214
func (s *Session) run(ctx context.Context) {
	s.tick = time.NewTimer(provSearchDelay)
	for {
		select {
		case blk := <-s.incoming:
215
			s.handleIncomingBlock(ctx, blk)
Jeromy's avatar
Jeromy committed
216
		case keys := <-s.newReqs:
217
			s.handleNewRequest(ctx, keys)
Jeromy's avatar
Jeromy committed
218
		case keys := <-s.cancelKeys:
219
			s.handleCancel(keys)
Jeromy's avatar
Jeromy committed
220
		case <-s.tick.C:
221
			s.handleTick(ctx)
Jeromy's avatar
Jeromy committed
222
		case lwchk := <-s.interestReqs:
223
			lwchk.resp <- s.cidIsWanted(lwchk.c)
224 225 226 227
		case resp := <-s.latencyReqs:
			resp <- s.averageLatency()
		case baseTickDelay := <-s.tickDelayReqs:
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
228
		case <-ctx.Done():
229
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
230 231 232 233 234
			return
		}
	}
}

235 236 237 238
func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
	s.tick.Stop()

	if blk.from != "" {
239
		s.pm.RecordPeerResponse(blk.from, blk.blk.Cid())
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
	}

	s.receiveBlock(ctx, blk.blk)

	s.resetTick()
}

// handleNewRequest registers interest in every key, sends wants right away
// for as many leading keys as fit under activeWantsLimit, and queues the
// remainder in tofetch.
func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
	for i := range keys {
		s.interest.Add(keys[i], nil)
	}

	// room is the number of live-want slots still free.
	if room := activeWantsLimit - len(s.liveWants); room > 0 {
		if room > len(keys) {
			room = len(keys)
		}
		immediate := keys[:room]
		keys = keys[room:]
		s.wantBlocks(ctx, immediate)
	}

	for i := range keys {
		s.tofetch.Push(keys[i])
	}
}

// handleCancel drops the given keys from the tofetch queue. Wants that are
// already live are not touched here.
func (s *Session) handleCancel(keys []cid.Cid) {
	for i := range keys {
		s.tofetch.Remove(keys[i])
	}
}

273
func (s *Session) handleTick(ctx context.Context) {
274 275 276 277 278 279 280 281
	live := make([]cid.Cid, 0, len(s.liveWants))
	now := time.Now()
	for c := range s.liveWants {
		live = append(live, c)
		s.liveWants[c] = now
	}

	// Broadcast these keys to everyone we're connected to
282
	s.pm.RecordPeerRequests(nil, live)
283 284 285
	s.wm.WantBlocks(ctx, live, nil, s.id)

	if len(live) > 0 {
286
		s.pm.FindMorePeers(ctx, live[0])
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
	}
	s.resetTick()
}

// handleShutdown tears the session down: it stops the tick timer, shuts
// down the notification pubsub, and cancels every want that is still live.
func (s *Session) handleShutdown() {
	s.tick.Stop()
	s.notif.Shutdown()

	outstanding := make([]cid.Cid, 0, len(s.liveWants))
	for c := range s.liveWants {
		outstanding = append(outstanding, c)
	}

	// NOTE(review): New passes the same context to run and stores it in
	// s.ctx, so s.ctx is presumably already done here -- confirm that
	// CancelWants tolerates a done context.
	s.wm.CancelWants(s.ctx, outstanding, nil, s.id)
}

302
func (s *Session) cidIsWanted(c cid.Cid) bool {
Steven Allen's avatar
Steven Allen committed
303
	_, ok := s.liveWants[c]
304 305 306 307 308 309
	if !ok {
		ok = s.tofetch.Has(c)
	}
	return ok
}

Jeromy's avatar
Jeromy committed
310
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
311 312
	c := blk.Cid()
	if s.cidIsWanted(c) {
Steven Allen's avatar
Steven Allen committed
313
		tval, ok := s.liveWants[c]
314 315
		if ok {
			s.latTotal += time.Since(tval)
Steven Allen's avatar
Steven Allen committed
316
			delete(s.liveWants, c)
317 318 319
		} else {
			s.tofetch.Remove(c)
		}
Jeromy's avatar
Jeromy committed
320 321 322
		s.fetchcnt++
		s.notif.Publish(blk)

323 324
		if next := s.tofetch.Pop(); next.Defined() {
			s.wantBlocks(ctx, []cid.Cid{next})
Jeromy's avatar
Jeromy committed
325 326 327 328
		}
	}
}

329
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
330
	now := time.Now()
Jeromy's avatar
Jeromy committed
331
	for _, c := range ks {
Steven Allen's avatar
Steven Allen committed
332
		s.liveWants[c] = now
Jeromy's avatar
Jeromy committed
333
	}
334 335 336 337
	peers := s.pm.GetOptimizedPeers()
	// right now we're requesting each block from every peer, but soon, maybe not
	s.pm.RecordPeerRequests(peers, ks)
	s.wm.WantBlocks(ctx, ks, peers, s.id)
Jeromy's avatar
Jeromy committed
338 339
}

340 341
func (s *Session) averageLatency() time.Duration {
	return s.latTotal / time.Duration(s.fetchcnt)
Jeromy's avatar
Jeromy committed
342
}
343 344 345 346 347 348
func (s *Session) resetTick() {
	if s.latTotal == 0 {
		s.tick.Reset(provSearchDelay)
	} else {
		avLat := s.averageLatency()
		s.tick.Reset(s.baseTickDelay + (3 * avLat))
Jeromy's avatar
Jeromy committed
349 350
	}
}