Commit 8454ba00 authored by Steven Allen's avatar Steven Allen

session: buffer some request channels

We're not using these synchronously so we can buffer them a bit to avoid
blocking quite as much.

This also combines all incoming channels into a single one to ensure all
operations are processed in-order. This might be overkill bit it makes reasoning
about this a bit simpler.
parent 1fd68ed7
...@@ -45,9 +45,18 @@ type RequestSplitter interface { ...@@ -45,9 +45,18 @@ type RequestSplitter interface {
RecordUniqueBlock() RecordUniqueBlock()
} }
type rcvFrom struct { type opType int
const (
opReceive opType = iota
opWant
opCancel
)
type op struct {
op opType
from peer.ID from peer.ID
ks []cid.Cid keys []cid.Cid
} }
// Session holds state for an individual bitswap transfer operation. // Session holds state for an individual bitswap transfer operation.
...@@ -63,9 +72,7 @@ type Session struct { ...@@ -63,9 +72,7 @@ type Session struct {
sw sessionWants sw sessionWants
// channels // channels
incoming chan rcvFrom incoming chan op
newReqs chan []cid.Cid
cancelKeys chan []cid.Cid
latencyReqs chan chan time.Duration latencyReqs chan chan time.Duration
tickDelayReqs chan time.Duration tickDelayReqs chan time.Duration
...@@ -100,15 +107,13 @@ func New(ctx context.Context, ...@@ -100,15 +107,13 @@ func New(ctx context.Context,
liveWants: make(map[cid.Cid]time.Time), liveWants: make(map[cid.Cid]time.Time),
pastWants: cid.NewSet(), pastWants: cid.NewSet(),
}, },
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
latencyReqs: make(chan chan time.Duration), latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration), tickDelayReqs: make(chan time.Duration),
ctx: ctx, ctx: ctx,
wm: wm, wm: wm,
pm: pm, pm: pm,
srs: srs, srs: srs,
incoming: make(chan rcvFrom), incoming: make(chan op, 16),
notif: notif, notif: notif,
uuid: loggables.Uuid("GetBlockRequest"), uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500, baseTickDelay: time.Millisecond * 500,
...@@ -130,7 +135,7 @@ func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid) { ...@@ -130,7 +135,7 @@ func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid) {
} }
select { select {
case s.incoming <- rcvFrom{from: from, ks: interested}: case s.incoming <- op{op: opReceive, from: from, keys: interested}:
case <-s.ctx.Done(): case <-s.ctx.Done():
} }
} }
...@@ -154,14 +159,14 @@ func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks. ...@@ -154,14 +159,14 @@ func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.
return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif, return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
func(ctx context.Context, keys []cid.Cid) { func(ctx context.Context, keys []cid.Cid) {
select { select {
case s.newReqs <- keys: case s.incoming <- op{op: opWant, keys: keys}:
case <-ctx.Done(): case <-ctx.Done():
case <-s.ctx.Done(): case <-s.ctx.Done():
} }
}, },
func(keys []cid.Cid) { func(keys []cid.Cid) {
select { select {
case s.cancelKeys <- keys: case s.incoming <- op{op: opCancel, keys: keys}:
case <-s.ctx.Done(): case <-s.ctx.Done():
} }
}, },
...@@ -200,12 +205,17 @@ func (s *Session) run(ctx context.Context) { ...@@ -200,12 +205,17 @@ func (s *Session) run(ctx context.Context) {
s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime()) s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
for { for {
select { select {
case rcv := <-s.incoming: case oper := <-s.incoming:
s.handleIncoming(ctx, rcv) switch oper.op {
case keys := <-s.newReqs: case opReceive:
s.wantBlocks(ctx, keys) s.handleReceive(ctx, oper.from, oper.keys)
case keys := <-s.cancelKeys: case opWant:
s.sw.CancelPending(keys) s.wantBlocks(ctx, oper.keys)
case opCancel:
s.sw.CancelPending(oper.keys)
default:
panic("unhandled operation")
}
case <-s.idleTick.C: case <-s.idleTick.C:
s.handleIdleTick(ctx) s.handleIdleTick(ctx)
case <-s.periodicSearchTimer.C: case <-s.periodicSearchTimer.C:
...@@ -261,15 +271,15 @@ func (s *Session) handleShutdown() { ...@@ -261,15 +271,15 @@ func (s *Session) handleShutdown() {
s.wm.CancelWants(s.ctx, live, nil, s.id) s.wm.CancelWants(s.ctx, live, nil, s.id)
} }
func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) { func (s *Session) handleReceive(ctx context.Context, from peer.ID, keys []cid.Cid) {
// Record statistics only if the blocks came from the network // Record statistics only if the blocks came from the network
// (blocks can also be received from the local node) // (blocks can also be received from the local node)
if rcv.from != "" { if from != "" {
s.updateReceiveCounters(ctx, rcv) s.updateReceiveCounters(ctx, from, keys)
} }
// Update the want list // Update the want list
wanted, totalLatency := s.sw.BlocksReceived(rcv.ks) wanted, totalLatency := s.sw.BlocksReceived(keys)
if len(wanted) == 0 { if len(wanted) == 0 {
return return
} }
...@@ -280,18 +290,18 @@ func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) { ...@@ -280,18 +290,18 @@ func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
s.idleTick.Stop() s.idleTick.Stop()
// Process the received blocks // Process the received blocks
s.processIncoming(ctx, wanted, totalLatency) s.processReceive(ctx, wanted, totalLatency)
s.resetIdleTick() s.resetIdleTick()
} }
func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) { func (s *Session) updateReceiveCounters(ctx context.Context, from peer.ID, keys []cid.Cid) {
// Record unique vs duplicate blocks // Record unique vs duplicate blocks
s.sw.ForEachUniqDup(rcv.ks, s.srs.RecordUniqueBlock, s.srs.RecordDuplicateBlock) s.sw.ForEachUniqDup(keys, s.srs.RecordUniqueBlock, s.srs.RecordDuplicateBlock)
// Record response (to be able to time latency) // Record response (to be able to time latency)
if len(rcv.ks) > 0 { if len(keys) > 0 {
s.pm.RecordPeerResponse(rcv.from, rcv.ks) s.pm.RecordPeerResponse(from, keys)
} }
} }
...@@ -300,7 +310,7 @@ func (s *Session) cancelIncoming(ctx context.Context, ks []cid.Cid) { ...@@ -300,7 +310,7 @@ func (s *Session) cancelIncoming(ctx context.Context, ks []cid.Cid) {
s.wm.CancelWants(s.ctx, ks, nil, s.id) s.wm.CancelWants(s.ctx, ks, nil, s.id)
} }
func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid, totalLatency time.Duration) { func (s *Session) processReceive(ctx context.Context, ks []cid.Cid, totalLatency time.Duration) {
// Keep track of the total number of blocks received and total latency // Keep track of the total number of blocks received and total latency
s.fetchcnt += len(ks) s.fetchcnt += len(ks)
s.latTotal += totalLatency s.latTotal += totalLatency
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment