Commit ccc17d27 authored by Juan Batiz-Benet

net: cool test for new stream backpressure

Sadly, as cool as this test is, it doesn't work
because spdystream doesn't handle stream open backpressure
well. I'll see about rewriting that part when it becomes
a problem. More backpressure tests coming.
parent 358b8a99
package backpressure_tests
import (
"testing"
"time"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
testutil "github.com/jbenet/go-ipfs/util/testutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
var log = eventlog.Logger("backpressure")
func GenNetwork(ctx context.Context) (inet.Network, error) {
p, err := testutil.PeerWithKeysAndAddress(testutil.RandLocalTCPAddress())
if err != nil {
return nil, err
}
listen := p.Addresses()
ps := peer.NewPeerstore()
return inet.NewNetwork(ctx, listen, p, ps)
}
// TestBackpressureStreamHandler tests whether mux handler
// rate limiting works: since the handler is sequential, it
// should block senders.
//
// Important note: spdystream (which peerstream uses) has a set
// of n workers (n = spdystream.FRAME_WORKERS) which handle new
// frames, including those starting new streams. So all of them
// can be in the handler at one time. Also, the sending side
// does not rate limit unless we call stream.Wait().
// (A sketch of a sender that blocks on Wait() follows the test.)
//
// Note: right now, this happens muxer-wide. The muxer should
// learn to flow control, so handlers can't block each other.
func TestBackpressureStreamHandler(t *testing.T) {
t.Skip(`Sadly, as cool as this test is, it doesn't work
because spdystream doesn't handle stream open backpressure
well IMO. I'll see about rewriting that part when it becomes
a problem.
`)
// a number of concurrent request handlers
limit := 10
// our way to signal that we're done with 1 request
requestHandled := make(chan struct{})
// handler rate limiting
receiverRatelimit := make(chan struct{}, limit)
for i := 0; i < limit; i++ {
receiverRatelimit <- struct{}{}
}
// sender counter of successfully opened streams
senderOpened := make(chan struct{}, limit*100)
// sender signals it's done (errored out)
senderDone := make(chan struct{})
// the receiver handles requests with some rate limiting
receiver := func(s inet.Stream) {
log.Debug("receiver received a stream")
<-receiverRatelimit // acquire
go func() {
// our request handler. can do stuff here. we
// simulate something taking time by waiting
// on requestHandled
log.Error("request worker handling...")
<-requestHandled
log.Error("request worker done!")
receiverRatelimit <- struct{}{} // release
}()
}
// the sender opens streams as fast as possible
sender := func(net inet.Network, remote peer.Peer) {
var s inet.Stream
var err error
defer func() {
t.Error(err)
log.Debug("sender error. exiting.")
senderDone <- struct{}{}
}()
for {
s, err = net.NewStream(inet.ProtocolTesting, remote)
if err != nil {
return
}
_ = s
// if err = s.SwarmStream().Stream().Wait(); err != nil {
// return
// }
// "count" another successfully opened stream
// (large buffer so shouldn't block in normal operation)
log.Debug("sender opened another stream!")
senderOpened <- struct{}{}
}
}
// count our senderOpened events
countStreamsOpenedBySender := func(min int) int {
opened := 0
for opened < min {
log.Debugf("countStreamsOpenedBySender got %d (min %d)", opened, min)
select {
case <-senderOpened:
opened++
case <-time.After(10 * time.Millisecond):
}
}
return opened
}
// count our received events
// waitForNReceivedStreams := func(n int) {
// for n > 0 {
// log.Debugf("waiting for %d received streams...", n)
// select {
// case <-receiverRatelimit:
// n--
// }
// }
// }
testStreamsOpened := func(expected int) {
log.Debugf("testing rate limited to %d streams", expected)
if n := countStreamsOpenedBySender(expected); n != expected {
t.Fatalf("rate limiting did not work :( -- %d != %d", expected, n)
}
}
// ok that's enough setup. let's do it!
ctx := context.Background()
n1, err := GenNetwork(ctx)
if err != nil {
t.Fatal(err)
}
n2, err := GenNetwork(ctx)
if err != nil {
t.Fatal(err)
}
// setup receiver handler
n1.SetHandler(inet.ProtocolTesting, receiver)
log.Debugf("dialing %s", n2.ListenAddresses())
if err := n1.DialPeer(ctx, n2.LocalPeer()); err != nil {
t.Fatalf("Failed to dial:", err)
}
// launch sender!
go sender(n2, n1.LocalPeer())
// ok, what do we expect to happen? the receiver should
// receive 10 requests and stop receiving, blocking the sender.
// we can test this by counting 10x senderOpened requests
<-senderOpened // wait for the sender to successfully open some.
testStreamsOpened(limit - 1)
// let's "handle" 3 requests.
<-requestHandled
<-requestHandled
<-requestHandled
// the sender should've now been able to open exactly 3 more.
testStreamsOpened(3)
// shouldn't have opened anything more
testStreamsOpened(0)
// let's "handle" 100 requests in batches of 5
for i := 0; i < 20; i++ {
<-requestHandled
<-requestHandled
<-requestHandled
<-requestHandled
<-requestHandled
testStreamsOpened(5)
}
// success!
// now for the sugar on top: let's tear down the receiver. it should
// exit the sender.
n1.Close()
// shouldn't have opened anything more
testStreamsOpened(0)
select {
case <-time.After(100 * time.Millisecond):
t.Error("receiver shutdown failed to exit sender")
case <-senderDone:
log.Info("handler backpressure works!")
}
}
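The doc comment above notes that the sending side only rate limits if it calls stream.Wait(). A minimal sketch of that sender variant, assuming the commented-out s.SwarmStream().Stream().Wait() call behaves like spdystream's Stream.Wait (i.e. it blocks until the remote side acknowledges the new stream):

// blockingSender is a sketch only (not part of this commit). It opens streams
// like sender above, but waits for each open to be acknowledged before opening
// the next, so the receiver's rate limit propagates back to the sender.
func blockingSender(net inet.Network, remote peer.Peer, opened chan<- struct{}) {
	for {
		s, err := net.NewStream(inet.ProtocolTesting, remote)
		if err != nil {
			return
		}
		// Assumed semantics: Wait() blocks until the stream open is acked
		// by the remote side; that is what would make this loop back off.
		if err := s.SwarmStream().Stream().Wait(); err != nil {
			return
		}
		opened <- struct{}{} // count another acknowledged stream
	}
}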
@@ -40,11 +40,13 @@ type Mux struct {
// ReadProtocolHeader reads the stream and returns the next Handler function
// according to the muxer encoding.
func (m *Mux) ReadProtocolHeader(s io.Reader) (string, StreamHandler, error) {
// log.Error("ReadProtocolHeader")
name, err := ReadLengthPrefix(s)
if err != nil {
return "", nil, err
}
// log.Debug("ReadProtocolHeader got:", name)
m.RLock()
h, found := m.Handlers[ProtocolID(name)]
m.RUnlock()
@@ -69,6 +71,19 @@ func (m *Mux) SetHandler(p ProtocolID, h StreamHandler) {
// Handle reads the next name off the Stream, and calls the corresponding handler function
func (m *Mux) Handle(s Stream) {
// Flow control and backpressure of opening streams is broken.
// I believe that spdystream has one set of workers that both send
// data AND accept new streams (as it's just more data). There
// is a problem where, if the new stream handlers want to throttle,
// they also eliminate the ability to read/write data, which makes
// forward progress impossible. Thus, throttling this function is
// -- at this moment -- not the solution. Either spdystream must
// change, or we must throttle another way.
//
// In light of this, we use a goroutine for now (otherwise the
// spdy worker totally blocks, and we can't even read the protocol
// header). The better route in the future is to use a worker pool
// (see the sketch after this diff).
go func() {
ctx := context.Background()
@@ -107,6 +122,7 @@ func ReadLengthPrefix(r io.Reader) (string, error) {
// WriteLengthPrefix writes the name into Writer with a length-byte-prefix.
func WriteLengthPrefix(w io.Writer, name string) error {
// log.Error("WriteLengthPrefix", name)
s := make([]byte, len(name)+1)
s[0] = byte(len(name))
copy(s[1:], []byte(name))
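For illustration only, a hedged sketch of the reading side of this wire format (one length byte followed by that many bytes of name); the file's actual ReadLengthPrefix is not shown in this diff and may differ:

// readLengthPrefixSketch is hypothetical; it mirrors the format produced by
// WriteLengthPrefix above: a single length byte, then the name bytes.
func readLengthPrefixSketch(r io.Reader) (string, error) {
	var l [1]byte
	if _, err := io.ReadFull(r, l[:]); err != nil {
		return "", err
	}
	name := make([]byte, l[0])
	if _, err := io.ReadFull(r, name); err != nil {
		return "", err
	}
	return string(name), nil
}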
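The comment in Mux.Handle above points to a worker pool as the better long-term route. A minimal sketch of that shape, under the assumption that a bounded set of handler goroutines is acceptable (names like streamWorkerPool are hypothetical, not part of this commit):

// streamWorkerPool bounds handler concurrency: a fixed number of goroutines
// drains a channel of accepted streams, so stream handling is rate limited
// without blocking the spdystream frame workers.
type streamWorkerPool struct {
	streams chan Stream
}

func newStreamWorkerPool(workers int, handle func(Stream)) *streamWorkerPool {
	p := &streamWorkerPool{streams: make(chan Stream, workers)}
	for i := 0; i < workers; i++ {
		go func() {
			for s := range p.streams {
				handle(s) // at most `workers` handlers run at once
			}
		}()
	}
	return p
}

// Mux.Handle would then enqueue the stream instead of spawning a goroutine per stream:
//   pool.streams <- s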