session.go 9.43 KB
Newer Older
1
package session
Jeromy's avatar
Jeromy committed
2 3 4

import (
	"context"
5
	"fmt"
Jeromy's avatar
Jeromy committed
6 7
	"time"

Jeromy's avatar
Jeromy committed
8
	lru "github.com/hashicorp/golang-lru"
9 10 11
	bsgetter "github.com/ipfs/go-bitswap/getter"
	bsnet "github.com/ipfs/go-bitswap/network"
	notifications "github.com/ipfs/go-bitswap/notifications"
Jeromy's avatar
Jeromy committed
12 13 14 15 16
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	loggables "github.com/libp2p/go-libp2p-loggables"
	peer "github.com/libp2p/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
17 18 19 20
)

const activeWantsLimit = 16

hannahhoward's avatar
hannahhoward committed
21
// SessionWantManager is an interface that can be used to request blocks
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
// from given peers.
type SessionWantManager interface {
	WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
	CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}

type interestReq struct {
	c    cid.Cid
	resp chan bool
}

type blkRecv struct {
	from peer.ID
	blk  blocks.Block
}

Jeromy's avatar
Jeromy committed
38 39
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
40
// info to, and who to request blocks from.
Jeromy's avatar
Jeromy committed
41
type Session struct {
42 43 44 45 46 47 48 49 50 51 52 53 54 55
	// dependencies
	ctx     context.Context
	wm      SessionWantManager
	network bsnet.BitSwapNetwork

	// channels
	incoming      chan blkRecv
	newReqs       chan []cid.Cid
	cancelKeys    chan []cid.Cid
	interestReqs  chan interestReq
	latencyReqs   chan chan time.Duration
	tickDelayReqs chan time.Duration

	// do not touch outside run loop
56
	tofetch        *cidQueue
Jeromy's avatar
Jeromy committed
57 58
	activePeers    map[peer.ID]struct{}
	activePeersArr []peer.ID
59 60 61 62 63 64 65 66
	interest       *lru.Cache
	liveWants      map[cid.Cid]time.Time
	tick           *time.Timer
	baseTickDelay  time.Duration
	latTotal       time.Duration
	fetchcnt       int

	// identifiers
Jeromy's avatar
Jeromy committed
67
	notif notifications.PubSub
68 69 70
	uuid  logging.Loggable
	id    uint64
	tag   string
Jeromy's avatar
Jeromy committed
71 72
}

73
// New creates a new bitswap session whose lifetime is bounded by the
74
// given context.
75
func New(ctx context.Context, id uint64, wm SessionWantManager, network bsnet.BitSwapNetwork) *Session {
Jeromy's avatar
Jeromy committed
76 77
	s := &Session{
		activePeers:   make(map[peer.ID]struct{}),
Steven Allen's avatar
Steven Allen committed
78
		liveWants:     make(map[cid.Cid]time.Time),
79 80
		newReqs:       make(chan []cid.Cid),
		cancelKeys:    make(chan []cid.Cid),
81
		tofetch:       newCidQueue(),
Jeromy's avatar
Jeromy committed
82
		interestReqs:  make(chan interestReq),
83 84
		latencyReqs:   make(chan chan time.Duration),
		tickDelayReqs: make(chan time.Duration),
Jeromy's avatar
Jeromy committed
85
		ctx:           ctx,
86 87
		wm:            wm,
		network:       network,
Jeromy's avatar
Jeromy committed
88 89 90 91
		incoming:      make(chan blkRecv),
		notif:         notifications.New(),
		uuid:          loggables.Uuid("GetBlockRequest"),
		baseTickDelay: time.Millisecond * 500,
92
		id:            id,
Jeromy's avatar
Jeromy committed
93 94
	}

95 96
	s.tag = fmt.Sprint("bs-ses-", s.id)

Jeromy's avatar
Jeromy committed
97 98 99 100 101 102 103 104
	cache, _ := lru.New(2048)
	s.interest = cache

	go s.run(ctx)

	return s
}

105 106 107 108 109
// ReceiveBlockFrom receives an incoming block from the given peer.
func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
	select {
	case s.incoming <- blkRecv{from: from, blk: blk}:
	case <-s.ctx.Done():
110
	}
111
}
112

113 114 115
// InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
	return s.interest.Contains(c) || s.isLiveWant(c)
Jeromy's avatar
Jeromy committed
116 117
}

118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
// GetBlock fetches a single block.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
	return bsgetter.AsyncGetBlocks(ctx, keys, s.notif, s.fetch, s.cancel)
}

// ID returns the sessions identifier.
func (s *Session) ID() uint64 {
	return s.id
Jeromy's avatar
Jeromy committed
134 135
}

136 137
func (s *Session) GetAverageLatency() time.Duration {
	resp := make(chan time.Duration)
Jeromy's avatar
Jeromy committed
138
	select {
139 140 141 142 143 144 145 146
	case s.latencyReqs <- resp:
	case <-s.ctx.Done():
		return -1 * time.Millisecond
	}

	select {
	case latency := <-resp:
		return latency
Jeromy's avatar
Jeromy committed
147
	case <-s.ctx.Done():
148
		return -1 * time.Millisecond
Jeromy's avatar
Jeromy committed
149
	}
Jeromy's avatar
Jeromy committed
150 151
}

152 153 154 155 156
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
	select {
	case s.tickDelayReqs <- baseTickDelay:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
157 158 159 160 161 162 163 164 165
}

// TODO: PERF: this is using a channel to guard a map access against race
// conditions. This is definitely much slower than a mutex, though its unclear
// if it will actually induce any noticeable slowness. This is implemented this
// way to avoid adding a more complex set of mutexes around the liveWants map.
// note that in the average case (where this session *is* interested in the
// block we received) this function will not be called, as the cid will likely
// still be in the interest cache.
166
func (s *Session) isLiveWant(c cid.Cid) bool {
167
	resp := make(chan bool, 1)
168 169
	select {
	case s.interestReqs <- interestReq{
Jeromy's avatar
Jeromy committed
170 171
		c:    c,
		resp: resp,
172 173 174
	}:
	case <-s.ctx.Done():
		return false
Jeromy's avatar
Jeromy committed
175
	}
Jeromy's avatar
Jeromy committed
176 177 178 179 180 181 182

	select {
	case want := <-resp:
		return want
	case <-s.ctx.Done():
		return false
	}
Jeromy's avatar
Jeromy committed
183 184
}

185 186 187 188 189
func (s *Session) fetch(ctx context.Context, keys []cid.Cid) {
	select {
	case s.newReqs <- keys:
	case <-ctx.Done():
	case <-s.ctx.Done():
Jeromy's avatar
Jeromy committed
190 191 192
	}
}

193 194 195 196
func (s *Session) cancel(keys []cid.Cid) {
	select {
	case s.cancelKeys <- keys:
	case <-s.ctx.Done():
Jeromy's avatar
Jeromy committed
197 198 199
	}
}

200 201 202 203
const provSearchDelay = time.Second * 10

// Session run loop -- everything function below here should not be called
// of this loop
Jeromy's avatar
Jeromy committed
204 205 206 207 208 209
func (s *Session) run(ctx context.Context) {
	s.tick = time.NewTimer(provSearchDelay)
	newpeers := make(chan peer.ID, 16)
	for {
		select {
		case blk := <-s.incoming:
210
			s.handleIncomingBlock(ctx, blk)
Jeromy's avatar
Jeromy committed
211
		case keys := <-s.newReqs:
212
			s.handleNewRequest(ctx, keys)
Jeromy's avatar
Jeromy committed
213
		case keys := <-s.cancelKeys:
214
			s.handleCancel(keys)
Jeromy's avatar
Jeromy committed
215
		case <-s.tick.C:
216
			s.handleTick(ctx, newpeers)
Jeromy's avatar
Jeromy committed
217 218
		case p := <-newpeers:
			s.addActivePeer(p)
Jeromy's avatar
Jeromy committed
219
		case lwchk := <-s.interestReqs:
220
			lwchk.resp <- s.cidIsWanted(lwchk.c)
221 222 223 224
		case resp := <-s.latencyReqs:
			resp <- s.averageLatency()
		case baseTickDelay := <-s.tickDelayReqs:
			s.baseTickDelay = baseTickDelay
Jeromy's avatar
Jeromy committed
225
		case <-ctx.Done():
226
			s.handleShutdown()
Jeromy's avatar
Jeromy committed
227 228 229 230 231
			return
		}
	}
}

232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
	s.tick.Stop()

	if blk.from != "" {
		s.addActivePeer(blk.from)
	}

	s.receiveBlock(ctx, blk.blk)

	s.resetTick()
}

func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
	for _, k := range keys {
		s.interest.Add(k, nil)
	}
	if len(s.liveWants) < activeWantsLimit {
		toadd := activeWantsLimit - len(s.liveWants)
		if toadd > len(keys) {
			toadd = len(keys)
		}

		now := keys[:toadd]
		keys = keys[toadd:]

		s.wantBlocks(ctx, now)
	}
	for _, k := range keys {
		s.tofetch.Push(k)
	}
}

func (s *Session) handleCancel(keys []cid.Cid) {
	for _, c := range keys {
		s.tofetch.Remove(c)
	}
}

func (s *Session) handleTick(ctx context.Context, newpeers chan<- peer.ID) {
	live := make([]cid.Cid, 0, len(s.liveWants))
	now := time.Now()
	for c := range s.liveWants {
		live = append(live, c)
		s.liveWants[c] = now
	}

	// Broadcast these keys to everyone we're connected to
	s.wm.WantBlocks(ctx, live, nil, s.id)

	if len(live) > 0 {
		go func(k cid.Cid) {
			// TODO: have a task queue setup for this to:
			// - rate limit
			// - manage timeouts
			// - ensure two 'findprovs' calls for the same block don't run concurrently
			// - share peers between sessions based on interest set
			for p := range s.network.FindProvidersAsync(ctx, k, 10) {
				newpeers <- p
			}
		}(live[0])
	}
	s.resetTick()
}

func (s *Session) addActivePeer(p peer.ID) {
	if _, ok := s.activePeers[p]; !ok {
		s.activePeers[p] = struct{}{}
		s.activePeersArr = append(s.activePeersArr, p)

		cmgr := s.network.ConnectionManager()
		cmgr.TagPeer(p, s.tag, 10)
	}
}

func (s *Session) handleShutdown() {
	s.tick.Stop()
	s.notif.Shutdown()

	live := make([]cid.Cid, 0, len(s.liveWants))
	for c := range s.liveWants {
		live = append(live, c)
	}
	s.wm.CancelWants(s.ctx, live, nil, s.id)
	cmgr := s.network.ConnectionManager()
	for _, p := range s.activePeersArr {
		cmgr.UntagPeer(p, s.tag)
	}
}

321
func (s *Session) cidIsWanted(c cid.Cid) bool {
Steven Allen's avatar
Steven Allen committed
322
	_, ok := s.liveWants[c]
323 324 325 326 327 328
	if !ok {
		ok = s.tofetch.Has(c)
	}
	return ok
}

Jeromy's avatar
Jeromy committed
329
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
330 331
	c := blk.Cid()
	if s.cidIsWanted(c) {
Steven Allen's avatar
Steven Allen committed
332
		tval, ok := s.liveWants[c]
333 334
		if ok {
			s.latTotal += time.Since(tval)
Steven Allen's avatar
Steven Allen committed
335
			delete(s.liveWants, c)
336 337 338
		} else {
			s.tofetch.Remove(c)
		}
Jeromy's avatar
Jeromy committed
339 340 341
		s.fetchcnt++
		s.notif.Publish(blk)

342 343
		if next := s.tofetch.Pop(); next.Defined() {
			s.wantBlocks(ctx, []cid.Cid{next})
Jeromy's avatar
Jeromy committed
344 345 346 347
		}
	}
}

348
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
349
	now := time.Now()
Jeromy's avatar
Jeromy committed
350
	for _, c := range ks {
Steven Allen's avatar
Steven Allen committed
351
		s.liveWants[c] = now
Jeromy's avatar
Jeromy committed
352
	}
353
	s.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id)
Jeromy's avatar
Jeromy committed
354 355
}

356 357
func (s *Session) averageLatency() time.Duration {
	return s.latTotal / time.Duration(s.fetchcnt)
Jeromy's avatar
Jeromy committed
358
}
359 360 361 362 363 364
func (s *Session) resetTick() {
	if s.latTotal == 0 {
		s.tick.Reset(provSearchDelay)
	} else {
		avLat := s.averageLatency()
		s.tick.Reset(s.baseTickDelay + (3 * avLat))
Jeromy's avatar
Jeromy committed
365 366 367
	}
}

368
type cidQueue struct {
369
	elems []cid.Cid
370 371 372 373 374 375 376
	eset  *cid.Set
}

func newCidQueue() *cidQueue {
	return &cidQueue{eset: cid.NewSet()}
}

377
func (cq *cidQueue) Pop() cid.Cid {
378 379
	for {
		if len(cq.elems) == 0 {
380
			return cid.Cid{}
381 382 383 384 385 386 387 388 389 390 391 392
		}

		out := cq.elems[0]
		cq.elems = cq.elems[1:]

		if cq.eset.Has(out) {
			cq.eset.Remove(out)
			return out
		}
	}
}

393
func (cq *cidQueue) Push(c cid.Cid) {
394 395 396 397 398
	if cq.eset.Visit(c) {
		cq.elems = append(cq.elems, c)
	}
}

399
func (cq *cidQueue) Remove(c cid.Cid) {
400 401 402
	cq.eset.Remove(c)
}

403
func (cq *cidQueue) Has(c cid.Cid) bool {
404 405 406 407 408 409
	return cq.eset.Has(c)
}

func (cq *cidQueue) Len() int {
	return cq.eset.Len()
}