Commit 239bb922 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

epoll + kqueue

parent 3b2661f8
......@@ -9,8 +9,8 @@ import (
"syscall"
"time"
poll "github.com/jbenet/go-reuseport/poll"
sockaddrnet "github.com/jbenet/go-sockaddr/net"
goselect "github.com/jbenet/goselect"
)
const (
......@@ -323,19 +323,13 @@ func connect(fd int, ra syscall.Sockaddr, deadline time.Time) error {
return err
}
var err error
var timeout time.Duration
var pw goselect.FDSet
pw.Set(uintptr(fd))
for {
// wait until the fd is ready to read or write.
if !deadline.IsZero() {
timeout = deadline.Sub(time.Now())
} else {
timeout = -1
}
poller, err := poll.New(fd)
if err != nil {
return err
}
if err = goselect.Select(fd+1, nil, &pw, nil, timeout); err != nil {
for {
if err = poller.WaitWrite(deadline); err != nil {
return err
}
......
package poll
var errTimeout = &timeoutError{}
type timeoutError struct{}
func (e *timeoutError) Error() string { return "i/o timeout" }
func (e *timeoutError) Timeout() bool { return true }
func (e *timeoutError) Temporary() bool { return true }
// +build darwin freebsd dragonfly netbsd openbsd
package poll
import (
"syscall"
"time"
)
type Poller struct {
kqfd int
event syscall.Kevent_t
}
func New(fd int) (p *Poller, err error) {
p = &Poller{}
p.kqfd, err = syscall.Kqueue()
if p.kqfd == -1 || err != nil {
return nil, err
}
p.event = syscall.Kevent_t{
Ident: uint64(fd),
Filter: syscall.EVFILT_WRITE,
Flags: syscall.EV_ADD | syscall.EV_ENABLE | syscall.EV_ONESHOT,
Fflags: 0,
Data: 0,
Udata: nil,
}
return p, nil
}
func (p *Poller) Close() error {
return syscall.Close(p.kqfd)
}
func (p *Poller) WaitWrite(deadline time.Time) error {
// setup timeout
var timeout *syscall.Timespec
if !deadline.IsZero() {
d := deadline.Sub(time.Now())
t := syscall.NsecToTimespec(d.Nanoseconds())
timeout = &t
}
// wait on kevent
events := make([]syscall.Kevent_t, 1)
n, err := syscall.Kevent(p.kqfd, []syscall.Kevent_t{p.event}, events, timeout)
if err != nil {
return err
}
if n < 1 {
return errTimeout
}
return nil
}
// +build linux
package poll
import (
"syscall"
"time"
)
type Poller struct {
epfd int
event syscall.EpollEvent
events [32]syscall.EpollEvent
}
func New(fd int) (p *Poller, err error) {
p = &Poller{}
if p.epfd, err = syscall.EpollCreate1(0); err != nil {
return nil, err
}
p.event.Events = syscall.EPOLLOUT
p.event.Fd = int32(fd)
if err = syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &p.event); err != nil {
p.Close()
return nil, err
}
return p, nil
}
func (p *Poller) Close() error {
return syscall.Close(p.epfd)
}
func (p *Poller) WaitWrite(deadline time.Time) error {
msec := -1
if !deadline.IsZero() {
d := deadline.Sub(time.Now())
msec = int(d.Nanoseconds() / 1000000) // ms!? omg...
}
n, err := syscall.EpollWait(p.epfd, p.events[:], msec)
if err != nil {
return err
}
if n < 1 {
return errTimeout
}
return nil
}
// +build windows plan9
package poll
import (
"errors"
)
func WaitWrite(fd int) error {
return errors.New("platform not supported")
}
......@@ -272,7 +272,7 @@ func TestStreamListenDialSamePortStressManyMsgs(t *testing.T) {
}
for _, tcase := range testCases {
subestStreamListenDialSamePortStress(t, tcase[0], tcase[1], 2, 100)
subestStreamListenDialSamePortStress(t, tcase[0], tcase[1], 2, 1000)
}
}
......@@ -284,7 +284,19 @@ func TestStreamListenDialSamePortStressManyNodes(t *testing.T) {
}
for _, tcase := range testCases {
subestStreamListenDialSamePortStress(t, tcase[0], tcase[1], 100, 1)
subestStreamListenDialSamePortStress(t, tcase[0], tcase[1], 50, 1)
}
}
func TestStreamListenDialSamePortStressManyMsgsManyNodes(t *testing.T) {
testCases := [][]string{
[]string{"tcp", "127.0.0.1:0"},
[]string{"tcp4", "127.0.0.1:0"},
[]string{"tcp6", "[::]:0"},
}
for _, tcase := range testCases {
subestStreamListenDialSamePortStress(t, tcase[0], tcase[1], 50, 100)
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment