dht_net.go 9.72 KB
Newer Older
1 2 3
package dht

import (
4
	"bufio"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
7
	"io"
8
	"sync"
9 10
	"time"

11 12 13 14
	"github.com/libp2p/go-libp2p-core/helpers"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"

15
	"github.com/libp2p/go-libp2p-kad-dht/metrics"
16
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
17 18 19

	ggio "github.com/gogo/protobuf/io"

20
	"github.com/libp2p/go-msgio"
21 22
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
23 24
)

25
var (
	// dhtReadMessageTimeout bounds how long ctxReadMsg waits for a response
	// before giving up with ErrReadTimeout.
	dhtReadMessageTimeout = time.Minute

	// dhtStreamIdleTimeout is how long an inbound stream may go without a
	// successfully decoded message before handleNewMessage resets it.
	dhtStreamIdleTimeout = 10 * time.Minute

	// ErrReadTimeout is returned when no response arrives within
	// dhtReadMessageTimeout.
	ErrReadTimeout = fmt.Errorf("timed out reading response")
)

29 30 31 32 33 34 35 36
// bufferedDelimitedWriter pairs a bufio.Writer with a delimited protobuf
// writer.
//
// The Protobuf writer performs multiple small writes when writing a message.
// We need to buffer those writes, to make sure that we're not sending a new
// packet for every single write.
type bufferedDelimitedWriter struct {
	// Writer buffers the output; writerPool builds the delimited writer on
	// top of it.
	*bufio.Writer
	// WriteCloser encodes messages; writerPool points it at the buffered
	// Writer above.
	ggio.WriteCloser
}

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
// writerPool recycles bufferedDelimitedWriter values. New writers are created
// with no destination; writeMsg points them at a destination with Reset
// before use.
var writerPool = sync.Pool{
	New: func() interface{} {
		buffered := bufio.NewWriter(nil)

		bdw := new(bufferedDelimitedWriter)
		bdw.Writer = buffered
		// The delimited writer emits into the buffer rather than straight
		// to the destination.
		bdw.WriteCloser = ggio.NewDelimitedWriter(buffered)
		return bdw
	},
}

func writeMsg(w io.Writer, mes *pb.Message) error {
	bw := writerPool.Get().(*bufferedDelimitedWriter)
	bw.Reset(w)
	err := bw.WriteMsg(mes)
	if err == nil {
		err = bw.Flush()
53
	}
54 55 56
	bw.Reset(nil)
	writerPool.Put(bw)
	return err
57 58 59 60 61 62
}

// Flush writes any data held by the embedded bufio.Writer to its underlying
// writer and returns that writer's error, if any.
func (w *bufferedDelimitedWriter) Flush() error {
	return w.Writer.Flush()
}

63 64
// handleNewStream implements the network.StreamHandler
func (dht *IpfsDHT) handleNewStream(s network.Stream) {
Matt Joiner's avatar
Matt Joiner committed
65 66 67 68 69
	defer s.Reset()
	if dht.handleNewMessage(s) {
		// Gracefully close the stream for writes.
		s.Close()
	}
70 71
}

Matt Joiner's avatar
Matt Joiner committed
72
// Returns true on orderly completion of writes (so we can Close the stream).
73
func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
74
	ctx := dht.ctx
Cole Brown's avatar
Cole Brown committed
75
	r := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
76

77 78
	mPeer := s.Conn().RemotePeer()

79 80 81
	timer := time.AfterFunc(dhtStreamIdleTimeout, func() { s.Reset() })
	defer timer.Stop()

82
	for {
Matt Joiner's avatar
Matt Joiner committed
83
		var req pb.Message
84 85
		msgbytes, err := r.ReadMsg()
		if err != nil {
Cole Brown's avatar
Cole Brown committed
86 87 88 89 90 91 92 93 94
			defer r.ReleaseMsg(msgbytes)
			if err == io.EOF {
				return true
			}
			// This string test is necessary because there isn't a single stream reset error
			// instance	in use.
			if err.Error() != "stream reset" {
				logger.Debugf("error reading message: %#v", err)
			}
95 96 97 98 99 100 101
			stats.RecordWithTags(
				ctx,
				[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
				metrics.ReceivedMessageErrors.M(1),
			)
			return false
		}
Cole Brown's avatar
Cole Brown committed
102 103 104 105
		err = req.Unmarshal(msgbytes)
		r.ReleaseMsg(msgbytes)
		if err != nil {
			logger.Debugf("error unmarshalling message: %#v", err)
106 107 108 109 110
			stats.RecordWithTags(
				ctx,
				[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
				metrics.ReceivedMessageErrors.M(1),
			)
Matt Joiner's avatar
Matt Joiner committed
111
			return false
112 113
		}

114 115
		timer.Reset(dhtStreamIdleTimeout)

116 117 118 119 120 121 122 123 124 125 126 127
		startTime := time.Now()
		ctx, _ = tag.New(
			ctx,
			tag.Upsert(metrics.KeyMessageType, req.GetType().String()),
		)

		stats.Record(
			ctx,
			metrics.ReceivedMessages.M(1),
			metrics.ReceivedBytes.M(int64(req.Size())),
		)

Matt Joiner's avatar
Matt Joiner committed
128
		handler := dht.handlerForMsgType(req.GetType())
129
		if handler == nil {
130
			stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
Matt Joiner's avatar
Matt Joiner committed
131 132
			logger.Warningf("can't handle received message of type %v", req.GetType())
			return false
133 134
		}

Matt Joiner's avatar
Matt Joiner committed
135
		resp, err := handler(ctx, mPeer, &req)
136
		if err != nil {
137
			stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
Matt Joiner's avatar
Matt Joiner committed
138 139
			logger.Debugf("error handling message: %v", err)
			return false
140 141
		}

Matt Joiner's avatar
Matt Joiner committed
142 143 144
		dht.updateFromMessage(ctx, mPeer, &req)

		if resp == nil {
145 146 147 148
			continue
		}

		// send out response msg
Steven Allen's avatar
Steven Allen committed
149
		err = writeMsg(s, resp)
150
		if err != nil {
151
			stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
Matt Joiner's avatar
Matt Joiner committed
152 153
			logger.Debugf("error writing response: %v", err)
			return false
154
		}
Matt Joiner's avatar
Matt Joiner committed
155

156 157 158
		elapsedTime := time.Since(startTime)
		latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
		stats.Record(ctx, metrics.InboundRequestLatency.M(latencyMillis))
159 160 161 162 163
	}
}

// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
164
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
165
	ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))
166

Steven Allen's avatar
Steven Allen committed
167
	ms, err := dht.messageSenderForPeer(ctx, p)
168
	if err != nil {
169
		stats.Record(ctx, metrics.SentRequestErrors.M(1))
170 171
		return nil, err
	}
172 173 174

	start := time.Now()

175 176
	rpmes, err := ms.SendRequest(ctx, pmes)
	if err != nil {
177
		stats.Record(ctx, metrics.SentRequestErrors.M(1))
178 179 180
		return nil, err
	}

181 182 183
	// update the peer (on valid msgs only)
	dht.updateFromMessage(ctx, p, rpmes)

184 185 186 187 188 189 190 191
	stats.Record(
		ctx,
		metrics.SentRequests.M(1),
		metrics.SentBytes.M(int64(pmes.Size())),
		metrics.OutboundRequestLatency.M(
			float64(time.Since(start))/float64(time.Millisecond),
		),
	)
192
	dht.peerstore.RecordLatency(p, time.Since(start))
Matt Joiner's avatar
Matt Joiner committed
193
	logger.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes)
194 195
	return rpmes, nil
}
196 197

// sendMessage sends out a message
198
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
199 200
	ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))

Steven Allen's avatar
Steven Allen committed
201
	ms, err := dht.messageSenderForPeer(ctx, p)
202
	if err != nil {
203
		stats.Record(ctx, metrics.SentMessageErrors.M(1))
204 205
		return err
	}
206

207
	if err := ms.SendMessage(ctx, pmes); err != nil {
208
		stats.Record(ctx, metrics.SentMessageErrors.M(1))
209 210
		return err
	}
211 212 213 214 215 216

	stats.Record(
		ctx,
		metrics.SentMessages.M(1),
		metrics.SentBytes.M(int64(pmes.Size())),
	)
Matt Joiner's avatar
Matt Joiner committed
217
	logger.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
218 219
	return nil
}
220 221

func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
222 223 224 225 226
	// Make sure that this node is actually a DHT server, not just a client.
	protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
	if err == nil && len(protos) > 0 {
		dht.Update(ctx, p)
	}
227 228
	return nil
}
229

Steven Allen's avatar
Steven Allen committed
230
func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
231 232
	dht.smlk.Lock()
	ms, ok := dht.strmap[p]
233 234 235
	if ok {
		dht.smlk.Unlock()
		return ms, nil
236
	}
237 238 239 240
	ms = &messageSender{p: p, dht: dht}
	dht.strmap[p] = ms
	dht.smlk.Unlock()

Steven Allen's avatar
Steven Allen committed
241
	if err := ms.prepOrInvalidate(ctx); err != nil {
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
		dht.smlk.Lock()
		defer dht.smlk.Unlock()

		if msCur, ok := dht.strmap[p]; ok {
			// Changed. Use the new one, old one is invalid and
			// not in the map so we can just throw it away.
			if ms != msCur {
				return msCur, nil
			}
			// Not changed, remove the now invalid stream from the
			// map.
			delete(dht.strmap, p)
		}
		// Invalid but not in map. Must have been removed by a disconnect.
		return nil, err
	}
	// All ready to go.
	return ms, nil
260 261 262
}

type messageSender struct {
263
	s   network.Stream
264
	r   msgio.ReadCloser
265 266 267
	lk  sync.Mutex
	p   peer.ID
	dht *IpfsDHT
Jeromy's avatar
Jeromy committed
268

269
	invalid   bool
Jeromy's avatar
Jeromy committed
270
	singleMes int
271 272
}

Steven Allen's avatar
Steven Allen committed
273 274 275
// invalidate is called before this messageSender is removed from the strmap.
// It prevents the messageSender from being reused/reinitialized and then
// forgotten (leaving the stream open).
276 277 278 279 280 281 282 283
func (ms *messageSender) invalidate() {
	ms.invalid = true
	if ms.s != nil {
		ms.s.Reset()
		ms.s = nil
	}
}

Steven Allen's avatar
Steven Allen committed
284
func (ms *messageSender) prepOrInvalidate(ctx context.Context) error {
285 286
	ms.lk.Lock()
	defer ms.lk.Unlock()
Steven Allen's avatar
Steven Allen committed
287
	if err := ms.prep(ctx); err != nil {
288 289 290 291
		ms.invalidate()
		return err
	}
	return nil
292 293
}

Steven Allen's avatar
Steven Allen committed
294
func (ms *messageSender) prep(ctx context.Context) error {
295 296 297
	if ms.invalid {
		return fmt.Errorf("message sender has been invalidated")
	}
298 299 300 301
	if ms.s != nil {
		return nil
	}

Steven Allen's avatar
Steven Allen committed
302
	nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
303 304 305 306
	if err != nil {
		return err
	}

Cole Brown's avatar
Cole Brown committed
307
	ms.r = msgio.NewVarintReaderSize(nstr, network.MessageSizeMax)
308 309 310 311 312
	ms.s = nstr

	return nil
}

313 314 315 316
// streamReuseTries is the number of times we will try to reuse a stream to a
// given peer before giving up and reverting to the old one-message-per-stream
// behaviour. It is compared against messageSender.singleMes after each
// successful send.
const streamReuseTries = 3
317

318 319 320
func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
	ms.lk.Lock()
	defer ms.lk.Unlock()
321 322
	retry := false
	for {
Steven Allen's avatar
Steven Allen committed
323
		if err := ms.prep(ctx); err != nil {
Jeromy's avatar
Jeromy committed
324 325 326
			return err
		}

327
		if err := ms.writeMsg(pmes); err != nil {
328 329 330 331
			ms.s.Reset()
			ms.s = nil

			if retry {
Matt Joiner's avatar
Matt Joiner committed
332
				logger.Info("error writing message, bailing: ", err)
333 334
				return err
			}
335 336 337
			logger.Info("error writing message, trying again: ", err)
			retry = true
			continue
338 339
		}

Matt Joiner's avatar
Matt Joiner committed
340
		logger.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)
341 342

		if ms.singleMes > streamReuseTries {
343
			go helpers.FullClose(ms.s)
344 345 346
			ms.s = nil
		} else if retry {
			ms.singleMes++
Jeromy's avatar
Jeromy committed
347 348
		}

349
		return nil
350
	}
351
}
352 353 354 355

func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
	ms.lk.Lock()
	defer ms.lk.Unlock()
356 357
	retry := false
	for {
Steven Allen's avatar
Steven Allen committed
358
		if err := ms.prep(ctx); err != nil {
359 360
			return nil, err
		}
361

362
		if err := ms.writeMsg(pmes); err != nil {
363 364 365 366
			ms.s.Reset()
			ms.s = nil

			if retry {
Matt Joiner's avatar
Matt Joiner committed
367
				logger.Info("error writing message, bailing: ", err)
368 369
				return nil, err
			}
370 371 372
			logger.Info("error writing message, trying again: ", err)
			retry = true
			continue
373
		}
374

375 376 377 378 379 380
		mes := new(pb.Message)
		if err := ms.ctxReadMsg(ctx, mes); err != nil {
			ms.s.Reset()
			ms.s = nil

			if retry {
Matt Joiner's avatar
Matt Joiner committed
381
				logger.Info("error reading message, bailing: ", err)
382 383
				return nil, err
			}
384 385 386
			logger.Info("error reading message, trying again: ", err)
			retry = true
			continue
387
		}
388

Matt Joiner's avatar
Matt Joiner committed
389
		logger.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)
390

391
		if ms.singleMes > streamReuseTries {
392
			go helpers.FullClose(ms.s)
393 394 395 396
			ms.s = nil
		} else if retry {
			ms.singleMes++
		}
Jeromy's avatar
Jeromy committed
397

398 399
		return mes, nil
	}
400
}
401

402
func (ms *messageSender) writeMsg(pmes *pb.Message) error {
403
	return writeMsg(ms.s, pmes)
404 405
}

406 407
func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error {
	errc := make(chan error, 1)
408 409
	go func(r msgio.ReadCloser) {
		bytes, err := r.ReadMsg()
Cole Brown's avatar
Cole Brown committed
410
		defer r.ReleaseMsg(bytes)
411 412 413 414 415
		if err != nil {
			errc <- err
			return
		}
		errc <- mes.Unmarshal(bytes)
416
	}(ms.r)
417

418 419 420
	t := time.NewTimer(dhtReadMessageTimeout)
	defer t.Stop()

421 422 423 424 425
	select {
	case err := <-errc:
		return err
	case <-ctx.Done():
		return ctx.Err()
426 427
	case <-t.C:
		return ErrReadTimeout
428 429
	}
}