sessionpeermanager.go 9.25 KB
Newer Older
1 2 3 4 5
package sessionpeermanager

import (
	"context"
	"fmt"
hannahhoward's avatar
hannahhoward committed
6
	"math/rand"
7
	"sort"
8
	"time"
9

10 11
	bssd "github.com/ipfs/go-bitswap/sessiondata"

12
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
13
	peer "github.com/libp2p/go-libp2p-core/peer"
14 15
)

hannahhoward's avatar
hannahhoward committed
16
// Tuning constants for the session peer manager.
const (
	defaultTimeoutDuration = 5 * time.Second // initial request timeout installed by New.
	maxOptimizedPeers      = 32              // upper bound on peers returned by one GetOptimizedPeers call.
	unoptimizedTagValue    = 5               // tag value for "unoptimized" session peers.
	optimizedTagValue      = 10              // tag value for "optimized" session peers.
)

23 24 25 26 27 28 29 30
// PeerTagger attaches named, integer-valued tags to peers and removes them
// again. The session peer manager tags every peer it tracks and untags them
// on shutdown.
type PeerTagger interface {
	TagPeer(p peer.ID, tag string, value int)
	UntagPeer(p peer.ID, tag string)
}

// PeerProviderFinder is an interface for finding providers
type PeerProviderFinder interface {
31
	FindProvidersAsync(context.Context, cid.Cid) <-chan peer.ID
32 33
}

hannahhoward's avatar
hannahhoward committed
34 35 36 37
// peerMessage is a unit of work sent over SessionPeerManager.peerMessages.
// The run loop calls handle on each message it receives, so implementations
// may read and modify the manager's run-loop-only state.
type peerMessage interface {
	handle(*SessionPeerManager)
}

38 39
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
40
type SessionPeerManager struct {
41 42 43 44 45
	ctx            context.Context
	tagger         PeerTagger
	providerFinder PeerProviderFinder
	tag            string
	id             uint64
46

hannahhoward's avatar
hannahhoward committed
47
	peerMessages chan peerMessage
48 49

	// do not touch outside of run loop
50
	activePeers         map[peer.ID]*peerData
hannahhoward's avatar
hannahhoward committed
51 52
	unoptimizedPeersArr []peer.ID
	optimizedPeersArr   []peer.ID
53
	broadcastLatency    *latencyTracker
54
	timeoutDuration     time.Duration
55 56
}

57
// New creates a new SessionPeerManager
58
func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerProviderFinder) *SessionPeerManager {
59
	spm := &SessionPeerManager{
60
		ctx:              ctx,
61 62
		tagger:           tagger,
		providerFinder:   providerFinder,
63 64 65
		peerMessages:     make(chan peerMessage, 16),
		activePeers:      make(map[peer.ID]*peerData),
		broadcastLatency: newLatencyTracker(),
66
		timeoutDuration:  defaultTimeoutDuration,
67 68 69 70 71 72 73 74
	}

	spm.tag = fmt.Sprint("bs-ses-", id)

	go spm.run(ctx)
	return spm
}

75 76 77
// RecordPeerResponse records that a peer received some blocks, and adds the
// peer to the list of peers if it wasn't already added
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, ks []cid.Cid) {
78

79
	select {
80
	case spm.peerMessages <- &peerResponseMessage{p, ks}:
81 82 83 84
	case <-spm.ctx.Done():
	}
}

85 86 87
// RecordCancels records the fact that cancellations were sent to peers,
// so if blocks don't arrive, don't let it affect the peer's timeout
func (spm *SessionPeerManager) RecordCancels(ks []cid.Cid) {
88
	select {
89
	case spm.peerMessages <- &cancelMessage{ks}:
90 91 92 93
	case <-spm.ctx.Done():
	}
}

94
// RecordPeerRequests records that a given set of peers requested the given cids.
95
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
96 97 98 99
	select {
	case spm.peerMessages <- &peerRequestMessage{p, ks}:
	case <-spm.ctx.Done():
	}
100 101
}

102 103 104
// GetOptimizedPeers returns the best peers available for a session, along with
// a rating for how good they are, in comparison to the best peer.
func (spm *SessionPeerManager) GetOptimizedPeers() []bssd.OptimizedPeer {
105 106
	// right now this just returns all peers, but soon we might return peers
	// ordered by optimization, or only a subset
107
	resp := make(chan []bssd.OptimizedPeer, 1)
108
	select {
109
	case spm.peerMessages <- &getPeersMessage{resp}:
110 111 112 113 114 115 116 117 118 119 120 121
	case <-spm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-spm.ctx.Done():
		return nil
	}
}

122 123
// FindMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
124 125
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
126 127
		for p := range spm.providerFinder.FindProvidersAsync(ctx, k) {

128 129 130 131 132
			select {
			case spm.peerMessages <- &peerFoundMessage{p}:
			case <-ctx.Done():
			case <-spm.ctx.Done():
			}
133 134 135 136
		}
	}(c)
}

137 138 139 140 141 142 143 144 145
// SetTimeoutDuration updates how long request tracking waits before a
// recorded request is timed out. The new value is handed to the run loop;
// nothing is queued if the manager's context is done first.
func (spm *SessionPeerManager) SetTimeoutDuration(timeoutDuration time.Duration) {
	msg := &setTimeoutMessage{timeoutDuration}
	select {
	case <-spm.ctx.Done():
	case spm.peerMessages <- msg:
	}
}

146 147 148
func (spm *SessionPeerManager) run(ctx context.Context) {
	for {
		select {
hannahhoward's avatar
hannahhoward committed
149 150
		case pm := <-spm.peerMessages:
			pm.handle(spm)
151 152 153 154 155 156
		case <-ctx.Done():
			spm.handleShutdown()
			return
		}
	}
}
hannahhoward's avatar
hannahhoward committed
157

158 159 160 161 162 163 164
func (spm *SessionPeerManager) tagPeer(p peer.ID, data *peerData) {
	var value int
	if data.hasLatency {
		value = optimizedTagValue
	} else {
		value = unoptimizedTagValue
	}
165
	spm.tagger.TagPeer(p, spm.tag, value)
hannahhoward's avatar
hannahhoward committed
166 167
}

168 169 170 171 172 173 174 175 176
func (spm *SessionPeerManager) insertPeer(p peer.ID, data *peerData) {
	if data.hasLatency {
		insertPos := sort.Search(len(spm.optimizedPeersArr), func(i int) bool {
			return spm.activePeers[spm.optimizedPeersArr[i]].latency > data.latency
		})
		spm.optimizedPeersArr = append(spm.optimizedPeersArr[:insertPos],
			append([]peer.ID{p}, spm.optimizedPeersArr[insertPos:]...)...)
	} else {
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
hannahhoward's avatar
hannahhoward committed
177 178 179
	}
}

180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
// removeOptimizedPeer deletes the first occurrence of p from
// optimizedPeersArr, preserving the order of the remaining peers. It does
// nothing when p is not in the list.
func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
	for idx, candidate := range spm.optimizedPeersArr {
		if candidate != p {
			continue
		}
		spm.optimizedPeersArr = append(spm.optimizedPeersArr[:idx], spm.optimizedPeersArr[idx+1:]...)
		return
	}
}

// removeUnoptimizedPeer deletes the first occurrence of p from
// unoptimizedPeersArr by overwriting it with the last element and shrinking
// the list by one, so the order of the remaining peers is not preserved. It
// does nothing when p is not in the list.
func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
	peers := spm.unoptimizedPeersArr
	last := len(peers) - 1
	for idx := range peers {
		if peers[idx] != p {
			continue
		}
		peers[idx] = peers[last]
		spm.unoptimizedPeersArr = peers[:last]
		return
	}
}

199
func (spm *SessionPeerManager) recordResponse(p peer.ID, ks []cid.Cid) {
200 201 202 203 204 205 206 207 208 209 210 211
	data, ok := spm.activePeers[p]
	wasOptimized := ok && data.hasLatency
	if wasOptimized {
		spm.removeOptimizedPeer(p)
	} else {
		if ok {
			spm.removeUnoptimizedPeer(p)
		} else {
			data = newPeerData()
			spm.activePeers[p] = data
		}
	}
212 213 214 215
	for _, k := range ks {
		fallbackLatency, hasFallbackLatency := spm.broadcastLatency.CheckDuration(k)
		data.AdjustLatency(k, hasFallbackLatency, fallbackLatency)
	}
216 217 218 219 220 221
	if !ok || wasOptimized != data.hasLatency {
		spm.tagPeer(p, data)
	}
	spm.insertPeer(p, data)
}

hannahhoward's avatar
hannahhoward committed
222 223 224 225 226 227
// peerFoundMessage tells the run loop that the provider search found peer p.
type peerFoundMessage struct {
	p peer.ID
}

func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
	p := pfm.p
228
	if _, ok := spm.activePeers[p]; !ok {
229 230
		spm.activePeers[p] = newPeerData()
		spm.insertPeer(p, spm.activePeers[p])
231
		spm.tagPeer(p, spm.activePeers[p])
hannahhoward's avatar
hannahhoward committed
232 233 234 235
	}
}

type peerResponseMessage struct {
236 237
	p  peer.ID
	ks []cid.Cid
hannahhoward's avatar
hannahhoward committed
238 239 240
}

func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
241
	spm.recordResponse(prm.p, prm.ks)
242 243 244 245 246 247 248 249 250
}

// peerRequestMessage tells the run loop that keys were requested from peers.
// A nil peers slice is handled as a broadcast request.
type peerRequestMessage struct {
	peers []peer.ID
	keys  []cid.Cid
}

func (spm *SessionPeerManager) makeTimeout(p peer.ID) afterTimeoutFunc {
	return func(k cid.Cid) {
251 252 253 254
		select {
		case spm.peerMessages <- &peerTimeoutMessage{p, k}:
		case <-spm.ctx.Done():
		}
255 256 257 258 259
	}
}

func (prm *peerRequestMessage) handle(spm *SessionPeerManager) {
	if prm.peers == nil {
260 261 262 263 264 265
		spm.broadcastLatency.SetupRequests(prm.keys, spm.timeoutDuration, func(k cid.Cid) {
			select {
			case spm.peerMessages <- &broadcastTimeoutMessage{k}:
			case <-spm.ctx.Done():
			}
		})
266 267 268
	} else {
		for _, p := range prm.peers {
			if data, ok := spm.activePeers[p]; ok {
269
				data.lt.SetupRequests(prm.keys, spm.timeoutDuration, spm.makeTimeout(p))
270 271 272
			}
		}
	}
hannahhoward's avatar
hannahhoward committed
273 274
}

275
type getPeersMessage struct {
276
	resp chan<- []bssd.OptimizedPeer
hannahhoward's avatar
hannahhoward committed
277 278
}

279
func (prm *getPeersMessage) handle(spm *SessionPeerManager) {
hannahhoward's avatar
hannahhoward committed
280 281 282 283 284
	randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
	maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
	if maxPeers > maxOptimizedPeers {
		maxPeers = maxOptimizedPeers
	}
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
	var bestPeerLatency float64
	if len(spm.optimizedPeersArr) > 0 {
		bestPeerLatency = float64(spm.activePeers[spm.optimizedPeersArr[0]].latency)
	} else {
		bestPeerLatency = 0
	}
	optimizedPeers := make([]bssd.OptimizedPeer, 0, maxPeers)
	for i := 0; i < maxPeers; i++ {
		if i < len(spm.optimizedPeersArr) {
			p := spm.optimizedPeersArr[i]
			optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{
				Peer:               p,
				OptimizationRating: bestPeerLatency / float64(spm.activePeers[p].latency),
			})
		} else {
			p := spm.unoptimizedPeersArr[randomOrder[i-len(spm.optimizedPeersArr)]]
			optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: p, OptimizationRating: 0.0})
		}
303
	}
304
	prm.resp <- optimizedPeers
305 306
}

307
type cancelMessage struct {
308
	ks []cid.Cid
309 310 311 312
}

func (cm *cancelMessage) handle(spm *SessionPeerManager) {
	for _, data := range spm.activePeers {
313
		data.lt.RecordCancel(cm.ks)
314 315 316
	}
}

317
func (spm *SessionPeerManager) handleShutdown() {
318
	for p, data := range spm.activePeers {
319
		spm.tagger.UntagPeer(p, spm.tag)
320
		data.lt.Shutdown()
321 322
	}
}
323 324 325 326 327 328 329 330

// peerTimeoutMessage tells the run loop that the request for key k sent to
// peer p timed out.
type peerTimeoutMessage struct {
	p peer.ID
	k cid.Cid
}

func (ptm *peerTimeoutMessage) handle(spm *SessionPeerManager) {
	data, ok := spm.activePeers[ptm.p]
331 332 333 334 335 336
	// If the request was cancelled, make sure we clean up the request tracker
	if ok && data.lt.WasCancelled(ptm.k) {
		data.lt.RemoveRequest(ptm.k)
	} else {
		// If the request was not cancelled, record the latency. Note that we
		// do this even if we didn't previously know about this peer.
337
		spm.recordResponse(ptm.p, []cid.Cid{ptm.k})
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
	}
}

// broadcastTimeoutMessage tells the run loop that the broadcast request for
// key k timed out.
type broadcastTimeoutMessage struct {
	k cid.Cid
}

// handle drops the timed-out key from the broadcast latency tracker.
func (btm *broadcastTimeoutMessage) handle(spm *SessionPeerManager) {
	tracker := spm.broadcastLatency
	tracker.RemoveRequest(btm.k)
}

// setTimeoutMessage asks the run loop to replace the manager's timeout
// duration with timeoutDuration.
type setTimeoutMessage struct {
	timeoutDuration time.Duration
}

// handle installs the requested timeout; it is used for requests registered
// from now on.
func (stm *setTimeoutMessage) handle(spm *SessionPeerManager) {
	requested := stm.timeoutDuration
	spm.timeoutDuration = requested
}