sessionpeermanager.go 9.04 KB
Newer Older
1 2 3 4 5
package sessionpeermanager

import (
	"context"
	"fmt"
hannahhoward's avatar
hannahhoward committed
6
	"math/rand"
7
	"sort"
8
	"time"
9

10 11
	bssd "github.com/ipfs/go-bitswap/sessiondata"

12
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
13
	peer "github.com/libp2p/go-libp2p-core/peer"
14 15
)

hannahhoward's avatar
hannahhoward committed
16
const (
17 18 19 20
	defaultTimeoutDuration = 5 * time.Second
	maxOptimizedPeers      = 32
	unoptimizedTagValue    = 5  // tag value for "unoptimized" session peers.
	optimizedTagValue      = 10 // tag value for "optimized" session peers.
hannahhoward's avatar
hannahhoward committed
21 22
)

23 24 25 26 27 28 29 30
// PeerTagger is an interface for tagging peers with metadata
type PeerTagger interface {
	TagPeer(peer.ID, string, int)
	UntagPeer(p peer.ID, tag string)
}

// PeerProviderFinder is an interface for finding providers
type PeerProviderFinder interface {
31
	FindProvidersAsync(context.Context, cid.Cid) <-chan peer.ID
32 33
}

hannahhoward's avatar
hannahhoward committed
34 35 36 37
type peerMessage interface {
	handle(spm *SessionPeerManager)
}

38 39
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
40
type SessionPeerManager struct {
41 42 43 44 45
	ctx            context.Context
	tagger         PeerTagger
	providerFinder PeerProviderFinder
	tag            string
	id             uint64
46

hannahhoward's avatar
hannahhoward committed
47
	peerMessages chan peerMessage
48 49

	// do not touch outside of run loop
50
	activePeers         map[peer.ID]*peerData
hannahhoward's avatar
hannahhoward committed
51 52
	unoptimizedPeersArr []peer.ID
	optimizedPeersArr   []peer.ID
53
	broadcastLatency    *latencyTracker
54
	timeoutDuration     time.Duration
55 56
}

57
// New creates a new SessionPeerManager
58
func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerProviderFinder) *SessionPeerManager {
59
	spm := &SessionPeerManager{
60
		ctx:              ctx,
61 62
		tagger:           tagger,
		providerFinder:   providerFinder,
63 64 65
		peerMessages:     make(chan peerMessage, 16),
		activePeers:      make(map[peer.ID]*peerData),
		broadcastLatency: newLatencyTracker(),
66
		timeoutDuration:  defaultTimeoutDuration,
67 68 69 70 71 72 73 74
	}

	spm.tag = fmt.Sprint("bs-ses-", id)

	go spm.run(ctx)
	return spm
}

75 76
// RecordPeerResponse records that a peer received a block, and adds to it
// the list of peers if it wasn't already added
77
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
78

79 80 81 82 83 84 85 86 87
	select {
	case spm.peerMessages <- &peerResponseMessage{p, k}:
	case <-spm.ctx.Done():
	}
}

// RecordCancel records the fact that cancellations were sent to peers,
// so if not blocks come in, don't let it affect peers timeout
func (spm *SessionPeerManager) RecordCancel(k cid.Cid) {
88 89 90
	// at the moment, we're just adding peers here
	// in the future, we'll actually use this to record metrics
	select {
91
	case spm.peerMessages <- &cancelMessage{k}:
92 93 94 95
	case <-spm.ctx.Done():
	}
}

96
// RecordPeerRequests records that a given set of peers requested the given cids.
97
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
98 99 100 101
	select {
	case spm.peerMessages <- &peerRequestMessage{p, ks}:
	case <-spm.ctx.Done():
	}
102 103
}

104 105 106
// GetOptimizedPeers returns the best peers available for a session, along with
// a rating for how good they are, in comparison to the best peer.
func (spm *SessionPeerManager) GetOptimizedPeers() []bssd.OptimizedPeer {
107 108
	// right now this just returns all peers, but soon we might return peers
	// ordered by optimization, or only a subset
109
	resp := make(chan []bssd.OptimizedPeer, 1)
110
	select {
111
	case spm.peerMessages <- &getPeersMessage{resp}:
112 113 114 115 116 117 118 119 120 121 122 123
	case <-spm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-spm.ctx.Done():
		return nil
	}
}

124 125
// FindMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
126 127
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
128 129
		for p := range spm.providerFinder.FindProvidersAsync(ctx, k) {

130 131 132 133 134
			select {
			case spm.peerMessages <- &peerFoundMessage{p}:
			case <-ctx.Done():
			case <-spm.ctx.Done():
			}
135 136 137 138
		}
	}(c)
}

139 140 141 142 143 144 145 146 147
// SetTimeoutDuration changes the length of time used to timeout recording of
// requests
func (spm *SessionPeerManager) SetTimeoutDuration(timeoutDuration time.Duration) {
	select {
	case spm.peerMessages <- &setTimeoutMessage{timeoutDuration}:
	case <-spm.ctx.Done():
	}
}

148 149 150
func (spm *SessionPeerManager) run(ctx context.Context) {
	for {
		select {
hannahhoward's avatar
hannahhoward committed
151 152
		case pm := <-spm.peerMessages:
			pm.handle(spm)
153 154 155 156 157 158
		case <-ctx.Done():
			spm.handleShutdown()
			return
		}
	}
}
hannahhoward's avatar
hannahhoward committed
159

160 161 162 163 164 165 166
func (spm *SessionPeerManager) tagPeer(p peer.ID, data *peerData) {
	var value int
	if data.hasLatency {
		value = optimizedTagValue
	} else {
		value = unoptimizedTagValue
	}
167
	spm.tagger.TagPeer(p, spm.tag, value)
hannahhoward's avatar
hannahhoward committed
168 169
}

170 171 172 173 174 175 176 177 178
func (spm *SessionPeerManager) insertPeer(p peer.ID, data *peerData) {
	if data.hasLatency {
		insertPos := sort.Search(len(spm.optimizedPeersArr), func(i int) bool {
			return spm.activePeers[spm.optimizedPeersArr[i]].latency > data.latency
		})
		spm.optimizedPeersArr = append(spm.optimizedPeersArr[:insertPos],
			append([]peer.ID{p}, spm.optimizedPeersArr[insertPos:]...)...)
	} else {
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
hannahhoward's avatar
hannahhoward committed
179 180 181
	}
}

182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
	for i := 0; i < len(spm.optimizedPeersArr); i++ {
		if spm.optimizedPeersArr[i] == p {
			spm.optimizedPeersArr = append(spm.optimizedPeersArr[:i], spm.optimizedPeersArr[i+1:]...)
			return
		}
	}
}

func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
	for i := 0; i < len(spm.unoptimizedPeersArr); i++ {
		if spm.unoptimizedPeersArr[i] == p {
			spm.unoptimizedPeersArr[i] = spm.unoptimizedPeersArr[len(spm.unoptimizedPeersArr)-1]
			spm.unoptimizedPeersArr = spm.unoptimizedPeersArr[:len(spm.unoptimizedPeersArr)-1]
			return
		}
	}
}

201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
func (spm *SessionPeerManager) recordResponse(p peer.ID, k cid.Cid) {
	data, ok := spm.activePeers[p]
	wasOptimized := ok && data.hasLatency
	if wasOptimized {
		spm.removeOptimizedPeer(p)
	} else {
		if ok {
			spm.removeUnoptimizedPeer(p)
		} else {
			data = newPeerData()
			spm.activePeers[p] = data
		}
	}
	fallbackLatency, hasFallbackLatency := spm.broadcastLatency.CheckDuration(k)
	data.AdjustLatency(k, hasFallbackLatency, fallbackLatency)
	if !ok || wasOptimized != data.hasLatency {
		spm.tagPeer(p, data)
	}
	spm.insertPeer(p, data)
}

hannahhoward's avatar
hannahhoward committed
222 223 224 225 226 227
type peerFoundMessage struct {
	p peer.ID
}

func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
	p := pfm.p
228
	if _, ok := spm.activePeers[p]; !ok {
229 230
		spm.activePeers[p] = newPeerData()
		spm.insertPeer(p, spm.activePeers[p])
231
		spm.tagPeer(p, spm.activePeers[p])
hannahhoward's avatar
hannahhoward committed
232 233 234 235 236
	}
}

type peerResponseMessage struct {
	p peer.ID
237
	k cid.Cid
hannahhoward's avatar
hannahhoward committed
238 239 240
}

func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
241
	spm.recordResponse(prm.p, prm.k)
242 243 244 245 246 247 248 249 250
}

type peerRequestMessage struct {
	peers []peer.ID
	keys  []cid.Cid
}

func (spm *SessionPeerManager) makeTimeout(p peer.ID) afterTimeoutFunc {
	return func(k cid.Cid) {
251 252 253 254
		select {
		case spm.peerMessages <- &peerTimeoutMessage{p, k}:
		case <-spm.ctx.Done():
		}
255 256 257 258 259
	}
}

func (prm *peerRequestMessage) handle(spm *SessionPeerManager) {
	if prm.peers == nil {
260 261 262 263 264 265
		spm.broadcastLatency.SetupRequests(prm.keys, spm.timeoutDuration, func(k cid.Cid) {
			select {
			case spm.peerMessages <- &broadcastTimeoutMessage{k}:
			case <-spm.ctx.Done():
			}
		})
266 267 268
	} else {
		for _, p := range prm.peers {
			if data, ok := spm.activePeers[p]; ok {
269
				data.lt.SetupRequests(prm.keys, spm.timeoutDuration, spm.makeTimeout(p))
270 271 272
			}
		}
	}
hannahhoward's avatar
hannahhoward committed
273 274
}

275
type getPeersMessage struct {
276
	resp chan<- []bssd.OptimizedPeer
hannahhoward's avatar
hannahhoward committed
277 278
}

279
func (prm *getPeersMessage) handle(spm *SessionPeerManager) {
hannahhoward's avatar
hannahhoward committed
280 281 282 283 284
	randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
	maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
	if maxPeers > maxOptimizedPeers {
		maxPeers = maxOptimizedPeers
	}
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
	var bestPeerLatency float64
	if len(spm.optimizedPeersArr) > 0 {
		bestPeerLatency = float64(spm.activePeers[spm.optimizedPeersArr[0]].latency)
	} else {
		bestPeerLatency = 0
	}
	optimizedPeers := make([]bssd.OptimizedPeer, 0, maxPeers)
	for i := 0; i < maxPeers; i++ {
		if i < len(spm.optimizedPeersArr) {
			p := spm.optimizedPeersArr[i]
			optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{
				Peer:               p,
				OptimizationRating: bestPeerLatency / float64(spm.activePeers[p].latency),
			})
		} else {
			p := spm.unoptimizedPeersArr[randomOrder[i-len(spm.optimizedPeersArr)]]
			optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: p, OptimizationRating: 0.0})
		}
303
	}
304
	prm.resp <- optimizedPeers
305 306
}

307 308 309 310 311 312 313 314 315 316
type cancelMessage struct {
	k cid.Cid
}

func (cm *cancelMessage) handle(spm *SessionPeerManager) {
	for _, data := range spm.activePeers {
		data.lt.RecordCancel(cm.k)
	}
}

317
func (spm *SessionPeerManager) handleShutdown() {
318
	for p, data := range spm.activePeers {
319
		spm.tagger.UntagPeer(p, spm.tag)
320
		data.lt.Shutdown()
321 322
	}
}
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350

type peerTimeoutMessage struct {
	p peer.ID
	k cid.Cid
}

func (ptm *peerTimeoutMessage) handle(spm *SessionPeerManager) {
	data, ok := spm.activePeers[ptm.p]
	if !ok || !data.lt.WasCancelled(ptm.k) {
		spm.recordResponse(ptm.p, ptm.k)
	}
}

type broadcastTimeoutMessage struct {
	k cid.Cid
}

func (btm *broadcastTimeoutMessage) handle(spm *SessionPeerManager) {
	spm.broadcastLatency.RemoveRequest(btm.k)
}

type setTimeoutMessage struct {
	timeoutDuration time.Duration
}

func (stm *setTimeoutMessage) handle(spm *SessionPeerManager) {
	spm.timeoutDuration = stm.timeoutDuration
}