Commit 49a96fbe authored by hannahhoward

feat(sessions): add rebroadcasting, search backoff

on a tick, do not keep searching for providers for the same block; instead, rely on a periodic search for more providers (which runs no matter what, even without ticks, to optimize found providers). also back off the tick time to reduce broadcasts.

fix #95, fix #107
parent 10e93ab6
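
The backoff is linear rather than exponential: every tick that fires without a new block having arrived stretches the next tick delay by one more multiple of the base delay, and receiving a block resets the multiplier. A minimal sketch of the resetTick arithmetic below, for the measured-latency branch (names like nextTickDelay and avgLatency are illustrative, not the session's actual API):

	package main

	import (
		"fmt"
		"time"
	)

	// nextTickDelay mirrors the tick backoff added in this commit:
	// the delay grows with each consecutive tick that fetched nothing.
	// avgLatency stands in for the session's averageLatency().
	func nextTickDelay(baseTickDelay, avgLatency time.Duration, consecutiveTicks int) time.Duration {
		tickDelay := baseTickDelay + 3*avgLatency
		return tickDelay * time.Duration(1+consecutiveTicks)
	}

	func main() {
		// with the default 500ms base and a 100ms average latency,
		// consecutive ticks fire after 800ms, 1.6s, 2.4s, ...
		for ticks := 0; ticks < 3; ticks++ {
			fmt.Println(nextTickDelay(500*time.Millisecond, 100*time.Millisecond, ticks))
		}
	}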
--- a/session/session.go
+++ b/session/session.go
@@ -2,6 +2,7 @@ package session
 
 import (
 	"context"
+	"math/rand"
 	"time"
 
 	lru "github.com/hashicorp/golang-lru"
@@ -9,6 +10,7 @@ import (
 	notifications "github.com/ipfs/go-bitswap/notifications"
 	blocks "github.com/ipfs/go-block-format"
 	cid "github.com/ipfs/go-cid"
+	delay "github.com/ipfs/go-ipfs-delay"
 	logging "github.com/ipfs/go-log"
 	peer "github.com/libp2p/go-libp2p-core/peer"
 	loggables "github.com/libp2p/go-libp2p-loggables"
@@ -75,14 +77,17 @@ type Session struct {
 	tickDelayReqs chan time.Duration
 
 	// do not touch outside run loop
 	tofetch          *cidQueue
 	interest         *lru.Cache
 	pastWants        *cidQueue
 	liveWants        map[cid.Cid]time.Time
 	tick             *time.Timer
+	rebroadcast      *time.Timer
 	baseTickDelay    time.Duration
 	latTotal         time.Duration
 	fetchcnt         int
+	consecutiveTicks int
+	lastFetchCount   int
 	// identifiers
 	notif notifications.PubSub
 	uuid  logging.Loggable
@@ -93,23 +98,24 @@ type Session struct {
 // given context.
 func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs RequestSplitter) *Session {
 	s := &Session{
 		liveWants:      make(map[cid.Cid]time.Time),
 		newReqs:        make(chan []cid.Cid),
 		cancelKeys:     make(chan []cid.Cid),
 		tofetch:        newCidQueue(),
 		pastWants:      newCidQueue(),
 		interestReqs:   make(chan interestReq),
 		latencyReqs:    make(chan chan time.Duration),
 		tickDelayReqs:  make(chan time.Duration),
 		ctx:            ctx,
 		wm:             wm,
 		pm:             pm,
 		srs:            srs,
 		incoming:       make(chan blkRecv),
 		notif:          notifications.New(),
 		uuid:           loggables.Uuid("GetBlockRequest"),
 		baseTickDelay:  time.Millisecond * 500,
+		lastFetchCount: -1,
 		id:             id,
 	}
 
 	cache, _ := lru.New(2048)
@@ -223,16 +229,23 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
 }
 
 var provSearchDelay = time.Second
+var rebroadcastDelay = delay.Fixed(time.Minute)
 
 // SetProviderSearchDelay overwrites the global provider search delay
 func SetProviderSearchDelay(newProvSearchDelay time.Duration) {
 	provSearchDelay = newProvSearchDelay
 }
 
+// SetRebroadcastDelay overwrites the global provider rebroadcast delay
+func SetRebroadcastDelay(newRebroadcastDelay delay.D) {
+	rebroadcastDelay = newRebroadcastDelay
+}
+
 // Session run loop -- every function below here should not be called
 // outside of this loop
 func (s *Session) run(ctx context.Context) {
 	s.tick = time.NewTimer(provSearchDelay)
+	s.rebroadcast = time.NewTimer(rebroadcastDelay.Get())
 	for {
 		select {
 		case blk := <-s.incoming:
@@ -247,6 +260,8 @@ func (s *Session) run(ctx context.Context) {
 			s.handleCancel(keys)
 		case <-s.tick.C:
 			s.handleTick(ctx)
+		case <-s.rebroadcast.C:
+			s.handleRebroadcast(ctx)
 		case lwchk := <-s.interestReqs:
 			lwchk.resp <- s.cidIsWanted(lwchk.c)
 		case resp := <-s.latencyReqs:
@@ -299,6 +314,12 @@ func (s *Session) handleCancel(keys []cid.Cid) {
 }
 
 func (s *Session) handleTick(ctx context.Context) {
+	if s.fetchcnt == s.lastFetchCount {
+		s.consecutiveTicks++
+	} else {
+		s.lastFetchCount = s.fetchcnt
+	}
+
 	live := make([]cid.Cid, 0, len(s.liveWants))
 	now := time.Now()
 	for c := range s.liveWants {
@@ -310,12 +331,37 @@ func (s *Session) handleTick(ctx context.Context) {
 	s.pm.RecordPeerRequests(nil, live)
 	s.wm.WantBlocks(ctx, live, nil, s.id)
 
-	if len(live) > 0 {
+	// do not find providers on consecutive ticks
+	// -- just rely on the periodic rebroadcast
+	if len(live) > 0 && (s.consecutiveTicks == 0) {
 		s.pm.FindMorePeers(ctx, live[0])
 	}
 	s.resetTick()
 }
 
+func (s *Session) handleRebroadcast(ctx context.Context) {
+	if len(s.liveWants) == 0 {
+		return
+	}
+
+	// TODO: come up with a better strategy for determining when to search
+	// for new providers for blocks.
+	s.pm.FindMorePeers(ctx, s.randomLiveWant())
+	s.rebroadcast.Reset(rebroadcastDelay.Get())
+}
+
+func (s *Session) randomLiveWant() cid.Cid {
+	i := rand.Intn(len(s.liveWants))
+	// pick a random live want by walking the map
+	for k := range s.liveWants {
+		if i == 0 {
+			return k
+		}
+		i--
+	}
+	return cid.Cid{}
+}
+
 func (s *Session) handleShutdown() {
 	s.tick.Stop()
 	s.notif.Shutdown()
@@ -347,6 +393,8 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
 		s.tofetch.Remove(c)
 	}
 	s.fetchcnt++
+	// we've received new wanted blocks, so future ticks are not consecutive
+	s.consecutiveTicks = 0
 	s.notif.Publish(blk)
 
 	toAdd := s.wantBudget()
@@ -395,12 +443,15 @@ func (s *Session) averageLatency() time.Duration {
 }
 
 func (s *Session) resetTick() {
+	var tickDelay time.Duration
 	if s.latTotal == 0 {
-		s.tick.Reset(provSearchDelay)
+		tickDelay = provSearchDelay
 	} else {
 		avLat := s.averageLatency()
-		s.tick.Reset(s.baseTickDelay + (3 * avLat))
+		tickDelay = s.baseTickDelay + (3 * avLat)
 	}
+	tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
+	s.tick.Reset(tickDelay)
 }
 
 func (s *Session) wantBudget() int {
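
Both delays are package-level knobs rather than per-session options, so a caller tunes them once before constructing sessions, exactly as the new test below does. A hypothetical caller might look like this (package alias and values are illustrative):

	package main

	import (
		"time"

		bssession "github.com/ipfs/go-bitswap/session"
		delay "github.com/ipfs/go-ipfs-delay"
	)

	func main() {
		// search for providers 2s after the first unanswered broadcast...
		bssession.SetProviderSearchDelay(2 * time.Second)
		// ...and thereafter rebroadcast a random live want every 30s.
		bssession.SetRebroadcastDelay(delay.Fixed(30 * time.Second))
	}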
--- a/session/session_test.go
+++ b/session/session_test.go
@@ -6,12 +6,12 @@ import (
 	"testing"
 	"time"
 
-	"github.com/ipfs/go-block-format"
-
 	bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
 	"github.com/ipfs/go-bitswap/testutil"
+	blocks "github.com/ipfs/go-block-format"
 	cid "github.com/ipfs/go-cid"
 	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
+	delay "github.com/ipfs/go-ipfs-delay"
 	peer "github.com/libp2p/go-libp2p-core/peer"
 )
 
@@ -42,12 +42,12 @@ func (fwm *fakeWantManager) CancelWants(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
 type fakePeerManager struct {
 	lk                     sync.RWMutex
 	peers                  []peer.ID
-	findMorePeersRequested chan struct{}
+	findMorePeersRequested chan cid.Cid
 }
 
 func (fpm *fakePeerManager) FindMorePeers(ctx context.Context, k cid.Cid) {
 	select {
-	case fpm.findMorePeersRequested <- struct{}{}:
+	case fpm.findMorePeersRequested <- k:
 	case <-ctx.Done():
 	}
 }
@@ -193,7 +193,7 @@ func TestSessionFindMorePeers(t *testing.T) {
 	wantReqs := make(chan wantReq, 1)
 	cancelReqs := make(chan wantReq, 1)
 	fwm := &fakeWantManager{wantReqs, cancelReqs}
-	fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{}, 1)}
+	fpm := &fakePeerManager{findMorePeersRequested: make(chan cid.Cid, 1)}
 	frs := &fakeRequestSplitter{}
 	id := testutil.GenerateSessionID()
 	session := New(ctx, id, fwm, fpm, frs)
@@ -258,3 +258,128 @@ func TestSessionFindMorePeers(t *testing.T) {
 		t.Fatal("Did not find more peers")
 	}
 }
+
+func TestSessionFailingToGetFirstBlock(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond)
+	defer cancel()
+	wantReqs := make(chan wantReq, 1)
+	cancelReqs := make(chan wantReq, 1)
+	fwm := &fakeWantManager{wantReqs, cancelReqs}
+	fpm := &fakePeerManager{findMorePeersRequested: make(chan cid.Cid, 1)}
+	frs := &fakeRequestSplitter{}
+	id := testutil.GenerateSessionID()
+	SetProviderSearchDelay(10 * time.Millisecond)
+	defer SetProviderSearchDelay(1 * time.Second)
+	SetRebroadcastDelay(delay.Fixed(100 * time.Millisecond))
+	defer SetRebroadcastDelay(delay.Fixed(1 * time.Minute))
+	session := New(ctx, id, fwm, fpm, frs)
+	blockGenerator := blocksutil.NewBlockGenerator()
+	blks := blockGenerator.Blocks(4)
+	var cids []cid.Cid
+	for _, block := range blks {
+		cids = append(cids, block.Cid())
+	}
+	startTick := time.Now()
+	_, err := session.GetBlocks(ctx, cids)
+	if err != nil {
+		t.Fatal("error getting blocks")
+	}
+
+	// clear the initial block of wants
+	select {
+	case <-wantReqs:
+	case <-ctx.Done():
+		t.Fatal("Did not make first want request")
+	}
+
+	// verify a broadcast is made
+	select {
+	case receivedWantReq := <-wantReqs:
+		if len(receivedWantReq.cids) < len(cids) {
+			t.Fatal("did not rebroadcast whole live list")
+		}
+		if receivedWantReq.peers != nil {
+			t.Fatal("did not make a broadcast")
+		}
+	case <-ctx.Done():
+		t.Fatal("Never rebroadcast want list")
+	}
+
+	// wait for a request to get more peers to occur
+	select {
+	case k := <-fpm.findMorePeersRequested:
+		if testutil.IndexOf(blks, k) == -1 {
+			t.Fatal("did not rebroadcast an active want")
+		}
+	case <-ctx.Done():
+		t.Fatal("Did not find more peers")
+	}
+	firstTickLength := time.Since(startTick)
+
+	// wait for another broadcast to occur
+	select {
+	case receivedWantReq := <-wantReqs:
+		if len(receivedWantReq.cids) < len(cids) {
+			t.Fatal("did not rebroadcast whole live list")
+		}
+		if receivedWantReq.peers != nil {
+			t.Fatal("did not make a broadcast")
+		}
+	case <-ctx.Done():
+		t.Fatal("Never rebroadcast want list")
+	}
+	startTick = time.Now()
+
+	// wait for another broadcast to occur
+	select {
+	case receivedWantReq := <-wantReqs:
+		if len(receivedWantReq.cids) < len(cids) {
+			t.Fatal("did not rebroadcast whole live list")
+		}
+		if receivedWantReq.peers != nil {
+			t.Fatal("did not make a broadcast")
+		}
+	case <-ctx.Done():
+		t.Fatal("Never rebroadcast want list")
+	}
+	consecutiveTickLength := time.Since(startTick)
+	// the consecutive tick should take longer
+	if firstTickLength > consecutiveTickLength {
+		t.Fatal("Should have increased tick length after first consecutive tick")
+	}
+	startTick = time.Now()
+
+	// wait for another broadcast to occur
+	select {
+	case receivedWantReq := <-wantReqs:
+		if len(receivedWantReq.cids) < len(cids) {
+			t.Fatal("did not rebroadcast whole live list")
+		}
+		if receivedWantReq.peers != nil {
+			t.Fatal("did not make a broadcast")
+		}
+	case <-ctx.Done():
+		t.Fatal("Never rebroadcast want list")
+	}
+	secondConsecutiveTickLength := time.Since(startTick)
+	// each consecutive tick should take longer still
+	if consecutiveTickLength > secondConsecutiveTickLength {
+		t.Fatal("Should have increased tick length after second consecutive tick")
+	}
+
+	// should not have looked for peers on consecutive ticks
+	select {
+	case <-fpm.findMorePeersRequested:
+		t.Fatal("Should not have looked for peers on consecutive tick")
+	default:
+	}
+
+	// wait for rebroadcast to occur
+	select {
+	case k := <-fpm.findMorePeersRequested:
+		if testutil.IndexOf(blks, k) == -1 {
+			t.Fatal("did not rebroadcast an active want")
+		}
+	case <-ctx.Done():
+		t.Fatal("Did not rebroadcast to find more peers")
+	}
+}