Unverified Commit 874e3d3f authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #353 from libp2p/feat/message-size

Feat/message size
parents 31765355 855b46d3
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
ggio "github.com/gogo/protobuf/io" ggio "github.com/gogo/protobuf/io"
"github.com/libp2p/go-msgio"
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.opencensus.io/tag" "go.opencensus.io/tag"
) )
...@@ -71,7 +72,7 @@ func (dht *IpfsDHT) handleNewStream(s network.Stream) { ...@@ -71,7 +72,7 @@ func (dht *IpfsDHT) handleNewStream(s network.Stream) {
// Returns true on orderly completion of writes (so we can Close the stream). // Returns true on orderly completion of writes (so we can Close the stream).
func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
ctx := dht.ctx ctx := dht.ctx
r := ggio.NewDelimitedReader(s, network.MessageSizeMax) r := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
mPeer := s.Conn().RemotePeer() mPeer := s.Conn().RemotePeer()
...@@ -80,10 +81,12 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { ...@@ -80,10 +81,12 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
for { for {
var req pb.Message var req pb.Message
switch err := r.ReadMsg(&req); err { msgbytes, err := r.ReadMsg()
case io.EOF: if err != nil {
return true defer r.ReleaseMsg(msgbytes)
default: if err == io.EOF {
return true
}
// This string test is necessary because there isn't a single stream reset error // This string test is necessary because there isn't a single stream reset error
// instance in use. // instance in use.
if err.Error() != "stream reset" { if err.Error() != "stream reset" {
...@@ -95,7 +98,17 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { ...@@ -95,7 +98,17 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
metrics.ReceivedMessageErrors.M(1), metrics.ReceivedMessageErrors.M(1),
) )
return false return false
case nil: }
err = req.Unmarshal(msgbytes)
r.ReleaseMsg(msgbytes)
if err != nil {
logger.Debugf("error unmarshalling message: %#v", err)
stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
metrics.ReceivedMessageErrors.M(1),
)
return false
} }
timer.Reset(dhtStreamIdleTimeout) timer.Reset(dhtStreamIdleTimeout)
...@@ -248,7 +261,7 @@ func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa ...@@ -248,7 +261,7 @@ func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa
type messageSender struct { type messageSender struct {
s network.Stream s network.Stream
r ggio.ReadCloser r msgio.ReadCloser
lk sync.Mutex lk sync.Mutex
p peer.ID p peer.ID
dht *IpfsDHT dht *IpfsDHT
...@@ -291,7 +304,7 @@ func (ms *messageSender) prep(ctx context.Context) error { ...@@ -291,7 +304,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
return err return err
} }
ms.r = ggio.NewDelimitedReader(nstr, network.MessageSizeMax) ms.r = msgio.NewVarintReaderSize(nstr, network.MessageSizeMax)
ms.s = nstr ms.s = nstr
return nil return nil
...@@ -392,8 +405,14 @@ func (ms *messageSender) writeMsg(pmes *pb.Message) error { ...@@ -392,8 +405,14 @@ func (ms *messageSender) writeMsg(pmes *pb.Message) error {
func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error { func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error {
errc := make(chan error, 1) errc := make(chan error, 1)
go func(r ggio.ReadCloser) { go func(r msgio.ReadCloser) {
errc <- r.ReadMsg(mes) bytes, err := r.ReadMsg()
defer r.ReleaseMsg(bytes)
if err != nil {
errc <- err
return
}
errc <- mes.Unmarshal(bytes)
}(ms.r) }(ms.r)
t := time.NewTimer(dhtReadMessageTimeout) t := time.NewTimer(dhtReadMessageTimeout)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment