Commit da02d4a7 authored by Marten Seemann's avatar Marten Seemann

also use the buffered writer for handling new messages

parent 6d6ed078
......@@ -18,6 +18,31 @@ import (
var dhtReadMessageTimeout = time.Minute
var ErrReadTimeout = fmt.Errorf("timed out reading response")
type bufferedWriteCloser interface {
ggio.WriteCloser
Flush() error
}
// The Protobuf writer performs multiple small writes when writing a message.
// We need to buffer those writes, to make sure that we're not sending a new
// packet for every single write.
type bufferedDelimitedWriter struct {
*bufio.Writer
ggio.WriteCloser
}
func newBufferedDelimitedWriter(str io.Writer) bufferedWriteCloser {
w := bufio.NewWriter(str)
return &bufferedDelimitedWriter{
Writer: w,
WriteCloser: ggio.NewDelimitedWriter(w),
}
}
func (w *bufferedDelimitedWriter) Flush() error {
return w.Writer.Flush()
}
// handleNewStream implements the inet.StreamHandler
func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
go dht.handleNewMessage(s)
......@@ -28,7 +53,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(cw)
w := newBufferedDelimitedWriter(cw)
mPeer := s.Conn().RemotePeer()
for {
......@@ -71,7 +96,11 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
}
// send out response msg
if err := w.WriteMsg(rpmes); err != nil {
err = w.WriteMsg(rpmes)
if err == nil {
err = w.Flush()
}
if err != nil {
s.Reset()
log.Debugf("send response error: %s", err)
return
......@@ -158,31 +187,6 @@ func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) {
return ms, nil
}
type bufferedWriteCloser interface {
ggio.WriteCloser
Flush() error
}
// The Protobuf writer performs multiple small writes when writing a message.
// We need to buffer those writes, to make sure that we're not sending a new
// packet for every single write.
type bufferedDelimitedWriter struct {
*bufio.Writer
ggio.WriteCloser
}
func newBufferedDelimitedWriter(str inet.Stream) bufferedWriteCloser {
w := bufio.NewWriter(str)
return &bufferedDelimitedWriter{
Writer: w,
WriteCloser: ggio.NewDelimitedWriter(w),
}
}
func (w *bufferedDelimitedWriter) Flush() error {
return w.Writer.Flush()
}
type messageSender struct {
s inet.Stream
r ggio.ReadCloser
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment