Unverified Commit 1c850e12 authored by Aarsh Shah's avatar Aarsh Shah Committed by GitHub

Timeout all Identify stream reads (#1032)

* fix: read timeouts on Identify protocols

* fixed tests

* review and go fmt
parent 577e7521
...@@ -46,6 +46,9 @@ const LibP2PVersion = "ipfs/0.1.0" ...@@ -46,6 +46,9 @@ const LibP2PVersion = "ipfs/0.1.0"
// Deprecated: Set this with the UserAgent option. // Deprecated: Set this with the UserAgent option.
var ClientVersion = "github.com/libp2p/go-libp2p" var ClientVersion = "github.com/libp2p/go-libp2p"
// StreamReadTimeout is the read timeout on all incoming Identify family streams.
var StreamReadTimeout = 60 * time.Second
var ( var (
legacyIDSize = 2 * 1024 // 2k Bytes legacyIDSize = 2 * 1024 // 2k Bytes
signedIDSize = 8 * 1024 // 8K signedIDSize = 8 * 1024 // 8K
...@@ -369,7 +372,8 @@ func (ids *IDService) identifyConn(c network.Conn, signal chan struct{}) { ...@@ -369,7 +372,8 @@ func (ids *IDService) identifyConn(c network.Conn, signal chan struct{}) {
s.Reset() s.Reset()
return return
} }
ids.handleIdentifyResponse(s)
err = ids.handleIdentifyResponse(s)
} }
func (ids *IDService) sendIdentifyResp(s network.Stream) { func (ids *IDService) sendIdentifyResp(s network.Stream) {
...@@ -408,7 +412,9 @@ func (ids *IDService) sendIdentifyResp(s network.Stream) { ...@@ -408,7 +412,9 @@ func (ids *IDService) sendIdentifyResp(s network.Stream) {
log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr()) log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr())
} }
func (ids *IDService) handleIdentifyResponse(s network.Stream) { func (ids *IDService) handleIdentifyResponse(s network.Stream) error {
_ = s.SetReadDeadline(time.Now().Add(StreamReadTimeout))
c := s.Conn() c := s.Conn()
r := protoio.NewDelimitedReader(s, signedIDSize) r := protoio.NewDelimitedReader(s, signedIDSize)
...@@ -417,7 +423,7 @@ func (ids *IDService) handleIdentifyResponse(s network.Stream) { ...@@ -417,7 +423,7 @@ func (ids *IDService) handleIdentifyResponse(s network.Stream) {
if err := readAllIDMessages(r, mes); err != nil { if err := readAllIDMessages(r, mes); err != nil {
log.Warning("error reading identify message: ", err) log.Warning("error reading identify message: ", err)
s.Reset() s.Reset()
return return err
} }
defer s.Close() defer s.Close()
...@@ -425,6 +431,8 @@ func (ids *IDService) handleIdentifyResponse(s network.Stream) { ...@@ -425,6 +431,8 @@ func (ids *IDService) handleIdentifyResponse(s network.Stream) {
log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr()) log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr())
ids.consumeMessage(mes, c) ids.consumeMessage(mes, c)
return nil
} }
func readAllIDMessages(r protoio.Reader, finalMsg proto.Message) error { func readAllIDMessages(r protoio.Reader, finalMsg proto.Message) error {
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/protocol"
"time"
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
...@@ -15,6 +16,8 @@ const IDDelta = "/p2p/id/delta/1.0.0" ...@@ -15,6 +16,8 @@ const IDDelta = "/p2p/id/delta/1.0.0"
// deltaHandler handles incoming delta updates from peers. // deltaHandler handles incoming delta updates from peers.
func (ids *IDService) deltaHandler(s network.Stream) { func (ids *IDService) deltaHandler(s network.Stream) {
_ = s.SetReadDeadline(time.Now().Add(StreamReadTimeout))
c := s.Conn() c := s.Conn()
r := protoio.NewDelimitedReader(s, 2048) r := protoio.NewDelimitedReader(s, 2048)
......
...@@ -978,3 +978,83 @@ func TestLargePushMessage(t *testing.T) { ...@@ -978,3 +978,83 @@ func TestLargePushMessage(t *testing.T) {
}, 5*time.Second, 500*time.Millisecond) }, 5*time.Second, 500*time.Millisecond)
testHasCertifiedAddrs(t, h2, h1p, h1.Addrs()) testHasCertifiedAddrs(t, h2, h1p, h1.Addrs())
} }
func TestIdentifyResponseReadTimeout(t *testing.T) {
ctx := context.Background()
timeout := identify.StreamReadTimeout
identify.StreamReadTimeout = 100 * time.Millisecond
defer func() {
identify.StreamReadTimeout = timeout
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
defer h1.Close()
defer h2.Close()
h2p := h2.ID()
ids1 := identify.NewIDService(h1)
ids2 := identify.NewIDService(h2)
defer ids1.Close()
defer ids2.Close()
// remote stream handler will just hang and not send back an identify response
h2.SetStreamHandler(identify.ID, func(s network.Stream) {
time.Sleep(100 * time.Second)
})
sub, err := ids1.Host.EventBus().Subscribe(new(event.EvtPeerIdentificationFailed), eventbus.BufSize(16))
require.NoError(t, err)
h2pi := h2.Peerstore().PeerInfo(h2p)
require.NoError(t, h1.Connect(ctx, h2pi))
select {
case ev := <-sub.Out():
fev := ev.(event.EvtPeerIdentificationFailed)
require.EqualError(t, fev.Reason, "i/o deadline reached")
case <-time.After(5 * time.Second):
t.Fatal("did not receive identify failure event")
}
}
func TestIncomingIDStreamsTimeout(t *testing.T) {
ctx := context.Background()
timeout := identify.StreamReadTimeout
identify.StreamReadTimeout = 100 * time.Millisecond
defer func() {
identify.StreamReadTimeout = timeout
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
protocols := []protocol.ID{identify.IDPush, identify.IDDelta}
for _, p := range protocols {
h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
defer h1.Close()
defer h2.Close()
ids1 := identify.NewIDService(h1)
ids2 := identify.NewIDService(h2)
defer ids1.Close()
defer ids2.Close()
h2p := h2.ID()
h2pi := h2.Peerstore().PeerInfo(h2p)
require.NoError(t, h1.Connect(ctx, h2pi))
_, err := h1.NewStream(ctx, h2p, p)
require.NoError(t, err)
// remote peer should eventually reset stream
require.Eventually(t, func() bool {
c := h2.Network().ConnsToPeer(h1.ID())[0]
return len(c.GetStreams()) == 0
}, 1*time.Second, 200*time.Millisecond)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment