From bcf85413390a677b6e59325a59ea5c31f5e0c6bd Mon Sep 17 00:00:00 2001 From: Paul Wolneykien Date: Thu, 10 Sep 2020 15:05:51 +0300 Subject: [PATCH] Fix: Increment stats.MessagesSent in msgToStream() function (#441) * Share common code between network/ipfs_impl_test.go tests Extract the code that is common in TestMessageResendAfterError, TestMessageSendTimeout and TestMessageSendNotSupportedResponse to a separate function. * Make prepareNetwork() return two hosts and two networks Let prepareNetwork() make simmetric setup with two `ErrHost`s with two `impl` networks to be sure we test `impl` instances on both ends. * Added TestNetworkCounters test to the "network" package The test shows we have a problem with `MessagesSent` counter. * Fix: Increment stats.MessagesSent in msgToStream() function Fixes the bug with incrementing `MessagesSent` counter only in `SendMessage()` method if `impl`. Now it works for `MessageSender` too. * Allow to specify a network event listener for tests Added `listener network.Notifiee` to the `receiver` structure. If a listener is specified then `prepareNetwork()` connects it to the mock network it builds before making any connections. * Wait for all network streams are closed in testNetworkCounters Wait for all network streams are closed instead of just using a timeout. The timeout of 5 s is still used as a deadline (it makes the test to fail). * Fix: Close the MessageSender in testNetworkCounters() The `MessageSender` needs to be closed if we want all streams in the network to be closed. * Fix: Close MessageSender in other tests too Co-authored-by: Paul Wolneykien --- network/ipfs_impl.go | 3 +- network/ipfs_impl_test.go | 279 +++++++++++++++++++++++--------------- 2 files changed, 171 insertions(+), 111 deletions(-) diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index 3636b04..0254e64 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -265,6 +265,8 @@ func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg. return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol()) } + atomic.AddUint64(&bsnet.stats.MessagesSent, 1) + if err := s.SetWriteDeadline(time.Time{}); err != nil { log.Warnf("error resetting deadline: %s", err) } @@ -320,7 +322,6 @@ func (bsnet *impl) SendMessage( _ = s.Reset() return err } - atomic.AddUint64(&bsnet.stats.MessagesSent, 1) // TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine. //nolint diff --git a/network/ipfs_impl_test.go b/network/ipfs_impl_test.go index 454bb41..3ad047f 100644 --- a/network/ipfs_impl_test.go +++ b/network/ipfs_impl_test.go @@ -31,6 +31,7 @@ type receiver struct { connectionEvent chan bool lastMessage bsmsg.BitSwapMessage lastSender peer.ID + listener network.Notifiee } func newReceiver() *receiver { @@ -254,36 +255,38 @@ func TestMessageSendAndReceive(t *testing.T) { } } -func TestMessageResendAfterError(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - +func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *receiver, p2 tnet.Identity, r2 *receiver) (*ErrHost, bsnet.BitSwapNetwork, *ErrHost, bsnet.BitSwapNetwork, bsmsg.BitSwapMessage) { // create network mn := mocknet.New(ctx) mr := mockrouting.NewServer() - streamNet, err := tn.StreamNet(ctx, mn, mr) - if err != nil { - t.Fatal("Unable to setup network") - } - p1 := tnet.RandIdentityOrFatal(t) - p2 := tnet.RandIdentityOrFatal(t) + // Host 1 h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address()) if err != nil { t.Fatal(err) } - - // Create a special host that we can force to start returning errors - eh := &ErrHost{Host: h1} - routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore()) - bsnet1 := bsnet.NewFromIpfsHost(eh, routing) - - bsnet2 := streamNet.Adapter(p2) - r1 := newReceiver() - r2 := newReceiver() + eh1 := &ErrHost{Host: h1} + routing1 := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore()) + bsnet1 := bsnet.NewFromIpfsHost(eh1, routing1) bsnet1.SetDelegate(r1) + if r1.listener != nil { + eh1.Network().Notify(r1.listener) + } + + // Host 2 + h2, err := mn.AddPeer(p2.PrivateKey(), p2.Address()) + if err != nil { + t.Fatal(err) + } + eh2 := &ErrHost{Host: h2} + routing2 := mr.ClientWithDatastore(context.TODO(), p2, ds.NewMapDatastore()) + bsnet2 := bsnet.NewFromIpfsHost(eh2, routing2) bsnet2.SetDelegate(r2) + if r2.listener != nil { + eh2.Network().Notify(r2.listener) + } + // Networking err = mn.LinkAll() if err != nil { t.Fatal(err) @@ -307,6 +310,20 @@ func TestMessageResendAfterError(t *testing.T) { msg := bsmsg.New(false) msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true) + return eh1, bsnet1, eh2, bsnet2, msg +} + +func TestMessageResendAfterError(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + p1 := tnet.RandIdentityOrFatal(t) + r1 := newReceiver() + p2 := tnet.RandIdentityOrFatal(t) + r2 := newReceiver() + + eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2) + testSendErrorBackoff := 100 * time.Millisecond ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{ MaxRetries: 3, @@ -316,6 +333,7 @@ func TestMessageResendAfterError(t *testing.T) { if err != nil { t.Fatal(err) } + defer ms.Close() // Return an error from the networking layer the next time we try to send // a message @@ -345,54 +363,12 @@ func TestMessageSendTimeout(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - // create network - mn := mocknet.New(ctx) - mr := mockrouting.NewServer() - streamNet, err := tn.StreamNet(ctx, mn, mr) - if err != nil { - t.Fatal("Unable to setup network") - } p1 := tnet.RandIdentityOrFatal(t) - p2 := tnet.RandIdentityOrFatal(t) - - h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address()) - if err != nil { - t.Fatal(err) - } - - // Create a special host that we can force to start timing out - eh := &ErrHost{Host: h1} - routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore()) - bsnet1 := bsnet.NewFromIpfsHost(eh, routing) - - bsnet2 := streamNet.Adapter(p2) r1 := newReceiver() + p2 := tnet.RandIdentityOrFatal(t) r2 := newReceiver() - bsnet1.SetDelegate(r1) - bsnet2.SetDelegate(r2) - err = mn.LinkAll() - if err != nil { - t.Fatal(err) - } - err = bsnet1.ConnectTo(ctx, p2.ID()) - if err != nil { - t.Fatal(err) - } - isConnected := <-r1.connectionEvent - if !isConnected { - t.Fatal("Expected connect event") - } - - err = bsnet2.ConnectTo(ctx, p1.ID()) - if err != nil { - t.Fatal(err) - } - - blockGenerator := blocksutil.NewBlockGenerator() - block1 := blockGenerator.Next() - msg := bsmsg.New(false) - msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true) + eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2) ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{ MaxRetries: 3, @@ -402,6 +378,7 @@ func TestMessageSendTimeout(t *testing.T) { if err != nil { t.Fatal(err) } + defer ms.Close() // Return a DeadlineExceeded error from the networking layer the next time we try to // send a message @@ -416,7 +393,7 @@ func TestMessageSendTimeout(t *testing.T) { select { case <-time.After(500 * time.Millisecond): t.Fatal("Did not receive disconnect event") - case isConnected = <-r1.connectionEvent: + case isConnected := <-r1.connectionEvent: if isConnected { t.Fatal("Expected disconnect event (got connect event)") } @@ -427,69 +404,28 @@ func TestMessageSendNotSupportedResponse(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - // create network - mn := mocknet.New(ctx) - mr := mockrouting.NewServer() - streamNet, err := tn.StreamNet(ctx, mn, mr) - if err != nil { - t.Fatal("Unable to setup network") - } p1 := tnet.RandIdentityOrFatal(t) - p2 := tnet.RandIdentityOrFatal(t) - - h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address()) - if err != nil { - t.Fatal(err) - } - - // Create a special host that responds with ErrNotSupported - eh := &ErrHost{Host: h1} - routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore()) - bsnet1 := bsnet.NewFromIpfsHost(eh, routing) - - bsnet2 := streamNet.Adapter(p2) r1 := newReceiver() + p2 := tnet.RandIdentityOrFatal(t) r2 := newReceiver() - bsnet1.SetDelegate(r1) - bsnet2.SetDelegate(r2) - - err = mn.LinkAll() - if err != nil { - t.Fatal(err) - } - err = bsnet1.ConnectTo(ctx, p2.ID()) - if err != nil { - t.Fatal(err) - } - isConnected := <-r1.connectionEvent - if !isConnected { - t.Fatal("Expected connect event") - } - err = bsnet2.ConnectTo(ctx, p1.ID()) - if err != nil { - t.Fatal(err) - } - - blockGenerator := blocksutil.NewBlockGenerator() - block1 := blockGenerator.Next() - msg := bsmsg.New(false) - msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true) + eh, bsnet1, _, _, _ := prepareNetwork(t, ctx, p1, r1, p2, r2) eh.setError(multistream.ErrNotSupported) - _, err = bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{ + ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{ MaxRetries: 3, SendTimeout: 100 * time.Millisecond, SendErrorBackoff: 100 * time.Millisecond, }) if err == nil { + ms.Close() t.Fatal("Expected ErrNotSupported") } select { case <-time.After(500 * time.Millisecond): t.Fatal("Did not receive disconnect event") - case isConnected = <-r1.connectionEvent: + case isConnected := <-r1.connectionEvent: if isConnected { t.Fatal("Expected disconnect event (got connect event)") } @@ -535,9 +471,132 @@ func TestSupportsHave(t *testing.T) { if err != nil { t.Fatal(err) } + defer senderCurrent.Close() if senderCurrent.SupportsHave() != tc.expSupportsHave { t.Fatal("Expected sender HAVE message support", tc.proto, tc.expSupportsHave) } } } + +func testNetworkCounters(t *testing.T, n1 int, n2 int) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + p1 := tnet.RandIdentityOrFatal(t) + r1 := newReceiver() + p2 := tnet.RandIdentityOrFatal(t) + r2 := newReceiver() + + var wg1, wg2 sync.WaitGroup + r1.listener = &network.NotifyBundle{ + OpenedStreamF: func(n network.Network, s network.Stream) { + wg1.Add(1) + }, + ClosedStreamF: func(n network.Network, s network.Stream) { + wg1.Done() + }, + } + r2.listener = &network.NotifyBundle{ + OpenedStreamF: func(n network.Network, s network.Stream) { + wg2.Add(1) + }, + ClosedStreamF: func(n network.Network, s network.Stream) { + wg2.Done() + }, + } + _, bsnet1, _, bsnet2, msg := prepareNetwork(t, ctx, p1, r1, p2, r2) + + for n := 0; n < n1; n++ { + ctx, cancel := context.WithTimeout(ctx, time.Second) + err := bsnet1.SendMessage(ctx, p2.ID(), msg) + if err != nil { + t.Fatal(err) + } + select { + case <-ctx.Done(): + t.Fatal("p2 did not receive message sent") + case <-r2.messageReceived: + for j := 0; j < 2; j++ { + err := bsnet2.SendMessage(ctx, p1.ID(), msg) + if err != nil { + t.Fatal(err) + } + select { + case <-ctx.Done(): + t.Fatal("p1 did not receive message sent") + case <-r1.messageReceived: + } + } + } + cancel() + } + + if n2 > 0 { + ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{}) + if err != nil { + t.Fatal(err) + } + defer ms.Close() + for n := 0; n < n2; n++ { + ctx, cancel := context.WithTimeout(ctx, time.Second) + err = ms.SendMsg(ctx, msg) + if err != nil { + t.Fatal(err) + } + select { + case <-ctx.Done(): + t.Fatal("p2 did not receive message sent") + case <-r2.messageReceived: + for j := 0; j < 2; j++ { + err := bsnet2.SendMessage(ctx, p1.ID(), msg) + if err != nil { + t.Fatal(err) + } + select { + case <-ctx.Done(): + t.Fatal("p1 did not receive message sent") + case <-r1.messageReceived: + } + } + } + cancel() + } + ms.Close() + } + + // Wait until all streams are closed and MessagesRecvd counters + // updated. + ctxto, cancelto := context.WithTimeout(ctx, 5*time.Second) + defer cancelto() + ctxwait, cancelwait := context.WithCancel(ctx) + defer cancelwait() + go func() { + wg1.Wait() + wg2.Wait() + cancelwait() + }() + select { + case <-ctxto.Done(): + t.Fatal("network streams closing timed out") + case <-ctxwait.Done(): + } + + if bsnet1.Stats().MessagesSent != uint64(n1+n2) { + t.Fatal(fmt.Errorf("expected %d sent messages, got %d", n1+n2, bsnet1.Stats().MessagesSent)) + } + + if bsnet2.Stats().MessagesRecvd != uint64(n1+n2) { + t.Fatal(fmt.Errorf("expected %d received messages, got %d", n1+n2, bsnet2.Stats().MessagesRecvd)) + } + + if bsnet1.Stats().MessagesRecvd != 2*uint64(n1+n2) { + t.Fatal(fmt.Errorf("expected %d received reply messages, got %d", 2*(n1+n2), bsnet1.Stats().MessagesRecvd)) + } +} + +func TestNetworkCounters(t *testing.T) { + for n := 0; n < 11; n++ { + testNetworkCounters(t, 10-n, n) + } +} -- GitLab