Unverified Commit 978eca5e authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #340 from libp2p/feat/fewer-goroutines

reduce background goroutines
parents 2d6dde4f b55fad15
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
"time" "time"
ggio "github.com/gogo/protobuf/io" ggio "github.com/gogo/protobuf/io"
ctxio "github.com/jbenet/go-context/io"
"github.com/libp2p/go-libp2p-kad-dht/metrics" "github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb" pb "github.com/libp2p/go-libp2p-kad-dht/pb"
inet "github.com/libp2p/go-libp2p-net" inet "github.com/libp2p/go-libp2p-net"
...@@ -19,6 +18,7 @@ import ( ...@@ -19,6 +18,7 @@ import (
) )
var dhtReadMessageTimeout = time.Minute var dhtReadMessageTimeout = time.Minute
var dhtStreamIdleTimeout = 10 * time.Minute
var ErrReadTimeout = fmt.Errorf("timed out reading response") var ErrReadTimeout = fmt.Errorf("timed out reading response")
// The Protobuf writer performs multiple small writes when writing a message. // The Protobuf writer performs multiple small writes when writing a message.
...@@ -67,12 +67,12 @@ func (dht *IpfsDHT) handleNewStream(s inet.Stream) { ...@@ -67,12 +67,12 @@ func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
// Returns true on orderly completion of writes (so we can Close the stream). // Returns true on orderly completion of writes (so we can Close the stream).
func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool { func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
ctx := dht.ctx ctx := dht.ctx
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
mPeer := s.Conn().RemotePeer() mPeer := s.Conn().RemotePeer()
timer := time.AfterFunc(dhtStreamIdleTimeout, func() { s.Reset() })
defer timer.Stop()
for { for {
var req pb.Message var req pb.Message
switch err := r.ReadMsg(&req); err { switch err := r.ReadMsg(&req); err {
...@@ -93,6 +93,8 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool { ...@@ -93,6 +93,8 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
case nil: case nil:
} }
timer.Reset(dhtStreamIdleTimeout)
startTime := time.Now() startTime := time.Now()
ctx, _ = tag.New( ctx, _ = tag.New(
ctx, ctx,
...@@ -126,7 +128,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool { ...@@ -126,7 +128,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
} }
// send out response msg // send out response msg
err = writeMsg(cw, resp) err = writeMsg(s, resp)
if err != nil { if err != nil {
stats.Record(ctx, metrics.ReceivedMessageErrors.M(1)) stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
logger.Debugf("error writing response: %v", err) logger.Debugf("error writing response: %v", err)
......
...@@ -84,8 +84,6 @@ github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec h1:DQqZhhDvrTrEQ3Q ...@@ -84,8 +84,6 @@ github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec h1:DQqZhhDvrTrEQ3Q
github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec/go.mod h1:rGaEvXB4uRSZMmzKNLoXvTu1sfx+1kv/DojUlPrSZGs= github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec/go.mod h1:rGaEvXB4uRSZMmzKNLoXvTu1sfx+1kv/DojUlPrSZGs=
github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc= github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2 h1:vhC1OXXiT9R2pczegwz6moDvuRpggaroAXhPIseh57A= github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2 h1:vhC1OXXiT9R2pczegwz6moDvuRpggaroAXhPIseh57A=
github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2/go.mod h1:8GXXJV31xl8whumTzdZsTt3RnUIiPqzkyf7mxToRCMs= github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2/go.mod h1:8GXXJV31xl8whumTzdZsTt3RnUIiPqzkyf7mxToRCMs=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw= github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw=
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment