Unverified Commit bcf85413 authored by Paul Wolneykien's avatar Paul Wolneykien Committed by GitHub

Fix: Increment stats.MessagesSent in msgToStream() function (#441)

* Share common code between network/ipfs_impl_test.go tests

Extract the code that is common in TestMessageResendAfterError,
TestMessageSendTimeout and TestMessageSendNotSupportedResponse to a
separate function.

* Make prepareNetwork() return two hosts and two networks

Let prepareNetwork() make simmetric setup with two `ErrHost`s
with two `impl` networks to be sure we test `impl` instances on
both ends.

* Added TestNetworkCounters test to the "network" package

The test shows we have a problem with `MessagesSent` counter.

* Fix: Increment stats.MessagesSent in msgToStream() function

Fixes the bug with incrementing `MessagesSent` counter only in
`SendMessage()` method if `impl`. Now it works for `MessageSender`
too.

* Allow to specify a network event listener for tests

Added `listener network.Notifiee` to the `receiver` structure.
If a listener is specified then `prepareNetwork()` connects it
to the mock network it builds before making any connections.

* Wait for all network streams are closed in testNetworkCounters

Wait for all network streams are closed instead of just using a
timeout. The timeout of 5 s is still used as a deadline (it makes
the test to fail).

* Fix: Close the MessageSender in testNetworkCounters()

The `MessageSender` needs to be closed if we want all streams in
the network to be closed.

* Fix: Close MessageSender in other tests too
Co-authored-by: default avatarPaul Wolneykien <manowar@altlinux.org>
parent 00f4df8d
......@@ -265,6 +265,8 @@ func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.
return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
}
atomic.AddUint64(&bsnet.stats.MessagesSent, 1)
if err := s.SetWriteDeadline(time.Time{}); err != nil {
log.Warnf("error resetting deadline: %s", err)
}
......@@ -320,7 +322,6 @@ func (bsnet *impl) SendMessage(
_ = s.Reset()
return err
}
atomic.AddUint64(&bsnet.stats.MessagesSent, 1)
// TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
//nolint
......
......@@ -31,6 +31,7 @@ type receiver struct {
connectionEvent chan bool
lastMessage bsmsg.BitSwapMessage
lastSender peer.ID
listener network.Notifiee
}
func newReceiver() *receiver {
......@@ -254,36 +255,38 @@ func TestMessageSendAndReceive(t *testing.T) {
}
}
func TestMessageResendAfterError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *receiver, p2 tnet.Identity, r2 *receiver) (*ErrHost, bsnet.BitSwapNetwork, *ErrHost, bsnet.BitSwapNetwork, bsmsg.BitSwapMessage) {
// create network
mn := mocknet.New(ctx)
mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
if err != nil {
t.Fatal("Unable to setup network")
}
p1 := tnet.RandIdentityOrFatal(t)
p2 := tnet.RandIdentityOrFatal(t)
// Host 1
h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address())
if err != nil {
t.Fatal(err)
}
// Create a special host that we can force to start returning errors
eh := &ErrHost{Host: h1}
routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh, routing)
bsnet2 := streamNet.Adapter(p2)
r1 := newReceiver()
r2 := newReceiver()
eh1 := &ErrHost{Host: h1}
routing1 := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh1, routing1)
bsnet1.SetDelegate(r1)
if r1.listener != nil {
eh1.Network().Notify(r1.listener)
}
// Host 2
h2, err := mn.AddPeer(p2.PrivateKey(), p2.Address())
if err != nil {
t.Fatal(err)
}
eh2 := &ErrHost{Host: h2}
routing2 := mr.ClientWithDatastore(context.TODO(), p2, ds.NewMapDatastore())
bsnet2 := bsnet.NewFromIpfsHost(eh2, routing2)
bsnet2.SetDelegate(r2)
if r2.listener != nil {
eh2.Network().Notify(r2.listener)
}
// Networking
err = mn.LinkAll()
if err != nil {
t.Fatal(err)
......@@ -307,6 +310,20 @@ func TestMessageResendAfterError(t *testing.T) {
msg := bsmsg.New(false)
msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true)
return eh1, bsnet1, eh2, bsnet2, msg
}
func TestMessageResendAfterError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
p1 := tnet.RandIdentityOrFatal(t)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()
eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)
testSendErrorBackoff := 100 * time.Millisecond
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
MaxRetries: 3,
......@@ -316,6 +333,7 @@ func TestMessageResendAfterError(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer ms.Close()
// Return an error from the networking layer the next time we try to send
// a message
......@@ -345,54 +363,12 @@ func TestMessageSendTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// create network
mn := mocknet.New(ctx)
mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
if err != nil {
t.Fatal("Unable to setup network")
}
p1 := tnet.RandIdentityOrFatal(t)
p2 := tnet.RandIdentityOrFatal(t)
h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address())
if err != nil {
t.Fatal(err)
}
// Create a special host that we can force to start timing out
eh := &ErrHost{Host: h1}
routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh, routing)
bsnet2 := streamNet.Adapter(p2)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()
bsnet1.SetDelegate(r1)
bsnet2.SetDelegate(r2)
err = mn.LinkAll()
if err != nil {
t.Fatal(err)
}
err = bsnet1.ConnectTo(ctx, p2.ID())
if err != nil {
t.Fatal(err)
}
isConnected := <-r1.connectionEvent
if !isConnected {
t.Fatal("Expected connect event")
}
err = bsnet2.ConnectTo(ctx, p1.ID())
if err != nil {
t.Fatal(err)
}
blockGenerator := blocksutil.NewBlockGenerator()
block1 := blockGenerator.Next()
msg := bsmsg.New(false)
msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true)
eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
MaxRetries: 3,
......@@ -402,6 +378,7 @@ func TestMessageSendTimeout(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer ms.Close()
// Return a DeadlineExceeded error from the networking layer the next time we try to
// send a message
......@@ -416,7 +393,7 @@ func TestMessageSendTimeout(t *testing.T) {
select {
case <-time.After(500 * time.Millisecond):
t.Fatal("Did not receive disconnect event")
case isConnected = <-r1.connectionEvent:
case isConnected := <-r1.connectionEvent:
if isConnected {
t.Fatal("Expected disconnect event (got connect event)")
}
......@@ -427,69 +404,28 @@ func TestMessageSendNotSupportedResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// create network
mn := mocknet.New(ctx)
mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
if err != nil {
t.Fatal("Unable to setup network")
}
p1 := tnet.RandIdentityOrFatal(t)
p2 := tnet.RandIdentityOrFatal(t)
h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address())
if err != nil {
t.Fatal(err)
}
// Create a special host that responds with ErrNotSupported
eh := &ErrHost{Host: h1}
routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh, routing)
bsnet2 := streamNet.Adapter(p2)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()
bsnet1.SetDelegate(r1)
bsnet2.SetDelegate(r2)
err = mn.LinkAll()
if err != nil {
t.Fatal(err)
}
err = bsnet1.ConnectTo(ctx, p2.ID())
if err != nil {
t.Fatal(err)
}
isConnected := <-r1.connectionEvent
if !isConnected {
t.Fatal("Expected connect event")
}
err = bsnet2.ConnectTo(ctx, p1.ID())
if err != nil {
t.Fatal(err)
}
blockGenerator := blocksutil.NewBlockGenerator()
block1 := blockGenerator.Next()
msg := bsmsg.New(false)
msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true)
eh, bsnet1, _, _, _ := prepareNetwork(t, ctx, p1, r1, p2, r2)
eh.setError(multistream.ErrNotSupported)
_, err = bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
MaxRetries: 3,
SendTimeout: 100 * time.Millisecond,
SendErrorBackoff: 100 * time.Millisecond,
})
if err == nil {
ms.Close()
t.Fatal("Expected ErrNotSupported")
}
select {
case <-time.After(500 * time.Millisecond):
t.Fatal("Did not receive disconnect event")
case isConnected = <-r1.connectionEvent:
case isConnected := <-r1.connectionEvent:
if isConnected {
t.Fatal("Expected disconnect event (got connect event)")
}
......@@ -535,9 +471,132 @@ func TestSupportsHave(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer senderCurrent.Close()
if senderCurrent.SupportsHave() != tc.expSupportsHave {
t.Fatal("Expected sender HAVE message support", tc.proto, tc.expSupportsHave)
}
}
}
func testNetworkCounters(t *testing.T, n1 int, n2 int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p1 := tnet.RandIdentityOrFatal(t)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()
var wg1, wg2 sync.WaitGroup
r1.listener = &network.NotifyBundle{
OpenedStreamF: func(n network.Network, s network.Stream) {
wg1.Add(1)
},
ClosedStreamF: func(n network.Network, s network.Stream) {
wg1.Done()
},
}
r2.listener = &network.NotifyBundle{
OpenedStreamF: func(n network.Network, s network.Stream) {
wg2.Add(1)
},
ClosedStreamF: func(n network.Network, s network.Stream) {
wg2.Done()
},
}
_, bsnet1, _, bsnet2, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)
for n := 0; n < n1; n++ {
ctx, cancel := context.WithTimeout(ctx, time.Second)
err := bsnet1.SendMessage(ctx, p2.ID(), msg)
if err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("p2 did not receive message sent")
case <-r2.messageReceived:
for j := 0; j < 2; j++ {
err := bsnet2.SendMessage(ctx, p1.ID(), msg)
if err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("p1 did not receive message sent")
case <-r1.messageReceived:
}
}
}
cancel()
}
if n2 > 0 {
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{})
if err != nil {
t.Fatal(err)
}
defer ms.Close()
for n := 0; n < n2; n++ {
ctx, cancel := context.WithTimeout(ctx, time.Second)
err = ms.SendMsg(ctx, msg)
if err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("p2 did not receive message sent")
case <-r2.messageReceived:
for j := 0; j < 2; j++ {
err := bsnet2.SendMessage(ctx, p1.ID(), msg)
if err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("p1 did not receive message sent")
case <-r1.messageReceived:
}
}
}
cancel()
}
ms.Close()
}
// Wait until all streams are closed and MessagesRecvd counters
// updated.
ctxto, cancelto := context.WithTimeout(ctx, 5*time.Second)
defer cancelto()
ctxwait, cancelwait := context.WithCancel(ctx)
defer cancelwait()
go func() {
wg1.Wait()
wg2.Wait()
cancelwait()
}()
select {
case <-ctxto.Done():
t.Fatal("network streams closing timed out")
case <-ctxwait.Done():
}
if bsnet1.Stats().MessagesSent != uint64(n1+n2) {
t.Fatal(fmt.Errorf("expected %d sent messages, got %d", n1+n2, bsnet1.Stats().MessagesSent))
}
if bsnet2.Stats().MessagesRecvd != uint64(n1+n2) {
t.Fatal(fmt.Errorf("expected %d received messages, got %d", n1+n2, bsnet2.Stats().MessagesRecvd))
}
if bsnet1.Stats().MessagesRecvd != 2*uint64(n1+n2) {
t.Fatal(fmt.Errorf("expected %d received reply messages, got %d", 2*(n1+n2), bsnet1.Stats().MessagesRecvd))
}
}
func TestNetworkCounters(t *testing.T) {
for n := 0; n < 11; n++ {
testNetworkCounters(t, 10-n, n)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment