Commit 2ea8ba82 authored by hannahhoward

feat(sessions): reduce duplicates

Reduce duplicate blocks by splitting want requests across groups of peers
parent 4951001b
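In outline: each session now keeps a split factor alongside counters for received and duplicate blocks. When the session wants blocks and has optimized peers, the wanted keys and the peers are each dealt into split round-robin groups, and the i-th group of keys is requested only from the i-th group of peers instead of asking every peer for every key. The split factor is raised while the duplicate ratio stays above 40% and lowered when it falls below 20%. Below is a minimal, self-contained sketch of just that feedback rule. It is illustrative only and not part of the commit: the tracker type and record helper are hypothetical, the thresholds mirror the constants introduced in the session package below, and the bookkeeping is simplified relative to the real updateReceiveCounters.

package main

import "fmt"

// Thresholds mirroring the constants added by this commit.
const (
	minReceivedToSplit       = 2
	maxSplit                 = 32
	maxAcceptableDupes       = 0.4
	minDuplesToTryLessSplits = 0.2
)

// tracker is a hypothetical stand-in for the session's counters.
type tracker struct {
	split      int
	received   int
	duplicates int
}

// record notes one received block and nudges the split factor:
// many duplicates -> spread wants over more, smaller peer groups;
// few duplicates -> use fewer, larger groups for better redundancy.
func (t *tracker) record(duplicate bool) {
	t.received++
	if duplicate {
		t.duplicates++
	}
	ratio := float64(t.duplicates) / float64(t.received)
	switch {
	case t.received > minReceivedToSplit && ratio > maxAcceptableDupes && t.split < maxSplit:
		t.split++
	case t.split > 1 && ratio < minDuplesToTryLessSplits:
		t.split--
	}
}

func main() {
	t := &tracker{split: 2} // matches initialSplit in the commit
	for _, dup := range []bool{false, true, true, true, false} {
		t.record(dup)
	}
	fmt.Println("split factor is now", t.split)
}

Running the sketch, the split factor climbs while the observed duplicate ratio stays above the 40% threshold, which is the behaviour the real session-level counters drive.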
@@ -391,7 +391,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
 			defer wg.Done()
 			bs.updateReceiveCounters(b)
+			bs.sm.UpdateReceiveCounters(b)
 			log.Debugf("got block %s from %s", b, p)
 			// skip received blocks that are not in the wantlist
...
@@ -2,6 +2,7 @@ package session
 import (
 	"context"
+	"math/rand"
 	"time"
 	lru "github.com/hashicorp/golang-lru"
@@ -14,7 +15,15 @@ import (
 	peer "github.com/libp2p/go-libp2p-peer"
 )
-const activeWantsLimit = 16
+const (
+	minReceivedToSplit       = 2
+	maxSplit                 = 32
+	maxAcceptableDupes       = 0.4
+	minDuplesToTryLessSplits = 0.2
+	initialSplit             = 2
+	broadcastLiveWantsLimit  = 4
+	targetedLiveWantsLimit   = 32
+)
 // WantManager is an interface that can be used to request blocks
 // from given peers.
@@ -38,8 +47,9 @@ type interestReq struct {
 }
 type blkRecv struct {
 	from peer.ID
 	blk  blocks.Block
+	counterMessage bool
 }
 // Session holds state for an individual bitswap transfer operation.
@@ -60,14 +70,17 @@ type Session struct {
 	tickDelayReqs chan time.Duration
 	// do not touch outside run loop
 	tofetch  *cidQueue
 	interest *lru.Cache
-	liveWants     map[cid.Cid]time.Time
-	tick          *time.Timer
-	baseTickDelay time.Duration
-	latTotal      time.Duration
-	fetchcnt      int
+	pastWants              *cidQueue
+	liveWants              map[cid.Cid]time.Time
+	tick                   *time.Timer
+	baseTickDelay          time.Duration
+	latTotal               time.Duration
+	fetchcnt               int
+	receivedCount          int
+	split                  int
+	duplicateReceivedCount int
 	// identifiers
 	notif notifications.PubSub
 	uuid  logging.Loggable
@@ -82,12 +95,14 @@ func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Sessio
 		newReqs:       make(chan []cid.Cid),
 		cancelKeys:    make(chan []cid.Cid),
 		tofetch:       newCidQueue(),
+		pastWants:     newCidQueue(),
 		interestReqs:  make(chan interestReq),
 		latencyReqs:   make(chan chan time.Duration),
 		tickDelayReqs: make(chan time.Duration),
 		ctx:           ctx,
 		wm:            wm,
 		pm:            pm,
+		split:         initialSplit,
 		incoming:      make(chan blkRecv),
 		notif:         notifications.New(),
 		uuid:          loggables.Uuid("GetBlockRequest"),
@@ -106,7 +121,7 @@ func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Sessio
 // ReceiveBlockFrom receives an incoming block from the given peer.
 func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
 	select {
-	case s.incoming <- blkRecv{from: from, blk: blk}:
+	case s.incoming <- blkRecv{from: from, blk: blk, counterMessage: false}:
 	case <-s.ctx.Done():
 	}
 	ks := []cid.Cid{blk.Cid()}
@@ -114,6 +129,15 @@ func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
 }
+// UpdateReceiveCounters updates receive counters for a block, which may be a
+// duplicate, and adjusts the split factor based on the observed duplicate ratio.
+func (s *Session) UpdateReceiveCounters(blk blocks.Block) {
+	select {
+	case s.incoming <- blkRecv{from: "", blk: blk, counterMessage: true}:
+	case <-s.ctx.Done():
+	}
+}
 // InterestedIn returns true if this session is interested in the given Cid.
 func (s *Session) InterestedIn(c cid.Cid) bool {
 	if s.interest.Contains(c) {
@@ -205,7 +229,11 @@ func (s *Session) run(ctx context.Context) {
 	for {
 		select {
 		case blk := <-s.incoming:
-			s.handleIncomingBlock(ctx, blk)
+			if blk.counterMessage {
+				s.updateReceiveCounters(ctx, blk.blk)
+			} else {
+				s.handleIncomingBlock(ctx, blk)
+			}
 		case keys := <-s.newReqs:
 			s.handleNewRequest(ctx, keys)
 		case keys := <-s.cancelKeys:
@@ -241,8 +269,7 @@ func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
 	for _, k := range keys {
 		s.interest.Add(k, nil)
 	}
-	if len(s.liveWants) < activeWantsLimit {
-		toadd := activeWantsLimit - len(s.liveWants)
+	if toadd := s.wantBudget(); toadd > 0 {
 		if toadd > len(keys) {
 			toadd = len(keys)
 		}
@@ -264,6 +291,7 @@ func (s *Session) handleCancel(keys []cid.Cid) {
 }
 func (s *Session) handleTick(ctx context.Context) {
 	live := make([]cid.Cid, 0, len(s.liveWants))
 	now := time.Now()
 	for c := range s.liveWants {
@@ -316,6 +344,28 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
 		if next := s.tofetch.Pop(); next.Defined() {
 			s.wantBlocks(ctx, []cid.Cid{next})
 		}
+		s.pastWants.Push(c)
 	}
 }
+
+func (s *Session) duplicateRatio() float64 {
+	return float64(s.duplicateReceivedCount) / float64(s.receivedCount)
+}
+
+func (s *Session) updateReceiveCounters(ctx context.Context, blk blocks.Block) {
+	if s.pastWants.Has(blk.Cid()) {
+		s.receivedCount++
+		s.duplicateReceivedCount++
+		if (s.receivedCount > minReceivedToSplit) && (s.duplicateRatio() > maxAcceptableDupes) && (s.split < maxSplit) {
+			s.split++
+		}
+	} else {
+		if s.cidIsWanted(blk.Cid()) {
+			s.receivedCount++
+			if (s.split > 1) && (s.duplicateRatio() < minDuplesToTryLessSplits) {
+				s.split--
+			}
+		}
+	}
+}
@@ -325,9 +375,18 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
 		s.liveWants[c] = now
 	}
 	peers := s.pm.GetOptimizedPeers()
-	// right now we're requesting each block from every peer, but soon, maybe not
-	s.pm.RecordPeerRequests(peers, ks)
-	s.wm.WantBlocks(ctx, ks, peers, s.id)
+	if len(peers) > 0 {
+		splitRequests := split(ks, peers, s.split)
+		for i, currentKeys := range splitRequests.ks {
+			currentPeers := splitRequests.peers[i]
+			// right now we're requesting each block from every peer, but soon, maybe not
+			s.pm.RecordPeerRequests(currentPeers, currentKeys)
+			s.wm.WantBlocks(ctx, currentKeys, currentPeers, s.id)
+		}
+	} else {
+		s.pm.RecordPeerRequests(nil, ks)
+		s.wm.WantBlocks(ctx, ks, nil, s.id)
+	}
 }
 func (s *Session) averageLatency() time.Duration {
@@ -342,3 +401,50 @@ func (s *Session) resetTick() {
 		s.tick.Reset(s.baseTickDelay + (3 * avLat))
 	}
 }
+
+type splitRec struct {
+	ks    [][]cid.Cid
+	peers [][]peer.ID
+}
+
+func split(ks []cid.Cid, peers []peer.ID, split int) *splitRec {
+	peerSplit := split
+	if len(peers) < peerSplit {
+		peerSplit = len(peers)
+	}
+	keySplit := split
+	if len(ks) < keySplit {
+		keySplit = len(ks)
+	}
+	if keySplit > peerSplit {
+		keySplit = peerSplit
+	}
+	out := &splitRec{
+		ks:    make([][]cid.Cid, keySplit),
+		peers: make([][]peer.ID, peerSplit),
+	}
+	for i, c := range ks {
+		pos := i % keySplit
+		out.ks[pos] = append(out.ks[pos], c)
+	}
+	peerOrder := rand.Perm(len(peers))
+	for i, po := range peerOrder {
+		pos := i % peerSplit
+		out.peers[pos] = append(out.peers[pos], peers[po])
+	}
+	return out
+}
+
+func (s *Session) wantBudget() int {
+	live := len(s.liveWants)
+	var budget int
+	if len(s.pm.GetOptimizedPeers()) > 0 {
+		budget = targetedLiveWantsLimit - live
+	} else {
+		budget = broadcastLiveWantsLimit - live
+	}
+	if budget < 0 {
+		budget = 0
+	}
+	return budget
+}
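To see how the new split helper spreads one request, here is a small standalone sketch of the same round-robin dealing, using strings in place of CIDs and peer IDs. It is illustrative only and not part of the commit: dealOut is a hypothetical helper, and the real split additionally caps the number of key groups at the number of peer groups.

package main

import (
	"fmt"
	"math/rand"
)

// dealOut is a hypothetical helper mirroring the round-robin dealing in
// split(): items are dealt into at most n buckets (never more buckets than items).
func dealOut(items []string, n int) [][]string {
	if n > len(items) {
		n = len(items)
	}
	buckets := make([][]string, n)
	for i, it := range items {
		buckets[i%n] = append(buckets[i%n], it)
	}
	return buckets
}

func main() {
	keys := []string{"k0", "k1", "k2", "k3", "k4", "k5"}
	peers := []string{"pA", "pB", "pC", "pD"}
	splitFactor := 2

	// split() shuffles the peers with rand.Perm, so the groups vary between requests.
	rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })

	keyGroups := dealOut(keys, splitFactor)
	peerGroups := dealOut(peers, splitFactor)

	// wantBlocks() sends key group i only to peer group i, rather than asking
	// every peer for every key, which is what reduces duplicate responses.
	for i := range keyGroups {
		fmt.Printf("ask %v for %v\n", peerGroups[i], keyGroups[i])
	}
}

With a split factor of 2, six keys, and four peers, each pair of peers is asked for only half of the keys, instead of every peer being asked for all six.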
@@ -65,7 +65,7 @@ func TestSessionGetBlocks(t *testing.T) {
 	id := testutil.GenerateSessionID()
 	session := New(ctx, id, fwm, fpm)
 	blockGenerator := blocksutil.NewBlockGenerator()
-	blks := blockGenerator.Blocks(activeWantsLimit * 2)
+	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
 	var cids []cid.Cid
 	for _, block := range blks {
 		cids = append(cids, block.Cid())
@@ -79,7 +79,7 @@ func TestSessionGetBlocks(t *testing.T) {
 	// check initial want request
 	receivedWantReq := <-fwm.wantReqs
-	if len(receivedWantReq.cids) != activeWantsLimit {
+	if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
 		t.Fatal("did not enqueue correct initial number of wants")
 	}
 	if receivedWantReq.peers != nil {
@@ -87,7 +87,7 @@ func TestSessionGetBlocks(t *testing.T) {
 	}
 	// now receive the first set of blocks
-	peers := testutil.GeneratePeers(activeWantsLimit)
+	peers := testutil.GeneratePeers(broadcastLiveWantsLimit)
 	var newCancelReqs []wantReq
 	var newBlockReqs []wantReq
 	var receivedBlocks []blocks.Block
@@ -103,7 +103,7 @@ func TestSessionGetBlocks(t *testing.T) {
 	// verify new peers were recorded
 	fpm.lk.Lock()
-	if len(fpm.peers) != activeWantsLimit {
+	if len(fpm.peers) != broadcastLiveWantsLimit {
 		t.Fatal("received blocks not recorded by the peer manager")
 	}
 	for _, p := range fpm.peers {
@@ -116,7 +116,7 @@ func TestSessionGetBlocks(t *testing.T) {
 	// look at new interactions with want manager
 	// should have cancelled each received block
-	if len(newCancelReqs) != activeWantsLimit {
+	if len(newCancelReqs) != broadcastLiveWantsLimit {
 		t.Fatal("did not cancel each block once it was received")
 	}
 	// new session reqs should be targeted
@@ -129,7 +129,7 @@ func TestSessionGetBlocks(t *testing.T) {
 	}
 	// full new round of cids should be requested
-	if totalEnqueued != activeWantsLimit {
+	if totalEnqueued != broadcastLiveWantsLimit {
 		t.Fatal("new blocks were not requested")
 	}
@@ -164,7 +164,7 @@ func TestSessionFindMorePeers(t *testing.T) {
 	session := New(ctx, id, fwm, fpm)
 	session.SetBaseTickDelay(200 * time.Microsecond)
 	blockGenerator := blocksutil.NewBlockGenerator()
-	blks := blockGenerator.Blocks(activeWantsLimit * 2)
+	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
 	var cids []cid.Cid
 	for _, block := range blks {
 		cids = append(cids, block.Cid())
@@ -190,7 +190,7 @@ func TestSessionFindMorePeers(t *testing.T) {
 	// verify a broadcast was made
 	receivedWantReq := <-wantReqs
-	if len(receivedWantReq.cids) != activeWantsLimit {
+	if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
 		t.Fatal("did not rebroadcast whole live list")
 	}
 	if receivedWantReq.peers != nil {
...
@@ -17,6 +17,7 @@ type Session interface {
 	exchange.Fetcher
 	InterestedIn(cid.Cid) bool
 	ReceiveBlockFrom(peer.ID, blocks.Block)
+	UpdateReceiveCounters(blocks.Block)
 }
 type sesTrk struct {
@@ -112,3 +113,14 @@ func (sm *SessionManager) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
 		}
 	}
 }
+
+// UpdateReceiveCounters records the fact that a block was received, allowing
+// sessions to track duplicates.
+func (sm *SessionManager) UpdateReceiveCounters(blk blocks.Block) {
+	sm.sessLk.Lock()
+	defer sm.sessLk.Unlock()
+	for _, s := range sm.sessions {
+		s.session.UpdateReceiveCounters(blk)
+	}
+}
@@ -13,10 +13,11 @@ import (
 )
 type fakeSession struct {
 	interested bool
 	receivedBlock bool
-	id uint64
-	pm *fakePeerManager
+	updateReceiveCounters bool
+	id uint64
+	pm *fakePeerManager
 }
 func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
@@ -27,6 +28,7 @@ func (*fakeSession) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block,
 }
 func (fs *fakeSession) InterestedIn(cid.Cid) bool { return fs.interested }
 func (fs *fakeSession) ReceiveBlockFrom(peer.ID, blocks.Block) { fs.receivedBlock = true }
+func (fs *fakeSession) UpdateReceiveCounters(blocks.Block) { fs.updateReceiveCounters = true }
 type fakePeerManager struct {
 	id uint64
...
@@ -11,7 +11,7 @@ import (
 )
 const (
-	maxOptimizedPeers = 25
+	maxOptimizedPeers = 32
 	reservePeers      = 2
 )
...