session.go 7.88 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3 4
package bitswap

import (
	"context"
5
	"fmt"
Jeromy's avatar
Jeromy committed
6 7
	"time"

Jeromy's avatar
Jeromy committed
8 9 10 11 12
	notifications "github.com/ipfs/go-bitswap/notifications"

	lru "github.com/hashicorp/golang-lru"
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
13
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
Jeromy's avatar
Jeromy committed
14 15 16
	logging "github.com/ipfs/go-log"
	loggables "github.com/libp2p/go-libp2p-loggables"
	peer "github.com/libp2p/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
17 18 19 20
)

const activeWantsLimit = 16

Jeromy's avatar
Jeromy committed
21 22 23
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
// info to, and who to request blocks from
Jeromy's avatar
Jeromy committed
24 25
type Session struct {
	ctx            context.Context
26
	tofetch        *cidQueue
Jeromy's avatar
Jeromy committed
27 28 29
	activePeers    map[peer.ID]struct{}
	activePeersArr []peer.ID

Jeromy's avatar
Jeromy committed
30 31
	bs           *Bitswap
	incoming     chan blkRecv
32 33
	newReqs      chan []cid.Cid
	cancelKeys   chan []cid.Cid
Jeromy's avatar
Jeromy committed
34
	interestReqs chan interestReq
Jeromy's avatar
Jeromy committed
35 36

	interest  *lru.Cache
Steven Allen's avatar
Steven Allen committed
37
	liveWants map[cid.Cid]time.Time
Jeromy's avatar
Jeromy committed
38 39 40 41 42 43 44 45 46 47

	tick          *time.Timer
	baseTickDelay time.Duration

	latTotal time.Duration
	fetchcnt int

	notif notifications.PubSub

	uuid logging.Loggable
Jeromy's avatar
Jeromy committed
48

49 50
	id  uint64
	tag string
Jeromy's avatar
Jeromy committed
51 52
}

Jeromy's avatar
Jeromy committed
53 54
// NewSession creates a new bitswap session whose lifetime is bounded by the
// given context
55
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
Jeromy's avatar
Jeromy committed
56 57
	s := &Session{
		activePeers:   make(map[peer.ID]struct{}),
Steven Allen's avatar
Steven Allen committed
58
		liveWants:     make(map[cid.Cid]time.Time),
59 60
		newReqs:       make(chan []cid.Cid),
		cancelKeys:    make(chan []cid.Cid),
61
		tofetch:       newCidQueue(),
Jeromy's avatar
Jeromy committed
62
		interestReqs:  make(chan interestReq),
Jeromy's avatar
Jeromy committed
63 64 65 66 67 68
		ctx:           ctx,
		bs:            bs,
		incoming:      make(chan blkRecv),
		notif:         notifications.New(),
		uuid:          loggables.Uuid("GetBlockRequest"),
		baseTickDelay: time.Millisecond * 500,
Jeromy's avatar
Jeromy committed
69
		id:            bs.getNextSessionID(),
Jeromy's avatar
Jeromy committed
70 71
	}

72 73
	s.tag = fmt.Sprint("bs-ses-", s.id)

Jeromy's avatar
Jeromy committed
74 75 76 77 78 79 80 81 82 83 84 85
	cache, _ := lru.New(2048)
	s.interest = cache

	bs.sessLk.Lock()
	bs.sessions = append(bs.sessions, s)
	bs.sessLk.Unlock()

	go s.run(ctx)

	return s
}

Jeromy's avatar
Jeromy committed
86
func (bs *Bitswap) removeSession(s *Session) {
87
	s.notif.Shutdown()
88

89
	live := make([]cid.Cid, 0, len(s.liveWants))
90
	for c := range s.liveWants {
Steven Allen's avatar
Steven Allen committed
91
		live = append(live, c)
92 93 94
	}
	bs.CancelWants(live, s.id)

Jeromy's avatar
Jeromy committed
95 96 97 98 99 100 101 102 103 104 105
	bs.sessLk.Lock()
	defer bs.sessLk.Unlock()
	for i := 0; i < len(bs.sessions); i++ {
		if bs.sessions[i] == s {
			bs.sessions[i] = bs.sessions[len(bs.sessions)-1]
			bs.sessions = bs.sessions[:len(bs.sessions)-1]
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
106 107 108 109 110
type blkRecv struct {
	from peer.ID
	blk  blocks.Block
}

Jeromy's avatar
Jeromy committed
111
func (s *Session) receiveBlockFrom(from peer.ID, blk blocks.Block) {
Jeromy's avatar
Jeromy committed
112 113 114 115
	select {
	case s.incoming <- blkRecv{from: from, blk: blk}:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
116 117
}

Jeromy's avatar
Jeromy committed
118
type interestReq struct {
119
	c    cid.Cid
Jeromy's avatar
Jeromy committed
120 121 122 123 124 125 126 127 128 129
	resp chan bool
}

// TODO: PERF: this is using a channel to guard a map access against race
// conditions. This is definitely much slower than a mutex, though its unclear
// if it will actually induce any noticeable slowness. This is implemented this
// way to avoid adding a more complex set of mutexes around the liveWants map.
// note that in the average case (where this session *is* interested in the
// block we received) this function will not be called, as the cid will likely
// still be in the interest cache.
130
func (s *Session) isLiveWant(c cid.Cid) bool {
131
	resp := make(chan bool, 1)
132 133
	select {
	case s.interestReqs <- interestReq{
Jeromy's avatar
Jeromy committed
134 135
		c:    c,
		resp: resp,
136 137 138
	}:
	case <-s.ctx.Done():
		return false
Jeromy's avatar
Jeromy committed
139
	}
Jeromy's avatar
Jeromy committed
140 141 142 143 144 145 146

	select {
	case want := <-resp:
		return want
	case <-s.ctx.Done():
		return false
	}
Jeromy's avatar
Jeromy committed
147 148
}

149
func (s *Session) interestedIn(c cid.Cid) bool {
Steven Allen's avatar
Steven Allen committed
150
	return s.interest.Contains(c) || s.isLiveWant(c)
Jeromy's avatar
Jeromy committed
151 152 153 154 155 156 157 158
}

const provSearchDelay = time.Second * 10

func (s *Session) addActivePeer(p peer.ID) {
	if _, ok := s.activePeers[p]; !ok {
		s.activePeers[p] = struct{}{}
		s.activePeersArr = append(s.activePeersArr, p)
159 160 161

		cmgr := s.bs.network.ConnectionManager()
		cmgr.TagPeer(p, s.tag, 10)
Jeromy's avatar
Jeromy committed
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
	}
}

func (s *Session) resetTick() {
	if s.latTotal == 0 {
		s.tick.Reset(provSearchDelay)
	} else {
		avLat := s.latTotal / time.Duration(s.fetchcnt)
		s.tick.Reset(s.baseTickDelay + (3 * avLat))
	}
}

func (s *Session) run(ctx context.Context) {
	s.tick = time.NewTimer(provSearchDelay)
	newpeers := make(chan peer.ID, 16)
	for {
		select {
		case blk := <-s.incoming:
			s.tick.Stop()

182 183 184
			if blk.from != "" {
				s.addActivePeer(blk.from)
			}
Jeromy's avatar
Jeromy committed
185 186 187 188 189 190

			s.receiveBlock(ctx, blk.blk)

			s.resetTick()
		case keys := <-s.newReqs:
			for _, k := range keys {
Steven Allen's avatar
Steven Allen committed
191
				s.interest.Add(k, nil)
Jeromy's avatar
Jeromy committed
192
			}
Jeromy's avatar
Jeromy committed
193 194
			if len(s.liveWants) < activeWantsLimit {
				toadd := activeWantsLimit - len(s.liveWants)
Jeromy's avatar
Jeromy committed
195 196 197 198 199 200 201 202 203
				if toadd > len(keys) {
					toadd = len(keys)
				}

				now := keys[:toadd]
				keys = keys[toadd:]

				s.wantBlocks(ctx, now)
			}
204 205 206
			for _, k := range keys {
				s.tofetch.Push(k)
			}
Jeromy's avatar
Jeromy committed
207 208 209 210
		case keys := <-s.cancelKeys:
			s.cancel(keys)

		case <-s.tick.C:
211
			live := make([]cid.Cid, 0, len(s.liveWants))
212
			now := time.Now()
Jeromy's avatar
Jeromy committed
213
			for c := range s.liveWants {
Steven Allen's avatar
Steven Allen committed
214
				live = append(live, c)
215
				s.liveWants[c] = now
Jeromy's avatar
Jeromy committed
216 217 218
			}

			// Broadcast these keys to everyone we're connected to
Jeromy's avatar
Jeromy committed
219
			s.bs.wm.WantBlocks(ctx, live, nil, s.id)
Jeromy's avatar
Jeromy committed
220 221

			if len(live) > 0 {
222
				go func(k cid.Cid) {
Jeromy's avatar
Jeromy committed
223 224 225 226 227 228
					// TODO: have a task queue setup for this to:
					// - rate limit
					// - manage timeouts
					// - ensure two 'findprovs' calls for the same block don't run concurrently
					// - share peers between sessions based on interest set
					for p := range s.bs.network.FindProvidersAsync(ctx, k, 10) {
Jeromy's avatar
Jeromy committed
229 230
						newpeers <- p
					}
Jeromy's avatar
Jeromy committed
231
				}(live[0])
Jeromy's avatar
Jeromy committed
232 233 234 235
			}
			s.resetTick()
		case p := <-newpeers:
			s.addActivePeer(p)
Jeromy's avatar
Jeromy committed
236
		case lwchk := <-s.interestReqs:
237
			lwchk.resp <- s.cidIsWanted(lwchk.c)
Jeromy's avatar
Jeromy committed
238
		case <-ctx.Done():
Jeromy's avatar
Jeromy committed
239
			s.tick.Stop()
Jeromy's avatar
Jeromy committed
240
			s.bs.removeSession(s)
241 242 243 244 245

			cmgr := s.bs.network.ConnectionManager()
			for _, p := range s.activePeersArr {
				cmgr.UntagPeer(p, s.tag)
			}
Jeromy's avatar
Jeromy committed
246 247 248 249 250
			return
		}
	}
}

251
func (s *Session) cidIsWanted(c cid.Cid) bool {
Steven Allen's avatar
Steven Allen committed
252
	_, ok := s.liveWants[c]
253 254 255 256 257 258 259
	if !ok {
		ok = s.tofetch.Has(c)
	}

	return ok
}

Jeromy's avatar
Jeromy committed
260
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
261 262
	c := blk.Cid()
	if s.cidIsWanted(c) {
Steven Allen's avatar
Steven Allen committed
263
		tval, ok := s.liveWants[c]
264 265
		if ok {
			s.latTotal += time.Since(tval)
Steven Allen's avatar
Steven Allen committed
266
			delete(s.liveWants, c)
267 268 269
		} else {
			s.tofetch.Remove(c)
		}
Jeromy's avatar
Jeromy committed
270 271 272
		s.fetchcnt++
		s.notif.Publish(blk)

273 274
		if next := s.tofetch.Pop(); next.Defined() {
			s.wantBlocks(ctx, []cid.Cid{next})
Jeromy's avatar
Jeromy committed
275 276 277 278
		}
	}
}

279
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
280
	now := time.Now()
Jeromy's avatar
Jeromy committed
281
	for _, c := range ks {
Steven Allen's avatar
Steven Allen committed
282
		s.liveWants[c] = now
Jeromy's avatar
Jeromy committed
283
	}
Jeromy's avatar
Jeromy committed
284
	s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id)
Jeromy's avatar
Jeromy committed
285 286
}

287
func (s *Session) cancel(keys []cid.Cid) {
Jeromy's avatar
Jeromy committed
288
	for _, c := range keys {
289
		s.tofetch.Remove(c)
Jeromy's avatar
Jeromy committed
290 291 292
	}
}

293
func (s *Session) cancelWants(keys []cid.Cid) {
294 295 296 297
	select {
	case s.cancelKeys <- keys:
	case <-s.ctx.Done():
	}
Jeromy's avatar
Jeromy committed
298 299
}

300
func (s *Session) fetch(ctx context.Context, keys []cid.Cid) {
Jeromy's avatar
Jeromy committed
301 302 303
	select {
	case s.newReqs <- keys:
	case <-ctx.Done():
304
	case <-s.ctx.Done():
Jeromy's avatar
Jeromy committed
305 306 307
	}
}

Jeromy's avatar
Jeromy committed
308 309 310
// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
311
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
Jeromy's avatar
Jeromy committed
312 313 314 315
	ctx = logging.ContextWithLoggable(ctx, s.uuid)
	return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants)
}

Jeromy's avatar
Jeromy committed
316
// GetBlock fetches a single block
317
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
Jeromy's avatar
Jeromy committed
318 319
	return getBlock(parent, k, s.GetBlocks)
}
320 321

type cidQueue struct {
322
	elems []cid.Cid
323 324 325 326 327 328 329
	eset  *cid.Set
}

func newCidQueue() *cidQueue {
	return &cidQueue{eset: cid.NewSet()}
}

330
func (cq *cidQueue) Pop() cid.Cid {
331 332
	for {
		if len(cq.elems) == 0 {
333
			return cid.Cid{}
334 335 336 337 338 339 340 341 342 343 344 345
		}

		out := cq.elems[0]
		cq.elems = cq.elems[1:]

		if cq.eset.Has(out) {
			cq.eset.Remove(out)
			return out
		}
	}
}

346
func (cq *cidQueue) Push(c cid.Cid) {
347 348 349 350 351
	if cq.eset.Visit(c) {
		cq.elems = append(cq.elems, c)
	}
}

352
func (cq *cidQueue) Remove(c cid.Cid) {
353 354 355
	cq.eset.Remove(c)
}

356
func (cq *cidQueue) Has(c cid.Cid) bool {
357 358 359 360 361 362
	return cq.eset.Has(c)
}

func (cq *cidQueue) Len() int {
	return cq.eset.Len()
}