Unverified Commit 6d1e10d2 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #139 from ipfs/fix/nits

fix some naming nits and broadcast on search
parents 02895498 eb28a2e1
......@@ -77,18 +77,18 @@ type Session struct {
tickDelayReqs chan time.Duration
// do not touch outside run loop
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
tick *time.Timer
rebroadcast *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
consecutiveTicks int
provSearchDelay time.Duration
rebroadcastDelay delay.D
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
idleTick *time.Timer
periodicSearchTimer *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
consecutiveTicks int
initialSearchDelay time.Duration
periodicSearchDelay delay.D
// identifiers
notif notifications.PubSub
uuid logging.Loggable
......@@ -102,28 +102,28 @@ func New(ctx context.Context,
wm WantManager,
pm PeerManager,
srs RequestSplitter,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) *Session {
initialSearchDelay time.Duration,
periodicSearchDelay delay.D) *Session {
s := &Session{
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(),
pastWants: newCidQueue(),
interestReqs: make(chan interestReq),
latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
pm: pm,
srs: srs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: id,
provSearchDelay: provSearchDelay,
rebroadcastDelay: rebroadcastDelay,
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(),
pastWants: newCidQueue(),
interestReqs: make(chan interestReq),
latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
pm: pm,
srs: srs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: id,
initialSearchDelay: initialSearchDelay,
periodicSearchDelay: periodicSearchDelay,
}
cache, _ := lru.New(2048)
......@@ -239,8 +239,8 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
// Session run loop -- everything function below here should not be called
// of this loop
func (s *Session) run(ctx context.Context) {
s.tick = time.NewTimer(s.provSearchDelay)
s.rebroadcast = time.NewTimer(s.rebroadcastDelay.Get())
s.idleTick = time.NewTimer(s.initialSearchDelay)
s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
for {
select {
case blk := <-s.incoming:
......@@ -253,10 +253,10 @@ func (s *Session) run(ctx context.Context) {
s.handleNewRequest(ctx, keys)
case keys := <-s.cancelKeys:
s.handleCancel(keys)
case <-s.tick.C:
s.handleTick(ctx)
case <-s.rebroadcast.C:
s.handleRebroadcast(ctx)
case <-s.idleTick.C:
s.handleIdleTick(ctx)
case <-s.periodicSearchTimer.C:
s.handlePeriodicSearch(ctx)
case lwchk := <-s.interestReqs:
lwchk.resp <- s.cidIsWanted(lwchk.c)
case resp := <-s.latencyReqs:
......@@ -271,7 +271,7 @@ func (s *Session) run(ctx context.Context) {
}
func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
s.tick.Stop()
s.idleTick.Stop()
if blk.from != "" {
s.pm.RecordPeerResponse(blk.from, blk.blk.Cid())
......@@ -279,7 +279,7 @@ func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
s.receiveBlock(ctx, blk.blk)
s.resetTick()
s.resetIdleTick()
}
func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
......@@ -307,7 +307,7 @@ func (s *Session) handleCancel(keys []cid.Cid) {
}
}
func (s *Session) handleTick(ctx context.Context) {
func (s *Session) handleIdleTick(ctx context.Context) {
live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now()
......@@ -321,28 +321,29 @@ func (s *Session) handleTick(ctx context.Context) {
s.wm.WantBlocks(ctx, live, nil, s.id)
// do no find providers on consecutive ticks
// -- just rely on periodic rebroadcast
// -- just rely on periodic search widening
if len(live) > 0 && (s.consecutiveTicks == 0) {
s.pm.FindMorePeers(ctx, live[0])
}
s.resetTick()
s.resetIdleTick()
if len(s.liveWants) > 0 {
s.consecutiveTicks++
}
}
func (s *Session) handleRebroadcast(ctx context.Context) {
if len(s.liveWants) == 0 {
func (s *Session) handlePeriodicSearch(ctx context.Context) {
randomWant := s.randomLiveWant()
if !randomWant.Defined() {
return
}
// TODO: come up with a better strategy for determining when to search
// for new providers for blocks.
s.pm.FindMorePeers(ctx, s.randomLiveWant())
s.pm.FindMorePeers(ctx, randomWant)
s.wm.WantBlocks(ctx, []cid.Cid{randomWant}, nil, s.id)
s.rebroadcast.Reset(s.rebroadcastDelay.Get())
s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
}
func (s *Session) randomLiveWant() cid.Cid {
......@@ -357,7 +358,7 @@ func (s *Session) randomLiveWant() cid.Cid {
return cid.Cid{}
}
func (s *Session) handleShutdown() {
s.tick.Stop()
s.idleTick.Stop()
s.notif.Shutdown()
live := make([]cid.Cid, 0, len(s.liveWants))
......@@ -436,16 +437,16 @@ func (s *Session) averageLatency() time.Duration {
return s.latTotal / time.Duration(s.fetchcnt)
}
func (s *Session) resetTick() {
func (s *Session) resetIdleTick() {
var tickDelay time.Duration
if s.latTotal == 0 {
tickDelay = s.provSearchDelay
tickDelay = s.initialSearchDelay
} else {
avLat := s.averageLatency()
tickDelay = s.baseTickDelay + (3 * avLat)
}
tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
s.tick.Reset(tickDelay)
s.idleTick.Reset(tickDelay)
}
func (s *Session) wantBudget() int {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment