sessionpeermanager.go 10.3 KB
Newer Older
1 2 3 4 5
package sessionpeermanager

import (
	"context"
	"fmt"
hannahhoward's avatar
hannahhoward committed
6
	"math/rand"
7
	"sort"
8
	"time"
9

10
	bssd "github.com/ipfs/go-bitswap/internal/sessiondata"
dirkmc's avatar
dirkmc committed
11
	logging "github.com/ipfs/go-log"
12

13
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
14
	peer "github.com/libp2p/go-libp2p-core/peer"
15 16
)

dirkmc's avatar
dirkmc committed
17 18
// log is the package-level logger, created under the name "bs:sprmgr".
var log = logging.Logger("bs:sprmgr")

hannahhoward's avatar
hannahhoward committed
19
const (
	// defaultTimeoutDuration is the request timeout a manager created by New
	// starts with; SetTimeoutDuration replaces it.
	defaultTimeoutDuration = 5 * time.Second

	// maxOptimizedPeers caps the number of peers (optimized and unoptimized
	// together) returned by one GetOptimizedPeers call.
	maxOptimizedPeers = 32

	// unoptimizedTagValue is the tag value for "unoptimized" session peers,
	// i.e. peers without a recorded latency.
	unoptimizedTagValue = 5

	// optimizedTagValue is the tag value for "optimized" session peers,
	// i.e. peers with a recorded latency.
	optimizedTagValue = 10
)

26 27 28 29 30 31 32 33
// PeerTagger is an interface for tagging peers with metadata
type PeerTagger interface {
	// TagPeer is called by the manager with a peer, the session's tag name
	// and the tag value to associate with that peer.
	TagPeer(peer.ID, string, int)
	// UntagPeer is called by the manager on shutdown with each active peer
	// and the session's tag name.
	UntagPeer(p peer.ID, tag string)
}

// PeerProviderFinder is an interface for finding providers
type PeerProviderFinder interface {
34
	FindProvidersAsync(context.Context, cid.Cid) <-chan peer.ID
35 36
}

hannahhoward's avatar
hannahhoward committed
37 38 39 40
// peerMessage is a unit of work sent over SessionPeerManager.peerMessages.
// The run loop calls handle on each message it receives, passing the manager.
type peerMessage interface {
	handle(spm *SessionPeerManager)
}

41 42
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
43
type SessionPeerManager struct {
44 45 46
	ctx            context.Context
	tagger         PeerTagger
	providerFinder PeerProviderFinder
dirkmc's avatar
dirkmc committed
47
	peers          *peer.Set
48 49
	tag            string
	id             uint64
50

hannahhoward's avatar
hannahhoward committed
51
	peerMessages chan peerMessage
52 53

	// do not touch outside of run loop
54
	activePeers         map[peer.ID]*peerData
hannahhoward's avatar
hannahhoward committed
55 56
	unoptimizedPeersArr []peer.ID
	optimizedPeersArr   []peer.ID
57
	broadcastLatency    *latencyTracker
58
	timeoutDuration     time.Duration
59 60
}

61
// New creates a new SessionPeerManager
62
func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerProviderFinder) *SessionPeerManager {
63
	spm := &SessionPeerManager{
64
		ctx:              ctx,
Steven Allen's avatar
Steven Allen committed
65
		id:               id,
66 67
		tagger:           tagger,
		providerFinder:   providerFinder,
dirkmc's avatar
dirkmc committed
68 69
		peers:            peer.NewSet(),
		peerMessages:     make(chan peerMessage, 128),
70 71
		activePeers:      make(map[peer.ID]*peerData),
		broadcastLatency: newLatencyTracker(),
72
		timeoutDuration:  defaultTimeoutDuration,
73 74 75 76 77 78 79 80
	}

	spm.tag = fmt.Sprint("bs-ses-", id)

	go spm.run(ctx)
	return spm
}

dirkmc's avatar
dirkmc committed
81 82 83 84 85 86 87 88 89 90 91 92 93
// ReceiveFrom adds p to the session's peer set when the message from p
// carried at least one entry in ks or haves and p is not in the set yet.
// It returns true only when p was newly added.
func (spm *SessionPeerManager) ReceiveFrom(p peer.ID, ks []cid.Cid, haves []cid.Cid) bool {
	// Nothing useful was received from this peer.
	if len(ks) == 0 && len(haves) == 0 {
		return false
	}
	// Already a session peer. The previous single condition
	// `a || b && !c` parsed as `a || (b && !c)`, so a non-empty ks skipped
	// this membership check and reported known peers as newly added.
	if spm.peers.Contains(p) {
		return false
	}

	log.Infof("Added peer %s to session: %d peers\n", p, spm.peers.Size())
	spm.peers.Add(p)
	return true
}

// Peers returns the session's peer set. The returned pointer is the
// manager's own set, not a copy.
func (spm *SessionPeerManager) Peers() *peer.Set {
	return spm.peers
}

94 95 96
// RecordPeerResponse records that a peer received some blocks, and adds the
// peer to the list of peers if it wasn't already added
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, ks []cid.Cid) {
97

98
	select {
99
	case spm.peerMessages <- &peerResponseMessage{p, ks}:
100 101 102 103
	case <-spm.ctx.Done():
	}
}

104 105 106
// RecordCancels records the fact that cancellations were sent to peers,
// so if blocks don't arrive, don't let it affect the peer's timeout
func (spm *SessionPeerManager) RecordCancels(ks []cid.Cid) {
107
	select {
108
	case spm.peerMessages <- &cancelMessage{ks}:
109 110 111 112
	case <-spm.ctx.Done():
	}
}

113
// RecordPeerRequests records that a given set of peers requested the given cids.
114
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
115 116 117 118
	select {
	case spm.peerMessages <- &peerRequestMessage{p, ks}:
	case <-spm.ctx.Done():
	}
119 120
}

121 122 123
// GetOptimizedPeers returns the best peers available for a session, along with
// a rating for how good they are, in comparison to the best peer.
func (spm *SessionPeerManager) GetOptimizedPeers() []bssd.OptimizedPeer {
124 125
	// right now this just returns all peers, but soon we might return peers
	// ordered by optimization, or only a subset
126
	resp := make(chan []bssd.OptimizedPeer, 1)
127
	select {
128
	case spm.peerMessages <- &getPeersMessage{resp}:
129 130 131 132 133 134 135 136 137 138 139 140
	case <-spm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-spm.ctx.Done():
		return nil
	}
}

141 142
// FindMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
143 144
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
145 146
		for p := range spm.providerFinder.FindProvidersAsync(ctx, k) {

147 148 149 150 151
			select {
			case spm.peerMessages <- &peerFoundMessage{p}:
			case <-ctx.Done():
			case <-spm.ctx.Done():
			}
152 153 154 155
		}
	}(c)
}

156 157 158 159 160 161 162 163 164
// SetTimeoutDuration asks the run loop to use timeoutDuration as the length
// of time after which request tracking times out. The call blocks until the
// message is queued or the manager's context is done.
func (spm *SessionPeerManager) SetTimeoutDuration(timeoutDuration time.Duration) {
	var msg peerMessage = &setTimeoutMessage{timeoutDuration}

	select {
	case <-spm.ctx.Done():
	case spm.peerMessages <- msg:
	}
}

165 166 167
func (spm *SessionPeerManager) run(ctx context.Context) {
	for {
		select {
hannahhoward's avatar
hannahhoward committed
168 169
		case pm := <-spm.peerMessages:
			pm.handle(spm)
170 171 172 173 174 175
		case <-ctx.Done():
			spm.handleShutdown()
			return
		}
	}
}
hannahhoward's avatar
hannahhoward committed
176

177 178 179 180 181 182 183
func (spm *SessionPeerManager) tagPeer(p peer.ID, data *peerData) {
	var value int
	if data.hasLatency {
		value = optimizedTagValue
	} else {
		value = unoptimizedTagValue
	}
184
	spm.tagger.TagPeer(p, spm.tag, value)
hannahhoward's avatar
hannahhoward committed
185 186
}

187 188 189 190 191 192 193 194 195
func (spm *SessionPeerManager) insertPeer(p peer.ID, data *peerData) {
	if data.hasLatency {
		insertPos := sort.Search(len(spm.optimizedPeersArr), func(i int) bool {
			return spm.activePeers[spm.optimizedPeersArr[i]].latency > data.latency
		})
		spm.optimizedPeersArr = append(spm.optimizedPeersArr[:insertPos],
			append([]peer.ID{p}, spm.optimizedPeersArr[insertPos:]...)...)
	} else {
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
hannahhoward's avatar
hannahhoward committed
196
	}
dirkmc's avatar
dirkmc committed
197 198 199 200 201

	if !spm.peers.Contains(p) {
		log.Infof("Added peer %s to session: %d peers\n", p, spm.peers.Size())
		spm.peers.Add(p)
	}
hannahhoward's avatar
hannahhoward committed
202 203
}

204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
// removeOptimizedPeer deletes the first occurrence of p from the optimized
// list, keeping the order of the remaining peers. It does nothing if p is
// not in the list.
func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
	list := spm.optimizedPeersArr
	for i := range list {
		if list[i] != p {
			continue
		}
		spm.optimizedPeersArr = append(list[:i], list[i+1:]...)
		return
	}
}

// removeUnoptimizedPeer deletes the first occurrence of p from the
// unoptimized list by overwriting it with the last element and shrinking the
// list by one; order is not preserved. It does nothing if p is not in the
// list.
func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
	list := spm.unoptimizedPeersArr
	last := len(list) - 1
	for i, candidate := range list {
		if candidate != p {
			continue
		}
		list[i] = list[last]
		spm.unoptimizedPeersArr = list[:last]
		return
	}
}

223
func (spm *SessionPeerManager) recordResponse(p peer.ID, ks []cid.Cid) {
224 225 226 227 228 229 230 231 232 233 234 235
	data, ok := spm.activePeers[p]
	wasOptimized := ok && data.hasLatency
	if wasOptimized {
		spm.removeOptimizedPeer(p)
	} else {
		if ok {
			spm.removeUnoptimizedPeer(p)
		} else {
			data = newPeerData()
			spm.activePeers[p] = data
		}
	}
236 237 238 239
	for _, k := range ks {
		fallbackLatency, hasFallbackLatency := spm.broadcastLatency.CheckDuration(k)
		data.AdjustLatency(k, hasFallbackLatency, fallbackLatency)
	}
240 241 242 243 244 245
	if !ok || wasOptimized != data.hasLatency {
		spm.tagPeer(p, data)
	}
	spm.insertPeer(p, data)
}

hannahhoward's avatar
hannahhoward committed
246 247 248 249 250 251
// peerFoundMessage tells the run loop that provider p was found by
// FindMorePeers.
type peerFoundMessage struct {
	p peer.ID
}

func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
	p := pfm.p
252
	if _, ok := spm.activePeers[p]; !ok {
253 254
		spm.activePeers[p] = newPeerData()
		spm.insertPeer(p, spm.activePeers[p])
255
		spm.tagPeer(p, spm.activePeers[p])
hannahhoward's avatar
hannahhoward committed
256 257 258 259
	}
}

type peerResponseMessage struct {
260 261
	p  peer.ID
	ks []cid.Cid
hannahhoward's avatar
hannahhoward committed
262 263 264
}

func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
265
	spm.recordResponse(prm.p, prm.ks)
266 267 268 269 270 271 272 273 274
}

// peerRequestMessage tells the run loop that keys were requested. It is sent
// by RecordPeerRequests; a nil peers slice is handled as a broadcast request.
type peerRequestMessage struct {
	peers []peer.ID
	keys  []cid.Cid
}

func (spm *SessionPeerManager) makeTimeout(p peer.ID) afterTimeoutFunc {
	return func(k cid.Cid) {
275 276 277 278
		select {
		case spm.peerMessages <- &peerTimeoutMessage{p, k}:
		case <-spm.ctx.Done():
		}
279 280 281 282 283
	}
}

func (prm *peerRequestMessage) handle(spm *SessionPeerManager) {
	if prm.peers == nil {
284 285 286 287 288 289
		spm.broadcastLatency.SetupRequests(prm.keys, spm.timeoutDuration, func(k cid.Cid) {
			select {
			case spm.peerMessages <- &broadcastTimeoutMessage{k}:
			case <-spm.ctx.Done():
			}
		})
290 291 292
	} else {
		for _, p := range prm.peers {
			if data, ok := spm.activePeers[p]; ok {
293
				data.lt.SetupRequests(prm.keys, spm.timeoutDuration, spm.makeTimeout(p))
294 295 296
			}
		}
	}
hannahhoward's avatar
hannahhoward committed
297 298
}

299
type getPeersMessage struct {
300
	resp chan<- []bssd.OptimizedPeer
hannahhoward's avatar
hannahhoward committed
301 302
}

303 304
// Get all optimized peers in order followed by randomly ordered unoptimized
// peers, with a limit of maxOptimizedPeers
305
func (prm *getPeersMessage) handle(spm *SessionPeerManager) {
306 307
	randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))

308 309
	// Number of peers to get in total: unoptimized + optimized
	// limited by maxOptimizedPeers
hannahhoward's avatar
hannahhoward committed
310 311 312 313
	maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
	if maxPeers > maxOptimizedPeers {
		maxPeers = maxOptimizedPeers
	}
314

315 316
	// The best peer latency is the first optimized peer's latency.
	// If we haven't recorded any peer's latency, use 0.
317 318 319 320 321 322
	var bestPeerLatency float64
	if len(spm.optimizedPeersArr) > 0 {
		bestPeerLatency = float64(spm.activePeers[spm.optimizedPeersArr[0]].latency)
	} else {
		bestPeerLatency = 0
	}
323

324
	optimizedPeers := make([]bssd.OptimizedPeer, 0, maxPeers)
325 326 327 328 329 330 331 332 333 334 335 336 337
	for i := 0; i < maxPeers; i++ {
		// First add optimized peers in order
		if i < len(spm.optimizedPeersArr) {
			p := spm.optimizedPeersArr[i]
			optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{
				Peer:               p,
				OptimizationRating: bestPeerLatency / float64(spm.activePeers[p].latency),
			})
		} else {
			// Then add unoptimized peers in random order
			p := spm.unoptimizedPeersArr[randomOrder[i-len(spm.optimizedPeersArr)]]
			optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: p, OptimizationRating: 0.0})
		}
338
	}
339
	prm.resp <- optimizedPeers
340 341
}

342
type cancelMessage struct {
343
	ks []cid.Cid
344 345 346 347
}

func (cm *cancelMessage) handle(spm *SessionPeerManager) {
	for _, data := range spm.activePeers {
348
		data.lt.RecordCancel(cm.ks)
349 350 351
	}
}

352
func (spm *SessionPeerManager) handleShutdown() {
353
	for p, data := range spm.activePeers {
354
		spm.tagger.UntagPeer(p, spm.tag)
355
		data.lt.Shutdown()
356 357
	}
}
358 359 360 361 362 363 364 365

// peerTimeoutMessage tells the run loop that the request for key k sent to
// peer p timed out. It is produced by the callbacks from makeTimeout.
type peerTimeoutMessage struct {
	p peer.ID
	k cid.Cid
}

func (ptm *peerTimeoutMessage) handle(spm *SessionPeerManager) {
	data, ok := spm.activePeers[ptm.p]
366 367 368 369 370 371
	// If the request was cancelled, make sure we clean up the request tracker
	if ok && data.lt.WasCancelled(ptm.k) {
		data.lt.RemoveRequest(ptm.k)
	} else {
		// If the request was not cancelled, record the latency. Note that we
		// do this even if we didn't previously know about this peer.
372
		spm.recordResponse(ptm.p, []cid.Cid{ptm.k})
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
	}
}

// broadcastTimeoutMessage tells the run loop that the broadcast request for
// key k timed out.
type broadcastTimeoutMessage struct {
	k cid.Cid
}

// handle removes the timed-out key from the broadcast latency tracker.
func (btm *broadcastTimeoutMessage) handle(spm *SessionPeerManager) {
	spm.broadcastLatency.RemoveRequest(btm.k)
}

// setTimeoutMessage asks the run loop to change the manager's timeout
// duration. It is sent by SetTimeoutDuration.
type setTimeoutMessage struct {
	timeoutDuration time.Duration
}

// handle stores the new timeout on the manager; it is read when later
// requests are set up in peerRequestMessage.handle.
func (stm *setTimeoutMessage) handle(spm *SessionPeerManager) {
	spm.timeoutDuration = stm.timeoutDuration
}