Unverified Commit 7a85a068 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #35 from libp2p/steb/fastrack-ping

feat: improve ping accuracy
parents 10c91193 c5baab6f
...@@ -66,8 +66,9 @@ type Session struct { ...@@ -66,8 +66,9 @@ type Session struct {
// sendCh is used to send messages // sendCh is used to send messages
sendCh chan []byte sendCh chan []byte
// pingCh is used to send pongs (responses to pings)
pongCh chan uint32 // pingCh and pingCh are used to send pings and pongs
pongCh, pingCh chan uint32
// recvDoneCh is closed when recv() exits to avoid a race // recvDoneCh is closed when recv() exits to avoid a race
// between stream registration and stream shutdown // between stream registration and stream shutdown
...@@ -112,6 +113,7 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio ...@@ -112,6 +113,7 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio
acceptCh: make(chan *Stream, config.AcceptBacklog), acceptCh: make(chan *Stream, config.AcceptBacklog),
sendCh: make(chan []byte, 64), sendCh: make(chan []byte, 64),
pongCh: make(chan uint32, config.PingBacklog), pongCh: make(chan uint32, config.PingBacklog),
pingCh: make(chan uint32),
recvDoneCh: make(chan struct{}), recvDoneCh: make(chan struct{}),
sendDoneCh: make(chan struct{}), sendDoneCh: make(chan struct{}),
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
...@@ -312,16 +314,27 @@ func (s *Session) Ping() (dur time.Duration, err error) { ...@@ -312,16 +314,27 @@ func (s *Session) Ping() (dur time.Duration, err error) {
s.pingLock.Unlock() s.pingLock.Unlock()
}() }()
// Send the ping request // Send the ping request, waiting at most one connection write timeout
hdr := encode(typePing, flagSYN, 0, activePing.id) // to flush it.
if err := s.sendMsg(hdr, nil, nil); err != nil { timer := time.NewTimer(s.config.ConnectionWriteTimeout)
return 0, err defer timer.Stop()
select {
case s.pingCh <- activePing.id:
case <-timer.C:
return 0, ErrTimeout
case <-s.shutdownCh:
return 0, s.shutdownErr
} }
// Wait for a response // The "time" starts once we've actually sent the ping. Otherwise, we'll
// measure the time it takes to flush the queue as well.
start := time.Now() start := time.Now()
timer := time.NewTimer(s.config.ConnectionWriteTimeout)
defer timer.Stop() // Wait for a response, again waiting at most one write timeout.
if !timer.Stop() {
<-timer.C
}
timer.Reset(s.config.ConnectionWriteTimeout)
select { select {
case <-activePing.pingResponse: case <-activePing.pingResponse:
case <-timer.C: case <-timer.C:
...@@ -473,6 +486,10 @@ func (s *Session) sendLoop() error { ...@@ -473,6 +486,10 @@ func (s *Session) sendLoop() error {
var buf []byte var buf []byte
select { select {
case buf = <-s.sendCh: case buf = <-s.sendCh:
case pingID := <-s.pingCh:
buf = pool.Get(headerSize)
hdr := encode(typePing, flagSYN, 0, pingID)
copy(buf, hdr[:])
case pingID := <-s.pongCh: case pingID := <-s.pongCh:
buf = pool.Get(headerSize) buf = pool.Get(headerSize)
hdr := encode(typePing, flagACK, 0, pingID) hdr := encode(typePing, flagACK, 0, pingID)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment