sessionpeermanager.go 9.69 KB
Newer Older
1 2 3 4 5
package sessionpeermanager

import (
	"context"
	"fmt"
hannahhoward's avatar
hannahhoward committed
6
	"math/rand"
7
	"sort"
8
	"time"
9

10 11
	bssd "github.com/ipfs/go-bitswap/sessiondata"

12
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
13
	peer "github.com/libp2p/go-libp2p-core/peer"
14 15
)

hannahhoward's avatar
hannahhoward committed
16
const (
17 18 19 20
	defaultTimeoutDuration = 5 * time.Second
	maxOptimizedPeers      = 32
	unoptimizedTagValue    = 5  // tag value for "unoptimized" session peers.
	optimizedTagValue      = 10 // tag value for "optimized" session peers.
hannahhoward's avatar
hannahhoward committed
21 22
)

23 24 25 26 27 28 29 30
// PeerTagger is an interface for tagging peers with metadata
type PeerTagger interface {
	TagPeer(peer.ID, string, int)
	UntagPeer(p peer.ID, tag string)
}

// PeerProviderFinder is an interface for finding providers
type PeerProviderFinder interface {
31
	FindProvidersAsync(context.Context, cid.Cid) <-chan peer.ID
32 33
}

hannahhoward's avatar
hannahhoward committed
34 35 36 37
type peerMessage interface {
	handle(spm *SessionPeerManager)
}

38 39
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
40
type SessionPeerManager struct {
41 42 43 44 45
	ctx            context.Context
	tagger         PeerTagger
	providerFinder PeerProviderFinder
	tag            string
	id             uint64
46

hannahhoward's avatar
hannahhoward committed
47
	peerMessages chan peerMessage
48 49

	// do not touch outside of run loop
50
	activePeers         map[peer.ID]*peerData
hannahhoward's avatar
hannahhoward committed
51 52
	unoptimizedPeersArr []peer.ID
	optimizedPeersArr   []peer.ID
53
	broadcastLatency    *latencyTracker
54
	timeoutDuration     time.Duration
55 56
}

57
// New creates a new SessionPeerManager
58
func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerProviderFinder) *SessionPeerManager {
59
	spm := &SessionPeerManager{
60
		ctx:              ctx,
Steven Allen's avatar
Steven Allen committed
61
		id:               id,
62 63
		tagger:           tagger,
		providerFinder:   providerFinder,
64 65 66
		peerMessages:     make(chan peerMessage, 16),
		activePeers:      make(map[peer.ID]*peerData),
		broadcastLatency: newLatencyTracker(),
67
		timeoutDuration:  defaultTimeoutDuration,
68 69 70 71 72 73 74 75
	}

	spm.tag = fmt.Sprint("bs-ses-", id)

	go spm.run(ctx)
	return spm
}

76 77 78
// RecordPeerResponse records that a peer received some blocks, and adds the
// peer to the list of peers if it wasn't already added
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, ks []cid.Cid) {
79

80
	select {
81
	case spm.peerMessages <- &peerResponseMessage{p, ks}:
82 83 84 85
	case <-spm.ctx.Done():
	}
}

86 87 88
// RecordCancels records the fact that cancellations were sent to peers,
// so if blocks don't arrive, don't let it affect the peer's timeout
func (spm *SessionPeerManager) RecordCancels(ks []cid.Cid) {
89
	select {
90
	case spm.peerMessages <- &cancelMessage{ks}:
91 92 93 94
	case <-spm.ctx.Done():
	}
}

95
// RecordPeerRequests records that a given set of peers requested the given cids.
96
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
97 98 99 100
	select {
	case spm.peerMessages <- &peerRequestMessage{p, ks}:
	case <-spm.ctx.Done():
	}
101 102
}

103 104 105
// GetOptimizedPeers returns the best peers available for a session, along with
// a rating for how good they are, in comparison to the best peer.
func (spm *SessionPeerManager) GetOptimizedPeers() []bssd.OptimizedPeer {
106 107
	// right now this just returns all peers, but soon we might return peers
	// ordered by optimization, or only a subset
108
	resp := make(chan []bssd.OptimizedPeer, 1)
109
	select {
110
	case spm.peerMessages <- &getPeersMessage{resp}:
111 112 113 114 115 116 117 118 119 120 121 122
	case <-spm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-spm.ctx.Done():
		return nil
	}
}

123 124
// FindMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
125 126
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
127 128
		for p := range spm.providerFinder.FindProvidersAsync(ctx, k) {

129 130 131 132 133
			select {
			case spm.peerMessages <- &peerFoundMessage{p}:
			case <-ctx.Done():
			case <-spm.ctx.Done():
			}
134 135 136 137
		}
	}(c)
}

138 139 140 141 142 143 144 145 146
// SetTimeoutDuration changes the length of time used to timeout recording of
// requests
func (spm *SessionPeerManager) SetTimeoutDuration(timeoutDuration time.Duration) {
	select {
	case spm.peerMessages <- &setTimeoutMessage{timeoutDuration}:
	case <-spm.ctx.Done():
	}
}

147 148 149
func (spm *SessionPeerManager) run(ctx context.Context) {
	for {
		select {
hannahhoward's avatar
hannahhoward committed
150 151
		case pm := <-spm.peerMessages:
			pm.handle(spm)
152 153 154 155 156 157
		case <-ctx.Done():
			spm.handleShutdown()
			return
		}
	}
}
hannahhoward's avatar
hannahhoward committed
158

159 160 161 162 163 164 165
func (spm *SessionPeerManager) tagPeer(p peer.ID, data *peerData) {
	var value int
	if data.hasLatency {
		value = optimizedTagValue
	} else {
		value = unoptimizedTagValue
	}
166
	spm.tagger.TagPeer(p, spm.tag, value)
hannahhoward's avatar
hannahhoward committed
167 168
}

169 170 171 172 173 174 175 176 177
func (spm *SessionPeerManager) insertPeer(p peer.ID, data *peerData) {
	if data.hasLatency {
		insertPos := sort.Search(len(spm.optimizedPeersArr), func(i int) bool {
			return spm.activePeers[spm.optimizedPeersArr[i]].latency > data.latency
		})
		spm.optimizedPeersArr = append(spm.optimizedPeersArr[:insertPos],
			append([]peer.ID{p}, spm.optimizedPeersArr[insertPos:]...)...)
	} else {
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
hannahhoward's avatar
hannahhoward committed
178 179 180
	}
}

181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
	for i := 0; i < len(spm.optimizedPeersArr); i++ {
		if spm.optimizedPeersArr[i] == p {
			spm.optimizedPeersArr = append(spm.optimizedPeersArr[:i], spm.optimizedPeersArr[i+1:]...)
			return
		}
	}
}

func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
	for i := 0; i < len(spm.unoptimizedPeersArr); i++ {
		if spm.unoptimizedPeersArr[i] == p {
			spm.unoptimizedPeersArr[i] = spm.unoptimizedPeersArr[len(spm.unoptimizedPeersArr)-1]
			spm.unoptimizedPeersArr = spm.unoptimizedPeersArr[:len(spm.unoptimizedPeersArr)-1]
			return
		}
	}
}

200
func (spm *SessionPeerManager) recordResponse(p peer.ID, ks []cid.Cid) {
201 202 203 204 205 206 207 208 209 210 211 212
	data, ok := spm.activePeers[p]
	wasOptimized := ok && data.hasLatency
	if wasOptimized {
		spm.removeOptimizedPeer(p)
	} else {
		if ok {
			spm.removeUnoptimizedPeer(p)
		} else {
			data = newPeerData()
			spm.activePeers[p] = data
		}
	}
213 214 215 216
	for _, k := range ks {
		fallbackLatency, hasFallbackLatency := spm.broadcastLatency.CheckDuration(k)
		data.AdjustLatency(k, hasFallbackLatency, fallbackLatency)
	}
217 218 219 220 221 222
	if !ok || wasOptimized != data.hasLatency {
		spm.tagPeer(p, data)
	}
	spm.insertPeer(p, data)
}

hannahhoward's avatar
hannahhoward committed
223 224 225 226 227 228
type peerFoundMessage struct {
	p peer.ID
}

func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
	p := pfm.p
229
	if _, ok := spm.activePeers[p]; !ok {
230 231
		spm.activePeers[p] = newPeerData()
		spm.insertPeer(p, spm.activePeers[p])
232
		spm.tagPeer(p, spm.activePeers[p])
hannahhoward's avatar
hannahhoward committed
233 234 235 236
	}
}

type peerResponseMessage struct {
237 238
	p  peer.ID
	ks []cid.Cid
hannahhoward's avatar
hannahhoward committed
239 240 241
}

func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
242
	spm.recordResponse(prm.p, prm.ks)
243 244 245 246 247 248 249 250 251
}

type peerRequestMessage struct {
	peers []peer.ID
	keys  []cid.Cid
}

func (spm *SessionPeerManager) makeTimeout(p peer.ID) afterTimeoutFunc {
	return func(k cid.Cid) {
252 253 254 255
		select {
		case spm.peerMessages <- &peerTimeoutMessage{p, k}:
		case <-spm.ctx.Done():
		}
256 257 258 259 260
	}
}

func (prm *peerRequestMessage) handle(spm *SessionPeerManager) {
	if prm.peers == nil {
261 262 263 264 265 266
		spm.broadcastLatency.SetupRequests(prm.keys, spm.timeoutDuration, func(k cid.Cid) {
			select {
			case spm.peerMessages <- &broadcastTimeoutMessage{k}:
			case <-spm.ctx.Done():
			}
		})
267 268 269
	} else {
		for _, p := range prm.peers {
			if data, ok := spm.activePeers[p]; ok {
270
				data.lt.SetupRequests(prm.keys, spm.timeoutDuration, spm.makeTimeout(p))
271 272 273
			}
		}
	}
hannahhoward's avatar
hannahhoward committed
274 275
}

276
type getPeersMessage struct {
277
	resp chan<- []bssd.OptimizedPeer
hannahhoward's avatar
hannahhoward committed
278 279
}

280 281
// Get all optimized peers in order followed by randomly ordered unoptimized
// peers, with a limit of maxOptimizedPeers
282
func (prm *getPeersMessage) handle(spm *SessionPeerManager) {
283 284
	randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))

285 286
	// Number of peers to get in total: unoptimized + optimized
	// limited by maxOptimizedPeers
hannahhoward's avatar
hannahhoward committed
287 288 289 290
	maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
	if maxPeers > maxOptimizedPeers {
		maxPeers = maxOptimizedPeers
	}
291

292 293
	// The best peer latency is the first optimized peer's latency.
	// If we haven't recorded any peer's latency, use 0.
294 295 296 297 298 299
	var bestPeerLatency float64
	if len(spm.optimizedPeersArr) > 0 {
		bestPeerLatency = float64(spm.activePeers[spm.optimizedPeersArr[0]].latency)
	} else {
		bestPeerLatency = 0
	}
300

301
	optimizedPeers := make([]bssd.OptimizedPeer, 0, maxPeers)
302 303 304 305 306 307 308 309 310 311 312 313 314
	for i := 0; i < maxPeers; i++ {
		// First add optimized peers in order
		if i < len(spm.optimizedPeersArr) {
			p := spm.optimizedPeersArr[i]
			optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{
				Peer:               p,
				OptimizationRating: bestPeerLatency / float64(spm.activePeers[p].latency),
			})
		} else {
			// Then add unoptimized peers in random order
			p := spm.unoptimizedPeersArr[randomOrder[i-len(spm.optimizedPeersArr)]]
			optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: p, OptimizationRating: 0.0})
		}
315
	}
316
	prm.resp <- optimizedPeers
317 318
}

319
type cancelMessage struct {
320
	ks []cid.Cid
321 322 323 324
}

func (cm *cancelMessage) handle(spm *SessionPeerManager) {
	for _, data := range spm.activePeers {
325
		data.lt.RecordCancel(cm.ks)
326 327 328
	}
}

329
func (spm *SessionPeerManager) handleShutdown() {
330
	for p, data := range spm.activePeers {
331
		spm.tagger.UntagPeer(p, spm.tag)
332
		data.lt.Shutdown()
333 334
	}
}
335 336 337 338 339 340 341 342

type peerTimeoutMessage struct {
	p peer.ID
	k cid.Cid
}

func (ptm *peerTimeoutMessage) handle(spm *SessionPeerManager) {
	data, ok := spm.activePeers[ptm.p]
343 344 345 346 347 348
	// If the request was cancelled, make sure we clean up the request tracker
	if ok && data.lt.WasCancelled(ptm.k) {
		data.lt.RemoveRequest(ptm.k)
	} else {
		// If the request was not cancelled, record the latency. Note that we
		// do this even if we didn't previously know about this peer.
349
		spm.recordResponse(ptm.p, []cid.Cid{ptm.k})
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367
	}
}

type broadcastTimeoutMessage struct {
	k cid.Cid
}

func (btm *broadcastTimeoutMessage) handle(spm *SessionPeerManager) {
	spm.broadcastLatency.RemoveRequest(btm.k)
}

type setTimeoutMessage struct {
	timeoutDuration time.Duration
}

func (stm *setTimeoutMessage) handle(spm *SessionPeerManager) {
	spm.timeoutDuration = stm.timeoutDuration
}