package messagequeue

import (
	"context"
	"sync"
	"time"

	cid "gitlab.dms3.io/dms3/go-cid"
	"gitlab.dms3.io/p2p/go-p2p/p2p/protocol/ping"
)

const (
	// dontHaveTimeout is used to simulate a DONT_HAVE when communicating with
	// a peer whose Bitswap client doesn't support the DONT_HAVE response,
	// or when the peer takes too long to respond.
	// If the peer doesn't respond to a want-block within the timeout, the
	// local node assumes that the peer doesn't have the block.
	dontHaveTimeout = 5 * time.Second

	// maxExpectedWantProcessTime is the maximum amount of time we expect a
	// peer to take to process a want and initiate sending a response to us
	maxExpectedWantProcessTime = 2 * time.Second

	// maxTimeout is the maximum allowed timeout, regardless of latency
	maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime

	// pingLatencyMultiplier is multiplied by the average ping time to
	// get an upper bound on how long we expect to wait for a peer's response
	// to arrive
	pingLatencyMultiplier = 3

	// messageLatencyAlpha is the alpha supplied to the message latency EWMA
	messageLatencyAlpha = 0.5

	// To give a margin for error, the timeout is calculated as
	// messageLatencyMultiplier * message latency
	messageLatencyMultiplier = 2
)

// PeerConnection is a connection to a peer that can be pinged, and the
// average latency measured
type PeerConnection interface {
	// Ping the peer
	Ping(context.Context) ping.Result
	// The average latency of all pings
	Latency() time.Duration
}

// pendingWant keeps track of a want that has been sent and we're waiting
// for a response or for a timeout to expire
type pendingWant struct {
	c      cid.Cid
	active bool
	sent   time.Time
}

// dontHaveTimeoutMgr simulates a DONT_HAVE message if the peer takes too long
// to respond to a message.
// The timeout is based on latency: we start with a default timeout and ping
// the peer to estimate its latency. Once we receive responses from the peer
// we base the timeout on the measured message latency instead.
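//
// As a rough summary of the calculate* helpers below (using the defaults
// declared in this file):
//
//	ping-based timeout:    maxExpectedWantProcessTime + pingLatencyMultiplier*pingLatency
//	message-based timeout: messageLatencyMultiplier * (message latency EWMA)
//
// with both results capped at maxTimeout.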
type dontHaveTimeoutMgr struct {
	ctx                        context.Context
	shutdown                   func()
	peerConn                   PeerConnection
	onDontHaveTimeout          func([]cid.Cid)
	defaultTimeout             time.Duration
	maxTimeout                 time.Duration
	pingLatencyMultiplier      int
	messageLatencyMultiplier   int
	maxExpectedWantProcessTime time.Duration

	// All variables below here must be protected by the lock
	lk sync.RWMutex
	// has the timeout manager started
	started bool
	// wants that are active (waiting for a response or timeout)
	activeWants map[cid.Cid]*pendingWant
	// queue of wants, from oldest to newest
	wantQueue []*pendingWant
	// time to wait for a response (depends on latency)
	timeout time.Duration
	// ewma of message latency (time from message sent to response received)
	messageLatency *latencyEwma
	// timer used to wait until want at front of queue expires
	checkForTimeoutsTimer *time.Timer
}

// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout)
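//
// An illustrative usage sketch (onTimeout and the key slices are hypothetical
// names supplied by the caller):
//
//	dhtm := newDontHaveTimeoutMgr(pc, onTimeout)
//	dhtm.Start()
//	dhtm.AddPending(sentWantBlocks) // start timing these keys
//	// ... when responses arrive for some of the keys:
//	dhtm.CancelPending(respondedKeys)
//	// keys still pending when the timeout expires are passed to onTimeout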
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid)) *dontHaveTimeoutMgr {
	return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout,
		pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime)
}

// newDontHaveTimeoutMgrWithParams is used by the tests
func newDontHaveTimeoutMgrWithParams(
	pc PeerConnection,
	onDontHaveTimeout func([]cid.Cid),
	defaultTimeout time.Duration,
	maxTimeout time.Duration,
	pingLatencyMultiplier int,
	messageLatencyMultiplier int,
	maxExpectedWantProcessTime time.Duration) *dontHaveTimeoutMgr {

	ctx, shutdown := context.WithCancel(context.Background())
	mqp := &dontHaveTimeoutMgr{
		ctx:                        ctx,
		shutdown:                   shutdown,
		peerConn:                   pc,
		activeWants:                make(map[cid.Cid]*pendingWant),
		timeout:                    defaultTimeout,
		messageLatency:             &latencyEwma{alpha: messageLatencyAlpha},
		defaultTimeout:             defaultTimeout,
		maxTimeout:                 maxTimeout,
		pingLatencyMultiplier:      pingLatencyMultiplier,
		messageLatencyMultiplier:   messageLatencyMultiplier,
		maxExpectedWantProcessTime: maxExpectedWantProcessTime,
		onDontHaveTimeout:          onDontHaveTimeout,
	}

	return mqp
}

// Shutdown the dontHaveTimeoutMgr. Any subsequent call to Start() will be ignored
func (dhtm *dontHaveTimeoutMgr) Shutdown() {
	dhtm.shutdown()

	dhtm.lk.Lock()
	defer dhtm.lk.Unlock()

	// Clear any pending check for timeouts
	if dhtm.checkForTimeoutsTimer != nil {
		dhtm.checkForTimeoutsTimer.Stop()
	}
}

// Start the dontHaveTimeoutMgr. This method is idempotent
func (dhtm *dontHaveTimeoutMgr) Start() {
	dhtm.lk.Lock()
	defer dhtm.lk.Unlock()

	// Make sure the dont have timeout manager hasn't already been started
	if dhtm.started {
		return
	}
	dhtm.started = true

	// If we already have a measure of latency to the peer, use it to
	// calculate a reasonable timeout
	latency := dhtm.peerConn.Latency()
	if latency.Nanoseconds() > 0 {
		dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency)
		return
	}

	// Otherwise measure latency by pinging the peer
	go dhtm.measurePingLatency()
}

// UpdateMessageLatency is called when we receive a response from the peer.
// elapsed is the time between sending the request and receiving the
// corresponding response.
func (dhtm *dontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) {
	dhtm.lk.Lock()
	defer dhtm.lk.Unlock()

	// Update the message latency and the timeout
	dhtm.messageLatency.update(elapsed)
	oldTimeout := dhtm.timeout
	dhtm.timeout = dhtm.calculateTimeoutFromMessageLatency()

	// If the timeout has decreased
	if dhtm.timeout < oldTimeout {
		// Check if after changing the timeout there are any pending wants that
		// are now over the timeout
		dhtm.checkForTimeouts()
	}
}

// measurePingLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measurePingLatency() {
	// Wait up to defaultTimeout for a response to the ping
	ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout)
	defer cancel()

	// Ping the peer
	res := dhtm.peerConn.Ping(ctx)
	if res.Error != nil {
		// If there was an error, we'll just leave the timeout as
		// defaultTimeout
		return
	}

	// Get the average latency to the peer
	latency := dhtm.peerConn.Latency()

	dhtm.lk.Lock()
	defer dhtm.lk.Unlock()

	// If a message response has already arrived, the timeout is already based
	// on message latency, so don't override it with the ping latency
	if dhtm.messageLatency.samples > 0 {
		return
	}

	// Calculate a reasonable timeout based on latency
	dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency)

	// Check if after changing the timeout there are any pending wants that are
	// now over the timeout
	dhtm.checkForTimeouts()
}

// checkForTimeouts checks pending wants to see if any are over the timeout.
// Note: this function should only be called within the lock.
func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {
	if len(dhtm.wantQueue) == 0 {
		return
	}

	// Figure out which of the blocks that were wanted were not received
	// within the timeout
	expired := make([]cid.Cid, 0, len(dhtm.activeWants))
	for len(dhtm.wantQueue) > 0 {
		pw := dhtm.wantQueue[0]

		// If the want is still active
		if pw.active {
			// The queue is in order from earliest to latest, so if we
			// didn't find an expired entry we can stop iterating
			if time.Since(pw.sent) < dhtm.timeout {
				break
			}

			// Add the want to the expired list
			expired = append(expired, pw.c)
			// Remove the want from the activeWants map
			delete(dhtm.activeWants, pw.c)
		}

		// Remove expired or cancelled wants from the want queue
		dhtm.wantQueue = dhtm.wantQueue[1:]
	}

	// Fire the timeout event for the expired wants
	if len(expired) > 0 {
		go dhtm.fireTimeout(expired)
	}

	if len(dhtm.wantQueue) == 0 {
		return
	}

	// Make sure the timeout manager is still running
	if dhtm.ctx.Err() != nil {
		return
	}

	// Schedule the next check for the moment when the oldest pending want
	// will time out
	oldestStart := dhtm.wantQueue[0].sent
	until := time.Until(oldestStart.Add(dhtm.timeout))
	if dhtm.checkForTimeoutsTimer == nil {
		dhtm.checkForTimeoutsTimer = time.AfterFunc(until, func() {
			dhtm.lk.Lock()
			defer dhtm.lk.Unlock()

			dhtm.checkForTimeouts()
		})
	} else {
		dhtm.checkForTimeoutsTimer.Stop()
		dhtm.checkForTimeoutsTimer.Reset(until)
	}
}

// AddPending adds the given keys that will expire if not cancelled before
// the timeout
func (dhtm *dontHaveTimeoutMgr) AddPending(ks []cid.Cid) {
	if len(ks) == 0 {
		return
	}

	start := time.Now()

	dhtm.lk.Lock()
	defer dhtm.lk.Unlock()

	queueWasEmpty := len(dhtm.activeWants) == 0

	// Record the start time for each key
	for _, c := range ks {
		if _, ok := dhtm.activeWants[c]; !ok {
			pw := pendingWant{
				c:      c,
				sent:   start,
				active: true,
			}
			dhtm.activeWants[c] = &pw
			dhtm.wantQueue = append(dhtm.wantQueue, &pw)
		}
	}

	// If there was already an earlier pending item in the queue, then there
	// must already be a timeout check scheduled. If there is nothing in the
	// queue then we should make sure to schedule a check.
	if queueWasEmpty {
		dhtm.checkForTimeouts()
	}
}

// CancelPending is called when we receive a response for a key
func (dhtm *dontHaveTimeoutMgr) CancelPending(ks []cid.Cid) {
	dhtm.lk.Lock()
	defer dhtm.lk.Unlock()

	// Mark the wants as cancelled
	for _, c := range ks {
		if pw, ok := dhtm.activeWants[c]; ok {
			pw.active = false
			delete(dhtm.activeWants, c)
		}
	}
}

// fireTimeout fires the onDontHaveTimeout method with the timed out keys
func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) {
	// Make sure the timeout manager has not been shut down
	if dhtm.ctx.Err() != nil {
		return
	}

	// Fire the timeout
	dhtm.onDontHaveTimeout(pending)
}

// calculateTimeoutFromPingLatency calculates a reasonable timeout derived from latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromPingLatency(latency time.Duration) time.Duration {
	// The maximum expected time for a response is
	// the expected time to process the want + (latency * multiplier).
	// The multiplier provides some padding for variable latency.
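	// For example, with the defaults in this file (2s maxExpectedWantProcessTime,
	// pingLatencyMultiplier of 3), a measured latency of 200ms gives a timeout
	// of 2s + 3*200ms = 2.6s, which is below the 7s maxTimeout cap.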
	timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency
	if timeout > dhtm.maxTimeout {
		timeout = dhtm.maxTimeout
	}
	return timeout
}

// calculateTimeoutFromMessageLatency calculates a timeout derived from message latency
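// For example, with the default messageLatencyMultiplier of 2, an EWMA message
// latency of 1.5s gives a 3s timeout; the result is capped at maxTimeout.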
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration {
	timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier)
	if timeout > dhtm.maxTimeout {
		timeout = dhtm.maxTimeout
	}
	return timeout
}

// latencyEwma is an EWMA of message latency
type latencyEwma struct {
	alpha   float64
	samples uint64
	latency time.Duration
}

// update the EWMA with the given sample
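// For example, once enough samples have arrived that alpha is clamped at the
// configured 0.5, a previous latency of 40ms and a new sample of 80ms give
// 0.5*80ms + 0.5*40ms = 60ms.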
func (le *latencyEwma) update(elapsed time.Duration) {
	le.samples++

	// Initially set alpha to be 1.0 / <the number of samples>
	alpha := 1.0 / float64(le.samples)
	if alpha < le.alpha {
		// Once we have enough samples, clamp alpha
		alpha = le.alpha
	}
	le.latency = time.Duration(float64(elapsed)*alpha + (1-alpha)*float64(le.latency))
}