Commit d7a532d0 authored by hannahhoward's avatar hannahhoward

refactor(session): cleanup sessions

parent 9e891268
package session
import cid "github.com/ipfs/go-cid"
type cidQueue struct {
elems []cid.Cid
eset *cid.Set
}
func newCidQueue() *cidQueue {
return &cidQueue{eset: cid.NewSet()}
}
func (cq *cidQueue) Pop() cid.Cid {
for {
if len(cq.elems) == 0 {
return cid.Cid{}
}
out := cq.elems[0]
cq.elems = cq.elems[1:]
if cq.eset.Has(out) {
cq.eset.Remove(out)
return out
}
}
}
func (cq *cidQueue) Push(c cid.Cid) {
if cq.eset.Visit(c) {
cq.elems = append(cq.elems, c)
}
}
func (cq *cidQueue) Remove(c cid.Cid) {
cq.eset.Remove(c)
}
func (cq *cidQueue) Has(c cid.Cid) bool {
return cq.eset.Has(c)
}
func (cq *cidQueue) Len() int {
return cq.eset.Len()
}
......@@ -16,13 +16,15 @@ import (
const activeWantsLimit = 16
// Wantmanager is an interface that can be used to request blocks
// WantManager is an interface that can be used to request blocks
// from given peers.
type WantManager interface {
WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}
// PeerManager provides an interface for tracking and optimize peers, and
// requesting more when neccesary.
type PeerManager interface {
FindMorePeers(context.Context, cid.Cid)
GetOptimizedPeers() []peer.ID
......@@ -107,6 +109,9 @@ func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
case s.incoming <- blkRecv{from: from, blk: blk}:
case <-s.ctx.Done():
}
ks := []cid.Cid{blk.Cid()}
s.wm.CancelWants(s.ctx, ks, nil, s.id)
}
// InterestedIn returns true if this session is interested in the given Cid.
......@@ -132,6 +137,7 @@ func (s *Session) ID() uint64 {
return s.id
}
// GetAverageLatency returns the average latency for block requests.
func (s *Session) GetAverageLatency() time.Duration {
resp := make(chan time.Duration)
select {
......@@ -148,6 +154,7 @@ func (s *Session) GetAverageLatency() time.Duration {
}
}
// SetBaseTickDelay changes the rate at which ticks happen.
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
select {
case s.tickDelayReqs <- baseTickDelay:
......@@ -341,46 +348,3 @@ func (s *Session) resetTick() {
s.tick.Reset(s.baseTickDelay + (3 * avLat))
}
}
type cidQueue struct {
elems []cid.Cid
eset *cid.Set
}
func newCidQueue() *cidQueue {
return &cidQueue{eset: cid.NewSet()}
}
func (cq *cidQueue) Pop() cid.Cid {
for {
if len(cq.elems) == 0 {
return cid.Cid{}
}
out := cq.elems[0]
cq.elems = cq.elems[1:]
if cq.eset.Has(out) {
cq.eset.Remove(out)
return out
}
}
}
func (cq *cidQueue) Push(c cid.Cid) {
if cq.eset.Visit(c) {
cq.elems = append(cq.elems, c)
}
}
func (cq *cidQueue) Remove(c cid.Cid) {
cq.eset.Remove(c)
}
func (cq *cidQueue) Has(c cid.Cid) bool {
return cq.eset.Has(c)
}
func (cq *cidQueue) Len() int {
return cq.eset.Len()
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment