session.go 8.76 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4 5 6

import (
	"context"
	"time"

Jeromy's avatar
Jeromy committed
7
	lru "github.com/hashicorp/golang-lru"
8 9
	bsgetter "github.com/ipfs/go-bitswap/getter"
	notifications "github.com/ipfs/go-bitswap/notifications"
Jeromy's avatar
Jeromy committed
10 11 12 13 14
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	loggables "github.com/libp2p/go-libp2p-loggables"
	peer "github.com/libp2p/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
15 16 17 18
)

// activeWantsLimit is the liveWants size below which handleNewRequest sends
// new keys out immediately; keys beyond the remaining room are queued in
// tofetch instead.
const activeWantsLimit = 16

19
// Wantmanager is an interface that can be used to request blocks
20
// from given peers.
21
type WantManager interface {
22 23 24 25
	WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
	CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}

26 27 28 29 30 31 32
// PeerManager is the peer-tracking dependency of a Session. The comments on
// each method describe how the session uses it; the implementation's own
// semantics are defined elsewhere.
type PeerManager interface {
	// FindMorePeers is called from handleTick with the first live want.
	FindMorePeers(context.Context, cid.Cid)
	// GetOptimizedPeers supplies the peer list wantBlocks sends requests to.
	GetOptimizedPeers() []peer.ID
	// RecordPeerRequests is told which keys were requested from which peers
	// (nil peers when the request is a broadcast).
	RecordPeerRequests([]peer.ID, []cid.Cid)
	// RecordPeerResponse is told which peer delivered which block.
	RecordPeerResponse(peer.ID, cid.Cid)
}

33 34 35 36 37 38 39 40 41 42
// interestReq is a query sent to the run loop over interestReqs asking
// whether the session still wants c; the answer is delivered on resp.
type interestReq struct {
	c    cid.Cid
	resp chan bool
}

// blkRecv pairs a received block with the peer it came from, for delivery to
// the run loop over the incoming channel.
type blkRecv struct {
	from peer.ID
	blk  blocks.Block
}

Jeromy's avatar
Jeromy committed
43 44
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
45
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
46
type Session struct {
47
	// dependencies
48 49 50
	ctx context.Context
	wm  WantManager
	pm  PeerManager
51 52 53 54 55 56 57 58 59 60

	// channels
	incoming      chan blkRecv
	newReqs       chan []cid.Cid
	cancelKeys    chan []cid.Cid
	interestReqs  chan interestReq
	latencyReqs   chan chan time.Duration
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
61 62 63 64 65 66 67
	tofetch       *cidQueue
	interest      *lru.Cache
	liveWants     map[cid.Cid]time.Time
	tick          *time.Timer
	baseTickDelay time.Duration
	latTotal      time.Duration
	fetchcnt      int
68 69

	// identifiers
Jeromy's avatar
Jeromy committed
70
	notif notifications.PubSub
71 72
	uuid  logging.Loggable
	id    uint64
Jeromy's avatar
Jeromy committed
73 74
}

75
// New creates a new bitswap session whose lifetime is bounded by the
76
// given context.
77
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Session {
Jeromy's avatar
Jeromy committed
78
	s := &Session{
Steven Allen's avatar
Steven Allen committed
79
		liveWants:     make(map[cid.Cid]time.Time),
80 81
		newReqs:       make(chan []cid.Cid),
		cancelKeys:    make(chan []cid.Cid),
82
		tofetch:       newCidQueue(),
Jeromy's avatar
Jeromy committed
83
		interestReqs:  make(chan interestReq),
84 85
		latencyReqs:   make(chan chan time.Duration),
		tickDelayReqs: make(chan time.Duration),
Jeromy's avatar
Jeromy committed
86
		ctx:           ctx,
87
		wm:            wm,
88
		pm:            pm,
Jeromy's avatar
Jeromy committed
89 90 91 92
		incoming:      make(chan blkRecv),
		notif:         notifications.New(),
		uuid:          loggables.Uuid("GetBlockRequest"),
		baseTickDelay: time.Millisecond * 500,
93
		id:            id,
Jeromy's avatar
Jeromy committed
94 95 96 97 98 99 100 101 102 103
	}

	cache, _ := lru.New(2048)
	s.interest = cache

	go s.run(ctx)

	return s
}

104 105 106 107 108
// ReceiveBlockFrom hands a block that arrived from the given peer to the
// session's run loop. It blocks until the run loop accepts the block or the
// session context is done, in which case the block is dropped.
func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
	msg := blkRecv{from: from, blk: blk}
	select {
	case <-s.ctx.Done():
	case s.incoming <- msg:
	}
}
111

112 113 114
// InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
	return s.interest.Contains(c) || s.isLiveWant(c)
Jeromy's avatar
Jeromy committed
115 116
}

117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
// GetBlock fetches a single block by delegating to bsgetter.SyncGetBlock
// with this session's GetBlocks as the underlying fetcher.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	getBlocks := s.GetBlocks
	return bsgetter.SyncGetBlock(parent, k, getBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel on which found blocks are delivered. No order is
// guaranteed on the returned blocks. The session's uuid is attached to the
// context as a loggable before the request is issued.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	tagged := logging.ContextWithLoggable(ctx, s.uuid)
	return bsgetter.AsyncGetBlocks(tagged, keys, s.notif, s.fetch, s.cancel)
}

// ID returns the sessions identifier.
func (s *Session) ID() uint64 {
	return s.id
Jeromy's avatar
Jeromy committed
133 134
}

135 136
func (s *Session) GetAverageLatency() time.Duration {
	resp := make(chan time.Duration)
Jeromy's avatar
Jeromy committed
137
	select {
138 139 140 141 142 143 144 145
	case s.latencyReqs <- resp:
	case <-s.ctx.Done():
		return -1 * time.Millisecond
	}

	select {
	case latency := <-resp:
		return latency
Jeromy's avatar
Jeromy committed
146
	case <-s.ctx.Done():
147
		return -1 * time.Millisecond
Jeromy's avatar
Jeromy committed
148
	}
Jeromy's avatar
Jeromy committed
149 150
}

151 152 153 154 155
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
156 157 158 159 160 161 162 163 164
}

// TODO: PERF: this is using a channel to guard a map access against race
// conditions. This is definitely much slower than a mutex, though its unclear
// if it will actually induce any noticeable slowness. This is implemented this
// way to avoid adding a more complex set of mutexes around the liveWants map.
// note that in the average case (where this session *is* interested in the
// block we received) this function will not be called, as the cid will likely
// still be in the interest cache.
165
func (s *Session) isLiveWant(c cid.Cid) bool {
166
	resp := make(chan bool, 1)
167 168
	select {
	case s.interestReqs <- interestReq{
Jeromy's avatar
Jeromy committed
169 170
		c:    c,
		resp: resp,
171 172 173
	}:
	case <-s.ctx.Done():
		return false
Jeromy's avatar
Jeromy committed
174
	}
Jeromy's avatar
Jeromy committed
175 176 177 178 179 180 181

	select {
	case want := <-resp:
		return want
	case <-s.ctx.Done():
		return false
	}
Jeromy's avatar
Jeromy committed
182 183
}

184 185 186 187 188
func (s *Session) fetch(ctx context.Context, keys []cid.Cid) {
	select {
	case s.newReqs <- keys:
	case <-ctx.Done():
	case <-s.ctx.Done():
Jeromy's avatar
Jeromy committed
189 190 191
	}
}

192 193 194 195
func (s *Session) cancel(keys []cid.Cid) {
	select {
	case s.cancelKeys <- keys:
	case <-s.ctx.Done():
Jeromy's avatar
Jeromy committed
196 197 198
	}
}

199 200 201 202
// provSearchDelay is the initial delay of the session's tick timer, and the
// delay resetTick keeps using while no latency has been recorded (latTotal
// is zero).
const provSearchDelay = time.Second * 10

// Session run loop -- every function below here should not be called
// outside of this loop
Jeromy's avatar
Jeromy committed
203 204 205 206 207
func (s *Session) run(ctx context.Context) {
	s.tick = time.NewTimer(provSearchDelay)
	for {
		select {
		case blk := <-s.incoming:
208
			s.handleIncomingBlock(ctx, blk)
Jeromy's avatar
Jeromy committed
209
		case keys := <-s.newReqs:
210
			s.handleNewRequest(ctx, keys)
Jeromy's avatar
Jeromy committed
211
		case keys := <-s.cancelKeys:
212
			s.handleCancel(keys)
Jeromy's avatar
Jeromy committed
213
		case <-s.tick.C:
214
			s.handleTick(ctx)
Jeromy's avatar
Jeromy committed
215
		case lwchk := <-s.interestReqs:
216
			lwchk.resp <- s.cidIsWanted(lwchk.c)
217 218 219 220
		case resp := <-s.latencyReqs:
			resp <- s.averageLatency()
		case baseTickDelay := <-s.tickDelayReqs:
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
221
		case <-ctx.Done():
222
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
223 224 225 226 227
			return
		}
	}
}

228 229 230 231
func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
	s.tick.Stop()

	if blk.from != "" {
232
		s.pm.RecordPeerResponse(blk.from, blk.blk.Cid())
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
	}

	s.receiveBlock(ctx, blk.blk)

	s.resetTick()
}

// handleNewRequest records every key in the interest cache, requests as many
// keys as fit under activeWantsLimit right away, and queues the rest in
// tofetch.
func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
	for i := range keys {
		s.interest.Add(keys[i], nil)
	}

	queued := keys
	if room := activeWantsLimit - len(s.liveWants); room > 0 {
		if room > len(keys) {
			room = len(keys)
		}
		// Split before calling wantBlocks so the remainder is unaffected
		// by the live-want bookkeeping it performs.
		sendNow := keys[:room]
		queued = keys[room:]
		s.wantBlocks(ctx, sendNow)
	}

	for i := range queued {
		s.tofetch.Push(queued[i])
	}
}

// handleCancel removes each of the given keys from the tofetch queue. Keys
// that are already live wants are left untouched.
func (s *Session) handleCancel(keys []cid.Cid) {
	for i := range keys {
		s.tofetch.Remove(keys[i])
	}
}

266
func (s *Session) handleTick(ctx context.Context) {
267 268 269 270 271 272 273 274
	live := make([]cid.Cid, 0, len(s.liveWants))
	now := time.Now()
	for c := range s.liveWants {
		live = append(live, c)
		s.liveWants[c] = now
	}

	// Broadcast these keys to everyone we're connected to
275
	s.pm.RecordPeerRequests(nil, live)
276 277 278
	s.wm.WantBlocks(ctx, live, nil, s.id)

	if len(live) > 0 {
279
		s.pm.FindMorePeers(ctx, live[0])
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
	}
	s.resetTick()
}

// handleShutdown stops the tick timer, shuts down the notification pubsub,
// and cancels every live want with the want manager.
func (s *Session) handleShutdown() {
	s.tick.Stop()
	s.notif.Shutdown()

	outstanding := make([]cid.Cid, 0, len(s.liveWants))
	for k := range s.liveWants {
		outstanding = append(outstanding, k)
	}
	s.wm.CancelWants(s.ctx, outstanding, nil, s.id)
}

295
func (s *Session) cidIsWanted(c cid.Cid) bool {
Steven Allen's avatar
Steven Allen committed
296
	_, ok := s.liveWants[c]
297 298 299 300 301 302
	if !ok {
		ok = s.tofetch.Has(c)
	}
	return ok
}

Jeromy's avatar
Jeromy committed
303
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
304 305
	c := blk.Cid()
	if s.cidIsWanted(c) {
Steven Allen's avatar
Steven Allen committed
306
		tval, ok := s.liveWants[c]
307 308
		if ok {
			s.latTotal += time.Since(tval)
Steven Allen's avatar
Steven Allen committed
309
			delete(s.liveWants, c)
310 311 312
		} else {
			s.tofetch.Remove(c)
		}
Jeromy's avatar
Jeromy committed
313 314 315
		s.fetchcnt++
		s.notif.Publish(blk)

316 317
		if next := s.tofetch.Pop(); next.Defined() {
			s.wantBlocks(ctx, []cid.Cid{next})
Jeromy's avatar
Jeromy committed
318 319 320 321
		}
	}
}

322
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
323
	now := time.Now()
Jeromy's avatar
Jeromy committed
324
	for _, c := range ks {
Steven Allen's avatar
Steven Allen committed
325
		s.liveWants[c] = now
Jeromy's avatar
Jeromy committed
326
	}
327 328 329 330
	peers := s.pm.GetOptimizedPeers()
	// right now we're requesting each block from every peer, but soon, maybe not
	s.pm.RecordPeerRequests(peers, ks)
	s.wm.WantBlocks(ctx, ks, peers, s.id)
Jeromy's avatar
Jeromy committed
331 332
}

333 334
func (s *Session) averageLatency() time.Duration {
	return s.latTotal / time.Duration(s.fetchcnt)
Jeromy's avatar
Jeromy committed
335
}
336 337 338 339 340 341
func (s *Session) resetTick() {
	if s.latTotal == 0 {
		s.tick.Reset(provSearchDelay)
	} else {
		avLat := s.averageLatency()
		s.tick.Reset(s.baseTickDelay + (3 * avLat))
Jeromy's avatar
Jeromy committed
342 343 344
	}
}

345
type cidQueue struct {
346
	elems []cid.Cid
347 348 349 350 351 352 353
	eset  *cid.Set
}

// newCidQueue returns an empty cidQueue with its membership set initialized.
func newCidQueue() *cidQueue {
	q := new(cidQueue)
	q.eset = cid.NewSet()
	return q
}

354
func (cq *cidQueue) Pop() cid.Cid {
355 356
	for {
		if len(cq.elems) == 0 {
357
			return cid.Cid{}
358 359 360 361 362 363 364 365 366 367 368 369
		}

		out := cq.elems[0]
		cq.elems = cq.elems[1:]

		if cq.eset.Has(out) {
			cq.eset.Remove(out)
			return out
		}
	}
}

370
func (cq *cidQueue) Push(c cid.Cid) {
371 372 373 374 375
	if cq.eset.Visit(c) {
		cq.elems = append(cq.elems, c)
	}
}

376
func (cq *cidQueue) Remove(c cid.Cid) {
377 378 379
	cq.eset.Remove(c)
}

380
func (cq *cidQueue) Has(c cid.Cid) bool {
381 382 383 384 385 386
	return cq.eset.Has(c)
}

// Len returns the size of the membership set, which excludes entries that
// were removed but still linger in the slice.
func (cq *cidQueue) Len() int {
	size := cq.eset.Len()
	return size
}