Commit 56219bd2 authored by dirkmc's avatar dirkmc Committed by Dirk McCormick

refactor: use locks for session want management

parent 38dcf8c3
......@@ -294,7 +294,7 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
// Split blocks into wanted blocks vs duplicates
wanted = make([]blocks.Block, 0, len(blks))
for _, b := range blks {
if bs.sm.InterestedIn(b.Cid()) {
if bs.sm.IsWanted(b.Cid()) {
wanted = append(wanted, b)
} else {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
......
......@@ -3,9 +3,9 @@ package session
import (
"context"
"math/rand"
"sync"
"time"
lru "github.com/hashicorp/golang-lru"
bsgetter "github.com/ipfs/go-bitswap/getter"
notifications "github.com/ipfs/go-bitswap/notifications"
bssd "github.com/ipfs/go-bitswap/sessiondata"
......@@ -47,16 +47,18 @@ type RequestSplitter interface {
RecordUniqueBlock()
}
type interestReq struct {
c cid.Cid
resp chan bool
}
type rcvFrom struct {
from peer.ID
ks []cid.Cid
}
type sessionWants struct {
sync.RWMutex
toFetch *cidQueue
liveWants map[cid.Cid]time.Time
pastWants *cid.Set
}
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
// info to, and who to request blocks from.
......@@ -67,19 +69,16 @@ type Session struct {
pm PeerManager
srs RequestSplitter
sw sessionWants
// channels
incoming chan rcvFrom
newReqs chan []cid.Cid
cancelKeys chan []cid.Cid
interestReqs chan interestReq
latencyReqs chan chan time.Duration
tickDelayReqs chan time.Duration
// do not touch outside run loop
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
idleTick *time.Timer
periodicSearchTimer *time.Timer
baseTickDelay time.Duration
......@@ -105,12 +104,13 @@ func New(ctx context.Context,
initialSearchDelay time.Duration,
periodicSearchDelay delay.D) *Session {
s := &Session{
liveWants: make(map[cid.Cid]time.Time),
sw: sessionWants{
toFetch: newCidQueue(),
liveWants: make(map[cid.Cid]time.Time),
pastWants: cid.NewSet(),
},
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(),
pastWants: newCidQueue(),
interestReqs: make(chan interestReq),
latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
......@@ -126,9 +126,6 @@ func New(ctx context.Context,
periodicSearchDelay: periodicSearchDelay,
}
cache, _ := lru.New(2048)
s.interest = cache
go s.run(ctx)
return s
......@@ -142,34 +139,20 @@ func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid) {
}
}
// InterestedIn returns true if this session is interested in the given Cid.
// IsWanted returns true if this session is waiting to receive the given Cid.
func (s *Session) IsWanted(c cid.Cid) bool {
s.sw.RLock()
defer s.sw.RUnlock()
return s.unlockedIsWanted(c)
}
// InterestedIn returns true if this session has ever requested the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
if s.interest.Contains(c) {
return true
}
// TODO: PERF: this is using a channel to guard a map access against race
// conditions. This is definitely much slower than a mutex, though its unclear
// if it will actually induce any noticeable slowness. This is implemented this
// way to avoid adding a more complex set of mutexes around the liveWants map.
// note that in the average case (where this session *is* interested in the
// block we received) this function will not be called, as the cid will likely
// still be in the interest cache.
resp := make(chan bool, 1)
select {
case s.interestReqs <- interestReq{
c: c,
resp: resp,
}:
case <-s.ctx.Done():
return false
}
s.sw.RLock()
defer s.sw.RUnlock()
select {
case want := <-resp:
return want
case <-s.ctx.Done():
return false
}
return s.unlockedIsWanted(c) || s.sw.pastWants.Has(c)
}
// GetBlock fetches a single block.
......@@ -233,23 +216,15 @@ func (s *Session) run(ctx context.Context) {
for {
select {
case rcv := <-s.incoming:
s.cancelIncoming(ctx, rcv)
// Record statistics only if the blocks came from the network
// (blocks can also be received from the local node)
if rcv.from != "" {
s.updateReceiveCounters(ctx, rcv)
}
s.handleIncoming(ctx, rcv)
case keys := <-s.newReqs:
s.handleNewRequest(ctx, keys)
s.wantBlocks(ctx, keys)
case keys := <-s.cancelKeys:
s.handleCancel(keys)
case <-s.idleTick.C:
s.handleIdleTick(ctx)
case <-s.periodicSearchTimer.C:
s.handlePeriodicSearch(ctx)
case lwchk := <-s.interestReqs:
lwchk.resp <- s.cidIsWanted(lwchk.c)
case resp := <-s.latencyReqs:
resp <- s.averageLatency()
case baseTickDelay := <-s.tickDelayReqs:
......@@ -261,59 +236,17 @@ func (s *Session) run(ctx context.Context) {
}
}
func (s *Session) cancelIncoming(ctx context.Context, rcv rcvFrom) {
// We've received the blocks so we can cancel any outstanding wants for them
wanted := make([]cid.Cid, 0, len(rcv.ks))
for _, k := range rcv.ks {
if s.cidIsWanted(k) {
wanted = append(wanted, k)
}
}
s.pm.RecordCancels(wanted)
s.wm.CancelWants(s.ctx, wanted, nil, s.id)
}
func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
s.idleTick.Stop()
// Process the received blocks
s.processIncoming(ctx, rcv.ks)
s.resetIdleTick()
}
func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
for _, k := range keys {
s.interest.Add(k, nil)
}
if toadd := s.wantBudget(); toadd > 0 {
if toadd > len(keys) {
toadd = len(keys)
}
now := keys[:toadd]
keys = keys[toadd:]
func (s *Session) handleCancel(keys []cid.Cid) {
s.sw.Lock()
defer s.sw.Unlock()
s.wantBlocks(ctx, now)
}
for _, k := range keys {
s.tofetch.Push(k)
}
}
func (s *Session) handleCancel(keys []cid.Cid) {
for _, c := range keys {
s.tofetch.Remove(c)
s.sw.toFetch.Remove(k)
}
}
func (s *Session) handleIdleTick(ctx context.Context) {
live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now()
for c := range s.liveWants {
live = append(live, c)
s.liveWants[c] = now
}
live := s.prepareBroadcast()
// Broadcast these keys to everyone we're connected to
s.pm.RecordPeerRequests(nil, live)
......@@ -326,11 +259,27 @@ func (s *Session) handleIdleTick(ctx context.Context) {
}
s.resetIdleTick()
if len(s.liveWants) > 0 {
s.sw.RLock()
defer s.sw.RUnlock()
if len(s.sw.liveWants) > 0 {
s.consecutiveTicks++
}
}
func (s *Session) prepareBroadcast() []cid.Cid {
s.sw.Lock()
defer s.sw.Unlock()
live := make([]cid.Cid, 0, len(s.sw.liveWants))
now := time.Now()
for c := range s.sw.liveWants {
live = append(live, c)
s.sw.liveWants[c] = now
}
return live
}
func (s *Session) handlePeriodicSearch(ctx context.Context) {
randomWant := s.randomLiveWant()
if !randomWant.Defined() {
......@@ -346,12 +295,15 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) {
}
func (s *Session) randomLiveWant() cid.Cid {
if len(s.liveWants) == 0 {
s.sw.RLock()
defer s.sw.RUnlock()
if len(s.sw.liveWants) == 0 {
return cid.Cid{}
}
i := rand.Intn(len(s.liveWants))
i := rand.Intn(len(s.sw.liveWants))
// picking a random live want
for k := range s.liveWants {
for k := range s.sw.liveWants {
if i == 0 {
return k
}
......@@ -359,83 +311,127 @@ func (s *Session) randomLiveWant() cid.Cid {
}
return cid.Cid{}
}
func (s *Session) handleShutdown() {
s.idleTick.Stop()
live := make([]cid.Cid, 0, len(s.liveWants))
for c := range s.liveWants {
live := s.liveWants()
s.wm.CancelWants(s.ctx, live, nil, s.id)
}
func (s *Session) liveWants() []cid.Cid {
s.sw.RLock()
defer s.sw.RUnlock()
live := make([]cid.Cid, 0, len(s.sw.liveWants))
for c := range s.sw.liveWants {
live = append(live, c)
}
s.wm.CancelWants(s.ctx, live, nil, s.id)
return live
}
func (s *Session) cidIsWanted(c cid.Cid) bool {
_, ok := s.liveWants[c]
func (s *Session) unlockedIsWanted(c cid.Cid) bool {
_, ok := s.sw.liveWants[c]
if !ok {
ok = s.tofetch.Has(c)
ok = s.sw.toFetch.Has(c)
}
return ok
}
func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid) {
for _, c := range ks {
if s.cidIsWanted(c) {
// If the block CID was in the live wants queue, remove it
tval, ok := s.liveWants[c]
if ok {
s.latTotal += time.Since(tval)
delete(s.liveWants, c)
} else {
// Otherwise remove it from the tofetch queue, if it was there
s.tofetch.Remove(c)
}
s.fetchcnt++
// We've received new wanted blocks, so reset the number of ticks
// that have occurred since the last new block
s.consecutiveTicks = 0
// Keep track of CIDs we've successfully fetched
s.pastWants.Push(c)
}
func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
// Record statistics only if the blocks came from the network
// (blocks can also be received from the local node)
if rcv.from != "" {
s.updateReceiveCounters(ctx, rcv)
}
// Transfer as many CIDs as possible from the tofetch queue into the
// live wants queue
toAdd := s.wantBudget()
if toAdd > s.tofetch.Len() {
toAdd = s.tofetch.Len()
}
if toAdd > 0 {
var keys []cid.Cid
for i := 0; i < toAdd; i++ {
keys = append(keys, s.tofetch.Pop())
}
s.wantBlocks(ctx, keys)
// Update the want list
wanted, totalLatency := s.blocksReceived(rcv.ks)
if len(wanted) == 0 {
return
}
// We've received the blocks so we can cancel any outstanding wants for them
s.cancelIncoming(ctx, wanted)
s.idleTick.Stop()
// Process the received blocks
s.processIncoming(ctx, wanted, totalLatency)
s.resetIdleTick()
}
func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) {
for _, k := range rcv.ks {
// Inform the request splitter of unique / duplicate blocks
if s.cidIsWanted(k) {
s.sw.RLock()
for _, c := range rcv.ks {
if s.unlockedIsWanted(c) {
s.srs.RecordUniqueBlock()
} else if s.pastWants.Has(k) {
} else if s.sw.pastWants.Has(c) {
s.srs.RecordDuplicateBlock()
}
}
s.sw.RUnlock()
// Record response (to be able to time latency)
if len(rcv.ks) > 0 {
s.pm.RecordPeerResponse(rcv.from, rcv.ks)
}
}
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
now := time.Now()
for _, c := range ks {
s.liveWants[c] = now
func (s *Session) blocksReceived(cids []cid.Cid) ([]cid.Cid, time.Duration) {
s.sw.Lock()
defer s.sw.Unlock()
totalLatency := time.Duration(0)
wanted := make([]cid.Cid, 0, len(cids))
for _, c := range cids {
if s.unlockedIsWanted(c) {
wanted = append(wanted, c)
// If the block CID was in the live wants queue, remove it
tval, ok := s.sw.liveWants[c]
if ok {
totalLatency += time.Since(tval)
delete(s.sw.liveWants, c)
} else {
// Otherwise remove it from the toFetch queue, if it was there
s.sw.toFetch.Remove(c)
}
// Keep track of CIDs we've successfully fetched
s.sw.pastWants.Add(c)
}
}
return wanted, totalLatency
}
func (s *Session) cancelIncoming(ctx context.Context, ks []cid.Cid) {
s.pm.RecordCancels(ks)
s.wm.CancelWants(s.ctx, ks, nil, s.id)
}
func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid, totalLatency time.Duration) {
// Keep track of the total number of blocks received and total latency
s.fetchcnt += len(ks)
s.latTotal += totalLatency
// We've received new wanted blocks, so reset the number of ticks
// that have occurred since the last new block
s.consecutiveTicks = 0
s.wantBlocks(ctx, nil)
}
func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
ks := s.getNextWants(s.wantLimit(), newks)
if len(ks) == 0 {
return
}
peers := s.pm.GetOptimizedPeers()
if len(peers) > 0 {
splitRequests := s.srs.SplitRequest(peers, ks)
......@@ -449,6 +445,29 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
}
}
func (s *Session) getNextWants(limit int, newWants []cid.Cid) []cid.Cid {
s.sw.Lock()
defer s.sw.Unlock()
now := time.Now()
for _, k := range newWants {
s.sw.toFetch.Push(k)
}
currentLiveCount := len(s.sw.liveWants)
toAdd := limit - currentLiveCount
var live []cid.Cid
for ; toAdd > 0 && s.sw.toFetch.Len() > 0; toAdd-- {
c := s.sw.toFetch.Pop()
live = append(live, c)
s.sw.liveWants[c] = now
}
return live
}
func (s *Session) averageLatency() time.Duration {
return s.latTotal / time.Duration(s.fetchcnt)
}
......@@ -465,16 +484,9 @@ func (s *Session) resetIdleTick() {
s.idleTick.Reset(tickDelay)
}
func (s *Session) wantBudget() int {
live := len(s.liveWants)
var budget int
func (s *Session) wantLimit() int {
if len(s.pm.GetOptimizedPeers()) > 0 {
budget = targetedLiveWantsLimit - live
} else {
budget = broadcastLiveWantsLimit - live
}
if budget < 0 {
budget = 0
return targetedLiveWantsLimit
}
return budget
return broadcastLiveWantsLimit
}
......@@ -118,6 +118,14 @@ func TestSessionGetBlocks(t *testing.T) {
if receivedWantReq.peers != nil {
t.Fatal("first want request should be a broadcast")
}
for _, c := range cids {
if !session.IsWanted(c) {
t.Fatal("expected session to want cids")
}
if !session.InterestedIn(c) {
t.Fatal("expected session to be interested in cids")
}
}
// now receive the first set of blocks
peers := testutil.GeneratePeers(broadcastLiveWantsLimit)
......@@ -211,6 +219,14 @@ func TestSessionGetBlocks(t *testing.T) {
t.Fatal("received incorrect block")
}
}
for _, c := range cids {
if session.IsWanted(c) {
t.Fatal("expected session NOT to want cids")
}
if !session.InterestedIn(c) {
t.Fatal("expected session to still be interested in cids")
}
}
}
func TestSessionFindMorePeers(t *testing.T) {
......
......@@ -19,6 +19,7 @@ type Session interface {
exchange.Fetcher
InterestedIn(cid.Cid) bool
ReceiveFrom(peer.ID, []cid.Cid)
IsWanted(cid.Cid) bool
}
type sesTrk struct {
......@@ -132,14 +133,14 @@ func (sm *SessionManager) ReceiveFrom(from peer.ID, ks []cid.Cid) {
}
}
// InterestedIn indicates whether any of the sessions are waiting to receive
// IsWanted indicates whether any of the sessions are waiting to receive
// the block with the given CID.
func (sm *SessionManager) InterestedIn(cid cid.Cid) bool {
func (sm *SessionManager) IsWanted(cid cid.Cid) bool {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()
for _, s := range sm.sessions {
if s.session.InterestedIn(cid) {
if s.session.IsWanted(cid) {
return true
}
}
......
......@@ -40,6 +40,9 @@ func (fs *fakeSession) InterestedIn(c cid.Cid) bool {
}
return false
}
func (fs *fakeSession) IsWanted(c cid.Cid) bool {
return fs.InterestedIn(c)
}
func (fs *fakeSession) ReceiveFrom(p peer.ID, ks []cid.Cid) {
fs.ks = append(fs.ks, ks...)
}
......@@ -195,12 +198,12 @@ func TestInterestedIn(t *testing.T) {
nextInterestedIn = []cid.Cid{cids[0], cids[2]}
_ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
if !sm.InterestedIn(cids[0]) ||
!sm.InterestedIn(cids[1]) ||
!sm.InterestedIn(cids[2]) {
if !sm.IsWanted(cids[0]) ||
!sm.IsWanted(cids[1]) ||
!sm.IsWanted(cids[2]) {
t.Fatal("expected interest but session manager was not interested")
}
if sm.InterestedIn(cids[3]) {
if sm.IsWanted(cids[3]) {
t.Fatal("expected no interest but session manager was interested")
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment