session.go 7.82 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3 4
package bitswap

import (
	"context"
5
	"fmt"
Jeromy's avatar
Jeromy committed
6 7
	"time"

Jeromy's avatar
Jeromy committed
8 9 10 11 12 13 14 15
	notifications "github.com/ipfs/go-bitswap/notifications"

	lru "github.com/hashicorp/golang-lru"
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	loggables "github.com/libp2p/go-libp2p-loggables"
	peer "github.com/libp2p/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
16 17 18 19
)

// activeWantsLimit is the number of live wants below which the session's run
// loop sends newly requested keys right away; keys that do not fit under the
// limit are queued in tofetch instead.
const activeWantsLimit = 16

Jeromy's avatar
Jeromy committed
20 21 22
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
// info to, and who to request blocks from
Jeromy's avatar
Jeromy committed
23 24
type Session struct {
	ctx            context.Context
25
	tofetch        *cidQueue
Jeromy's avatar
Jeromy committed
26 27 28
	activePeers    map[peer.ID]struct{}
	activePeersArr []peer.ID

Jeromy's avatar
Jeromy committed
29 30
	bs           *Bitswap
	incoming     chan blkRecv
31 32
	newReqs      chan []cid.Cid
	cancelKeys   chan []cid.Cid
Jeromy's avatar
Jeromy committed
33
	interestReqs chan interestReq
Jeromy's avatar
Jeromy committed
34 35

	interest  *lru.Cache
Steven Allen's avatar
Steven Allen committed
36
	liveWants map[cid.Cid]time.Time
Jeromy's avatar
Jeromy committed
37 38 39 40 41 42 43 44 45 46

	tick          *time.Timer
	baseTickDelay time.Duration

	latTotal time.Duration
	fetchcnt int

	notif notifications.PubSub

	uuid logging.Loggable
Jeromy's avatar
Jeromy committed
47

48 49
	id  uint64
	tag string
Jeromy's avatar
Jeromy committed
50 51
}

Jeromy's avatar
Jeromy committed
52 53
// NewSession creates a new bitswap session whose lifetime is bounded by the
// given context
Jeromy's avatar
Jeromy committed
54 55 56
func (bs *Bitswap) NewSession(ctx context.Context) *Session {
	s := &Session{
		activePeers:   make(map[peer.ID]struct{}),
Steven Allen's avatar
Steven Allen committed
57
		liveWants:     make(map[cid.Cid]time.Time),
58 59
		newReqs:       make(chan []cid.Cid),
		cancelKeys:    make(chan []cid.Cid),
60
		tofetch:       newCidQueue(),
Jeromy's avatar
Jeromy committed
61
		interestReqs:  make(chan interestReq),
Jeromy's avatar
Jeromy committed
62 63 64 65 66 67
		ctx:           ctx,
		bs:            bs,
		incoming:      make(chan blkRecv),
		notif:         notifications.New(),
		uuid:          loggables.Uuid("GetBlockRequest"),
		baseTickDelay: time.Millisecond * 500,
Jeromy's avatar
Jeromy committed
68
		id:            bs.getNextSessionID(),
Jeromy's avatar
Jeromy committed
69 70
	}

71 72
	s.tag = fmt.Sprint("bs-ses-", s.id)

Jeromy's avatar
Jeromy committed
73 74 75 76 77 78 79 80 81 82 83 84
	cache, _ := lru.New(2048)
	s.interest = cache

	bs.sessLk.Lock()
	bs.sessions = append(bs.sessions, s)
	bs.sessLk.Unlock()

	go s.run(ctx)

	return s
}

Jeromy's avatar
Jeromy committed
85
func (bs *Bitswap) removeSession(s *Session) {
86
	s.notif.Shutdown()
87

88
	live := make([]cid.Cid, 0, len(s.liveWants))
89
	for c := range s.liveWants {
Steven Allen's avatar
Steven Allen committed
90
		live = append(live, c)
91 92 93
	}
	bs.CancelWants(live, s.id)

Jeromy's avatar
Jeromy committed
94 95 96 97 98 99 100 101 102 103 104
	bs.sessLk.Lock()
	defer bs.sessLk.Unlock()
	for i := 0; i < len(bs.sessions); i++ {
		if bs.sessions[i] == s {
			bs.sessions[i] = bs.sessions[len(bs.sessions)-1]
			bs.sessions = bs.sessions[:len(bs.sessions)-1]
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
105 106 107 108 109
// blkRecv pairs a received block with the peer it came from. Values travel
// over Session.incoming to the run loop.
type blkRecv struct {
	// from is the sending peer; the run loop skips peer tracking when it
	// is the empty ID.
	from peer.ID

	// blk is the block that was received.
	blk blocks.Block
}

Jeromy's avatar
Jeromy committed
110
func (s *Session) receiveBlockFrom(from peer.ID, blk blocks.Block) {
Jeromy's avatar
Jeromy committed
111 112 113 114
	select {
	case s.incoming <- blkRecv{from: from, blk: blk}:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
115 116
}

Jeromy's avatar
Jeromy committed
117
type interestReq struct {
118
	c    cid.Cid
Jeromy's avatar
Jeromy committed
119 120 121 122 123 124 125 126 127 128
	resp chan bool
}

// TODO: PERF: this is using a channel to guard a map access against race
// conditions. This is definitely much slower than a mutex, though its unclear
// if it will actually induce any noticeable slowness. This is implemented this
// way to avoid adding a more complex set of mutexes around the liveWants map.
// note that in the average case (where this session *is* interested in the
// block we received) this function will not be called, as the cid will likely
// still be in the interest cache.
129
func (s *Session) isLiveWant(c cid.Cid) bool {
130
	resp := make(chan bool, 1)
131 132
	select {
	case s.interestReqs <- interestReq{
Jeromy's avatar
Jeromy committed
133 134
		c:    c,
		resp: resp,
135 136 137
	}:
	case <-s.ctx.Done():
		return false
Jeromy's avatar
Jeromy committed
138
	}
Jeromy's avatar
Jeromy committed
139 140 141 142 143 144 145

	select {
	case want := <-resp:
		return want
	case <-s.ctx.Done():
		return false
	}
Jeromy's avatar
Jeromy committed
146 147
}

148
func (s *Session) interestedIn(c cid.Cid) bool {
Steven Allen's avatar
Steven Allen committed
149
	return s.interest.Contains(c) || s.isLiveWant(c)
Jeromy's avatar
Jeromy committed
150 151 152 153 154 155 156 157
}

// provSearchDelay is the initial duration of a session's tick timer and the
// delay resetTick uses while no block latency has been recorded. When the
// timer fires, the run loop rebroadcasts its live wants and starts a provider
// search.
const provSearchDelay = time.Second * 10

func (s *Session) addActivePeer(p peer.ID) {
	if _, ok := s.activePeers[p]; !ok {
		s.activePeers[p] = struct{}{}
		s.activePeersArr = append(s.activePeersArr, p)
158 159 160

		cmgr := s.bs.network.ConnectionManager()
		cmgr.TagPeer(p, s.tag, 10)
Jeromy's avatar
Jeromy committed
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
	}
}

// resetTick re-arms the session timer. With no latency recorded yet it uses
// provSearchDelay; otherwise it uses baseTickDelay plus three times the
// average latency (latTotal divided by fetchcnt).
func (s *Session) resetTick() {
	delay := provSearchDelay
	if s.latTotal != 0 {
		avgLatency := s.latTotal / time.Duration(s.fetchcnt)
		delay = s.baseTickDelay + 3*avgLatency
	}
	s.tick.Reset(delay)
}

func (s *Session) run(ctx context.Context) {
	s.tick = time.NewTimer(provSearchDelay)
	newpeers := make(chan peer.ID, 16)
	for {
		select {
		case blk := <-s.incoming:
			s.tick.Stop()

181 182 183
			if blk.from != "" {
				s.addActivePeer(blk.from)
			}
Jeromy's avatar
Jeromy committed
184 185 186 187 188 189

			s.receiveBlock(ctx, blk.blk)

			s.resetTick()
		case keys := <-s.newReqs:
			for _, k := range keys {
Steven Allen's avatar
Steven Allen committed
190
				s.interest.Add(k, nil)
Jeromy's avatar
Jeromy committed
191
			}
Jeromy's avatar
Jeromy committed
192 193
			if len(s.liveWants) < activeWantsLimit {
				toadd := activeWantsLimit - len(s.liveWants)
Jeromy's avatar
Jeromy committed
194 195 196 197 198 199 200 201 202
				if toadd > len(keys) {
					toadd = len(keys)
				}

				now := keys[:toadd]
				keys = keys[toadd:]

				s.wantBlocks(ctx, now)
			}
203 204 205
			for _, k := range keys {
				s.tofetch.Push(k)
			}
Jeromy's avatar
Jeromy committed
206 207 208 209
		case keys := <-s.cancelKeys:
			s.cancel(keys)

		case <-s.tick.C:
210
			live := make([]cid.Cid, 0, len(s.liveWants))
211
			now := time.Now()
Jeromy's avatar
Jeromy committed
212
			for c := range s.liveWants {
Steven Allen's avatar
Steven Allen committed
213
				live = append(live, c)
214
				s.liveWants[c] = now
Jeromy's avatar
Jeromy committed
215 216 217
			}

			// Broadcast these keys to everyone we're connected to
Jeromy's avatar
Jeromy committed
218
			s.bs.wm.WantBlocks(ctx, live, nil, s.id)
Jeromy's avatar
Jeromy committed
219 220

			if len(live) > 0 {
221
				go func(k cid.Cid) {
Jeromy's avatar
Jeromy committed
222 223 224 225 226 227
					// TODO: have a task queue setup for this to:
					// - rate limit
					// - manage timeouts
					// - ensure two 'findprovs' calls for the same block don't run concurrently
					// - share peers between sessions based on interest set
					for p := range s.bs.network.FindProvidersAsync(ctx, k, 10) {
Jeromy's avatar
Jeromy committed
228 229
						newpeers <- p
					}
Jeromy's avatar
Jeromy committed
230
				}(live[0])
Jeromy's avatar
Jeromy committed
231 232 233 234
			}
			s.resetTick()
		case p := <-newpeers:
			s.addActivePeer(p)
Jeromy's avatar
Jeromy committed
235
		case lwchk := <-s.interestReqs:
236
			lwchk.resp <- s.cidIsWanted(lwchk.c)
Jeromy's avatar
Jeromy committed
237
		case <-ctx.Done():
Jeromy's avatar
Jeromy committed
238
			s.tick.Stop()
Jeromy's avatar
Jeromy committed
239
			s.bs.removeSession(s)
240 241 242 243 244

			cmgr := s.bs.network.ConnectionManager()
			for _, p := range s.activePeersArr {
				cmgr.UntagPeer(p, s.tag)
			}
Jeromy's avatar
Jeromy committed
245 246 247 248 249
			return
		}
	}
}

250
func (s *Session) cidIsWanted(c cid.Cid) bool {
Steven Allen's avatar
Steven Allen committed
251
	_, ok := s.liveWants[c]
252 253 254 255 256 257 258
	if !ok {
		ok = s.tofetch.Has(c)
	}

	return ok
}

Jeromy's avatar
Jeromy committed
259
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
260 261
	c := blk.Cid()
	if s.cidIsWanted(c) {
Steven Allen's avatar
Steven Allen committed
262
		tval, ok := s.liveWants[c]
263 264
		if ok {
			s.latTotal += time.Since(tval)
Steven Allen's avatar
Steven Allen committed
265
			delete(s.liveWants, c)
266 267 268
		} else {
			s.tofetch.Remove(c)
		}
Jeromy's avatar
Jeromy committed
269 270 271
		s.fetchcnt++
		s.notif.Publish(blk)

272 273
		if next := s.tofetch.Pop(); next.Defined() {
			s.wantBlocks(ctx, []cid.Cid{next})
Jeromy's avatar
Jeromy committed
274 275 276 277
		}
	}
}

278
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
279
	now := time.Now()
Jeromy's avatar
Jeromy committed
280
	for _, c := range ks {
Steven Allen's avatar
Steven Allen committed
281
		s.liveWants[c] = now
Jeromy's avatar
Jeromy committed
282
	}
Jeromy's avatar
Jeromy committed
283
	s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id)
Jeromy's avatar
Jeromy committed
284 285
}

286
func (s *Session) cancel(keys []cid.Cid) {
Jeromy's avatar
Jeromy committed
287
	for _, c := range keys {
288
		s.tofetch.Remove(c)
Jeromy's avatar
Jeromy committed
289 290 291
	}
}

292
func (s *Session) cancelWants(keys []cid.Cid) {
293 294 295 296
	select {
	case s.cancelKeys <- keys:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
297 298
}

299
func (s *Session) fetch(ctx context.Context, keys []cid.Cid) {
Jeromy's avatar
Jeromy committed
300 301 302
	select {
	case s.newReqs <- keys:
	case <-ctx.Done():
303
	case <-s.ctx.Done():
Jeromy's avatar
Jeromy committed
304 305 306
	}
}

Jeromy's avatar
Jeromy committed
307 308 309
// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
310
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
Jeromy's avatar
Jeromy committed
311 312 313 314
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
	return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants)
}

Jeromy's avatar
Jeromy committed
315
// GetBlock fetches a single block
316
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
Jeromy's avatar
Jeromy committed
317 318
	return getBlock(parent, k, s.GetBlocks)
}
319 320

type cidQueue struct {
321
	elems []cid.Cid
322 323 324 325 326 327 328
	eset  *cid.Set
}

// newCidQueue returns an empty cidQueue with its membership set initialised.
func newCidQueue() *cidQueue {
	cq := new(cidQueue)
	cq.eset = cid.NewSet()
	return cq
}

329
func (cq *cidQueue) Pop() cid.Cid {
330 331
	for {
		if len(cq.elems) == 0 {
332
			return cid.Cid{}
333 334 335 336 337 338 339 340 341 342 343 344
		}

		out := cq.elems[0]
		cq.elems = cq.elems[1:]

		if cq.eset.Has(out) {
			cq.eset.Remove(out)
			return out
		}
	}
}

345
func (cq *cidQueue) Push(c cid.Cid) {
346 347 348 349 350
	if cq.eset.Visit(c) {
		cq.elems = append(cq.elems, c)
	}
}

351
func (cq *cidQueue) Remove(c cid.Cid) {
352 353 354
	cq.eset.Remove(c)
}

355
func (cq *cidQueue) Has(c cid.Cid) bool {
356 357 358 359 360 361
	return cq.eset.Has(c)
}

// Len returns the size of the membership set, i.e. the number of CIDs
// currently in the queue (stale entries in elems are not counted).
func (cq *cidQueue) Len() int {
	size := cq.eset.Len()
	return size
}