Commit 3b40d49d authored by Dirk McCormick

feat: don't retry if connect error is multistream.ErrNotSupported

parent b62e7fd0
......@@ -2,6 +2,7 @@ package network
import (
"context"
"errors"
"fmt"
"io"
"sync/atomic"
......@@ -22,6 +23,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
msgio "github.com/libp2p/go-msgio"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multistream"
)
var log = logging.Logger("bitswap_network")
......@@ -164,7 +166,8 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context.
}
cancel()
// Attempt failed.
// Attempt failed
// If the sender has been closed or the context cancelled, just bail out
select {
case <-ctx.Done():
......@@ -174,11 +177,17 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context.
default:
}
// Protocol is not supported, so no need to try multiple times
if errors.Is(err, multistream.ErrNotSupported) {
s.bsnet.connectEvtMgr.MarkUnresponsive(s.to)
return err
}
// Failed to send so reset stream and try again
_ = s.Reset()
// Failed too many times so mark the peer as unresponsive and return an error
if i == s.opts.MaxRetries {
if i == s.opts.MaxRetries-1 {
s.bsnet.connectEvtMgr.MarkUnresponsive(s.to)
return err
}
......
......@@ -14,6 +14,7 @@ import (
ds "github.com/ipfs/go-datastore"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
"github.com/multiformats/go-multistream"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
......@@ -27,7 +28,7 @@ import (
type receiver struct {
peers map[peer.ID]struct{}
messageReceived chan struct{}
connectionEvent chan struct{}
connectionEvent chan bool
lastMessage bsmsg.BitSwapMessage
lastSender peer.ID
}
......@@ -36,7 +37,7 @@ func newReceiver() *receiver {
return &receiver{
peers: make(map[peer.ID]struct{}),
messageReceived: make(chan struct{}),
connectionEvent: make(chan struct{}, 1),
connectionEvent: make(chan bool, 1),
}
}
......@@ -57,12 +58,12 @@ func (r *receiver) ReceiveError(err error) {
func (r *receiver) PeerConnected(p peer.ID) {
r.peers[p] = struct{}{}
r.connectionEvent <- struct{}{}
r.connectionEvent <- true
}
func (r *receiver) PeerDisconnected(p peer.ID) {
delete(r.peers, p)
r.connectionEvent <- struct{}{}
r.connectionEvent <- false
}
var mockNetErr = fmt.Errorf("network err")
......@@ -70,14 +71,14 @@ var mockNetErr = fmt.Errorf("network err")
type ErrStream struct {
network.Stream
lk sync.Mutex
err bool
err error
timingOut bool
}
type ErrHost struct {
host.Host
lk sync.Mutex
err bool
err error
timingOut bool
streams []*ErrStream
}
......@@ -86,8 +87,8 @@ func (es *ErrStream) Write(b []byte) (int, error) {
es.lk.Lock()
defer es.lk.Unlock()
if es.err {
return 0, mockNetErr
if es.err != nil {
return 0, es.err
}
if es.timingOut {
return 0, context.DeadlineExceeded
......@@ -99,8 +100,8 @@ func (eh *ErrHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
eh.lk.Lock()
defer eh.lk.Unlock()
if eh.err {
return mockNetErr
if eh.err != nil {
return eh.err
}
if eh.timingOut {
return context.DeadlineExceeded
......@@ -112,7 +113,7 @@ func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID
eh.lk.Lock()
defer eh.lk.Unlock()
if eh.err {
if eh.err != nil {
return nil, mockNetErr
}
if eh.timingOut {
......@@ -125,14 +126,14 @@ func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID
return estrm, err
}
func (eh *ErrHost) setErrorState(erroring bool) {
func (eh *ErrHost) setError(err error) {
eh.lk.Lock()
defer eh.lk.Unlock()
eh.err = erroring
eh.err = err
for _, s := range eh.streams {
s.lk.Lock()
s.err = erroring
s.err = err
s.lk.Unlock()
}
}
......@@ -273,10 +274,7 @@ func TestMessageResendAfterError(t *testing.T) {
}
// Create a special host that we can force to start returning errors
eh := &ErrHost{
Host: h1,
err: false,
}
eh := &ErrHost{Host: h1}
routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh, routing)
......@@ -294,6 +292,11 @@ func TestMessageResendAfterError(t *testing.T) {
if err != nil {
t.Fatal(err)
}
isConnected := <-r1.connectionEvent
if !isConnected {
t.Fatal("Expected connect event")
}
err = bsnet2.ConnectTo(ctx, p1.ID())
if err != nil {
t.Fatal(err)
......@@ -314,16 +317,14 @@ func TestMessageResendAfterError(t *testing.T) {
t.Fatal(err)
}
<-r1.connectionEvent
// Return an error from the networking layer the next time we try to send
// a message
eh.setErrorState(true)
eh.setError(mockNetErr)
go func() {
time.Sleep(testSendErrorBackoff / 2)
// Stop throwing errors so that the following attempt to send succeeds
eh.setErrorState(false)
eh.setError(nil)
}()
// Send message with retries, first one should fail, then subsequent
......@@ -360,10 +361,7 @@ func TestMessageSendTimeout(t *testing.T) {
}
// Create a special host that we can force to start timing out
eh := &ErrHost{
Host: h1,
err: false,
}
eh := &ErrHost{Host: h1}
routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh, routing)
......@@ -381,6 +379,11 @@ func TestMessageSendTimeout(t *testing.T) {
if err != nil {
t.Fatal(err)
}
isConnected := <-r1.connectionEvent
if !isConnected {
t.Fatal("Expected connect event")
}
err = bsnet2.ConnectTo(ctx, p1.ID())
if err != nil {
t.Fatal(err)
......@@ -399,18 +402,98 @@ func TestMessageSendTimeout(t *testing.T) {
if err != nil {
t.Fatal(err)
}
<-r1.connectionEvent
// Return a DeadlineExceeded error from the networking layer the next time we try to
// send a message
eh.setTimeoutState(true)
// Send message with retries, first one should fail, then subsequent
// message should succeed
// Send message with retries, all attempts should fail
err = ms.SendMsg(ctx, msg)
if err == nil {
t.Fatal("Expected error from SednMsg")
}
select {
case <-time.After(500 * time.Millisecond):
t.Fatal("Did not receive disconnect event")
case isConnected = <-r1.connectionEvent:
if isConnected {
t.Fatal("Expected disconnect event (got connect event)")
}
}
}
// TestMessageSendNotSupportedResponse forces the host used by bsnet1 to return
// multistream.ErrNotSupported and then checks two things: creating a message
// sender to the remote peer fails, and the bsnet1 delegate (r1) is notified
// with a disconnect event.
func TestMessageSendNotSupportedResponse(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// create network
	mn := mocknet.New(ctx)
	mr := mockrouting.NewServer()
	streamNet, err := tn.StreamNet(ctx, mn, mr)
	if err != nil {
		// Include the underlying error so a setup failure is diagnosable.
		t.Fatalf("Unable to setup network: %v", err)
	}
	p1 := tnet.RandIdentityOrFatal(t)
	p2 := tnet.RandIdentityOrFatal(t)

	h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address())
	if err != nil {
		t.Fatal(err)
	}

	// Create a special host that responds with ErrNotSupported
	eh := &ErrHost{Host: h1}
	routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
	bsnet1 := bsnet.NewFromIpfsHost(eh, routing)

	bsnet2 := streamNet.Adapter(p2)
	r1 := newReceiver()
	r2 := newReceiver()
	bsnet1.SetDelegate(r1)
	bsnet2.SetDelegate(r2)

	err = mn.LinkAll()
	if err != nil {
		t.Fatal(err)
	}
	err = bsnet1.ConnectTo(ctx, p2.ID())
	if err != nil {
		t.Fatal(err)
	}

	// Wait for the connect notification, but bound the wait by the test
	// context so a missing event fails the test instead of hanging it.
	select {
	case <-ctx.Done():
		t.Fatal("Did not receive connect event")
	case connected := <-r1.connectionEvent:
		if !connected {
			t.Fatal("Expected connect event")
		}
	}

	err = bsnet2.ConnectTo(ctx, p1.ID())
	if err != nil {
		t.Fatal(err)
	}

	// NOTE(review): msg is populated but never sent in this test; it looks
	// like setup carried over from the sibling send tests — confirm whether
	// it can be dropped.
	blockGenerator := blocksutil.NewBlockGenerator()
	block1 := blockGenerator.Next()
	msg := bsmsg.New(false)
	msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true)

	// From here on the host (and any streams it has handed out) reports that
	// the protocol is not supported.
	eh.setError(multistream.ErrNotSupported)
	_, err = bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
		MaxRetries:       3,
		SendTimeout:      100 * time.Millisecond,
		SendErrorBackoff: 100 * time.Millisecond,
	})
	if err == nil {
		t.Fatal("Expected ErrNotSupported")
	}

	// The failed attempt should surface as a disconnect event on r1.
	select {
	case <-time.After(500 * time.Millisecond):
		t.Fatal("Did not receive disconnect event")
	case connected := <-r1.connectionEvent:
		if connected {
			t.Fatal("Expected disconnect event (got connect event)")
		}
	}
}
func TestSupportsHave(t *testing.T) {
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment