dht_net.go 5.57 KB
Newer Older
1 2 3
package dht

import (
4
	"bufio"
5
	"fmt"
6
	"io"
7
	"sync"
8 9
	"time"

10
	"github.com/libp2p/go-libp2p-core/network"
11
	"github.com/libp2p/go-msgio/protoio"
12

13
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
14
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
15

16
	"github.com/libp2p/go-msgio"
17 18
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
19
	"go.uber.org/zap"
20 21
)

22
var dhtReadMessageTimeout = 10 * time.Second
23
var dhtStreamIdleTimeout = 1 * time.Minute
Alan Shaw's avatar
Alan Shaw committed
24

Alan Shaw's avatar
Alan Shaw committed
25
// ErrReadTimeout is an error that occurs when no message is read within the timeout period.
26 27
var ErrReadTimeout = fmt.Errorf("timed out reading response")

28 29 30 31 32
// The Protobuf writer performs multiple small writes when writing a message.
// We need to buffer those writes, to make sure that we're not sending a new
// packet for every single write.
type bufferedDelimitedWriter struct {
	*bufio.Writer
33
	protoio.WriteCloser
34 35
}

36 37 38 39 40
var writerPool = sync.Pool{
	New: func() interface{} {
		w := bufio.NewWriter(nil)
		return &bufferedDelimitedWriter{
			Writer:      w,
41
			WriteCloser: protoio.NewDelimitedWriter(w),
42 43 44 45 46 47 48 49 50 51
		}
	},
}

func writeMsg(w io.Writer, mes *pb.Message) error {
	bw := writerPool.Get().(*bufferedDelimitedWriter)
	bw.Reset(w)
	err := bw.WriteMsg(mes)
	if err == nil {
		err = bw.Flush()
52
	}
53 54 55
	bw.Reset(nil)
	writerPool.Put(bw)
	return err
56 57 58 59 60 61
}

func (w *bufferedDelimitedWriter) Flush() error {
	return w.Writer.Flush()
}

62 63
// handleNewStream implements the network.StreamHandler
func (dht *IpfsDHT) handleNewStream(s network.Stream) {
Matt Joiner's avatar
Matt Joiner committed
64
	if dht.handleNewMessage(s) {
65 66 67 68 69
		// If we exited without error, close gracefully.
		_ = s.Close()
	} else {
		// otherwise, send an error.
		_ = s.Reset()
Matt Joiner's avatar
Matt Joiner committed
70
	}
71 72
}

Matt Joiner's avatar
Matt Joiner committed
73
// Returns true on orderly completion of writes (so we can Close the stream).
74
func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
75
	ctx := dht.ctx
Cole Brown's avatar
Cole Brown committed
76
	r := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
77

78 79
	mPeer := s.Conn().RemotePeer()

Steven Allen's avatar
Steven Allen committed
80
	timer := time.AfterFunc(dhtStreamIdleTimeout, func() { _ = s.Reset() })
81 82
	defer timer.Stop()

83
	for {
84 85 86 87 88
		if dht.getMode() != modeServer {
			logger.Errorf("ignoring incoming dht message while not in server mode")
			return false
		}

Matt Joiner's avatar
Matt Joiner committed
89
		var req pb.Message
90
		msgbytes, err := r.ReadMsg()
91
		msgLen := len(msgbytes)
92
		if err != nil {
93
			r.ReleaseMsg(msgbytes)
Cole Brown's avatar
Cole Brown committed
94 95 96 97 98
			if err == io.EOF {
				return true
			}
			// This string test is necessary because there isn't a single stream reset error
			// instance	in use.
99 100 101
			if c := baseLogger.Check(zap.DebugLevel, "error reading message"); c != nil && err.Error() != "stream reset" {
				c.Write(zap.String("from", mPeer.String()),
					zap.Error(err))
Cole Brown's avatar
Cole Brown committed
102
			}
103
			if msgLen > 0 {
Steven Allen's avatar
Steven Allen committed
104
				_ = stats.RecordWithTags(ctx,
105 106 107 108 109 110
					[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
					metrics.ReceivedMessages.M(1),
					metrics.ReceivedMessageErrors.M(1),
					metrics.ReceivedBytes.M(int64(msgLen)),
				)
			}
111 112
			return false
		}
Cole Brown's avatar
Cole Brown committed
113 114 115
		err = req.Unmarshal(msgbytes)
		r.ReleaseMsg(msgbytes)
		if err != nil {
116 117 118 119
			if c := baseLogger.Check(zap.DebugLevel, "error unmarshaling message"); c != nil {
				c.Write(zap.String("from", mPeer.String()),
					zap.Error(err))
			}
Steven Allen's avatar
Steven Allen committed
120
			_ = stats.RecordWithTags(ctx,
121
				[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
122
				metrics.ReceivedMessages.M(1),
123 124
				metrics.ReceivedMessageErrors.M(1),
				metrics.ReceivedBytes.M(int64(msgLen)),
125
			)
Matt Joiner's avatar
Matt Joiner committed
126
			return false
127 128
		}

129 130
		timer.Reset(dhtStreamIdleTimeout)

131
		startTime := time.Now()
132
		ctx, _ := tag.New(ctx,
133 134 135
			tag.Upsert(metrics.KeyMessageType, req.GetType().String()),
		)

136
		stats.Record(ctx,
137
			metrics.ReceivedMessages.M(1),
138
			metrics.ReceivedBytes.M(int64(msgLen)),
139 140
		)

Matt Joiner's avatar
Matt Joiner committed
141
		handler := dht.handlerForMsgType(req.GetType())
142
		if handler == nil {
143
			stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
144 145 146 147
			if c := baseLogger.Check(zap.DebugLevel, "can't handle received message"); c != nil {
				c.Write(zap.String("from", mPeer.String()),
					zap.Int32("type", int32(req.GetType())))
			}
Matt Joiner's avatar
Matt Joiner committed
148
			return false
149 150
		}

Aarsh Shah's avatar
Aarsh Shah committed
151 152 153
		// a peer has queried us, let's add it to RT
		dht.peerFound(dht.ctx, mPeer, true)

154 155 156 157 158
		if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
			c.Write(zap.String("from", mPeer.String()),
				zap.Int32("type", int32(req.GetType())),
				zap.Binary("key", req.GetKey()))
		}
Matt Joiner's avatar
Matt Joiner committed
159
		resp, err := handler(ctx, mPeer, &req)
160
		if err != nil {
161
			stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
162 163 164 165 166 167
			if c := baseLogger.Check(zap.DebugLevel, "error handling message"); c != nil {
				c.Write(zap.String("from", mPeer.String()),
					zap.Int32("type", int32(req.GetType())),
					zap.Binary("key", req.GetKey()),
					zap.Error(err))
			}
Matt Joiner's avatar
Matt Joiner committed
168
			return false
169 170
		}

171 172 173 174 175 176
		if c := baseLogger.Check(zap.DebugLevel, "handled message"); c != nil {
			c.Write(zap.String("from", mPeer.String()),
				zap.Int32("type", int32(req.GetType())),
				zap.Binary("key", req.GetKey()),
				zap.Duration("time", time.Since(startTime)))
		}
Steven Allen's avatar
Steven Allen committed
177

Matt Joiner's avatar
Matt Joiner committed
178
		if resp == nil {
179 180 181 182
			continue
		}

		// send out response msg
Steven Allen's avatar
Steven Allen committed
183
		err = writeMsg(s, resp)
184
		if err != nil {
185
			stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
186 187 188 189 190 191
			if c := baseLogger.Check(zap.DebugLevel, "error writing response"); c != nil {
				c.Write(zap.String("from", mPeer.String()),
					zap.Int32("type", int32(req.GetType())),
					zap.Binary("key", req.GetKey()),
					zap.Error(err))
			}
Matt Joiner's avatar
Matt Joiner committed
192
			return false
193
		}
Matt Joiner's avatar
Matt Joiner committed
194

195
		elapsedTime := time.Since(startTime)
Steven Allen's avatar
Steven Allen committed
196

197 198 199 200 201 202
		if c := baseLogger.Check(zap.DebugLevel, "responded to message"); c != nil {
			c.Write(zap.String("from", mPeer.String()),
				zap.Int32("type", int32(req.GetType())),
				zap.Binary("key", req.GetKey()),
				zap.Duration("time", elapsedTime))
		}
Steven Allen's avatar
Steven Allen committed
203

204 205
		latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
		stats.Record(ctx, metrics.InboundRequestLatency.M(latencyMillis))
206 207
	}
}