sessionpeermanager.go 7.43 KB
Newer Older
1 2 3 4 5
package sessionpeermanager

import (
	"context"
	"fmt"
hannahhoward's avatar
hannahhoward committed
6
	"math/rand"
7
	"sort"
8

9 10
	bssd "github.com/ipfs/go-bitswap/sessiondata"

11
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
12
	peer "github.com/libp2p/go-libp2p-core/peer"
13 14
)

hannahhoward's avatar
hannahhoward committed
15
const (
16 17 18
	maxOptimizedPeers   = 32
	unoptimizedTagValue = 5  // tag value for "unoptimized" session peers.
	optimizedTagValue   = 10 // tag value for "optimized" session peers.
hannahhoward's avatar
hannahhoward committed
19 20
)

21 22 23 24 25 26 27 28
// PeerTagger is an interface for tagging peers with metadata
type PeerTagger interface {
	TagPeer(peer.ID, string, int)
	UntagPeer(p peer.ID, tag string)
}

// PeerProviderFinder is an interface for finding providers
type PeerProviderFinder interface {
29
	FindProvidersAsync(context.Context, cid.Cid) <-chan peer.ID
30 31
}

hannahhoward's avatar
hannahhoward committed
32 33 34 35
type peerMessage interface {
	handle(spm *SessionPeerManager)
}

36 37
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
38
type SessionPeerManager struct {
39 40 41 42 43
	ctx            context.Context
	tagger         PeerTagger
	providerFinder PeerProviderFinder
	tag            string
	id             uint64
44

hannahhoward's avatar
hannahhoward committed
45
	peerMessages chan peerMessage
46 47

	// do not touch outside of run loop
48
	activePeers         map[peer.ID]*peerData
hannahhoward's avatar
hannahhoward committed
49 50
	unoptimizedPeersArr []peer.ID
	optimizedPeersArr   []peer.ID
51
	broadcastLatency    *latencyTracker
52 53
}

54
// New creates a new SessionPeerManager
55
func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerProviderFinder) *SessionPeerManager {
56
	spm := &SessionPeerManager{
57
		ctx:              ctx,
58 59
		tagger:         tagger,
		providerFinder: providerFinder,
60 61 62
		peerMessages:     make(chan peerMessage, 16),
		activePeers:      make(map[peer.ID]*peerData),
		broadcastLatency: newLatencyTracker(),
63 64 65 66 67 68 69 70
	}

	spm.tag = fmt.Sprint("bs-ses-", id)

	go spm.run(ctx)
	return spm
}

71 72
// RecordPeerResponse records that a peer received a block, and adds to it
// the list of peers if it wasn't already added
73
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
74

75 76 77
	// at the moment, we're just adding peers here
	// in the future, we'll actually use this to record metrics
	select {
78
	case spm.peerMessages <- &peerResponseMessage{p, k}:
79 80 81 82
	case <-spm.ctx.Done():
	}
}

83
// RecordPeerRequests records that a given set of peers requested the given cids.
84 85 86
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
	// at the moment, we're not doing anything here
	// soon we'll use this to track latency by peer
87 88 89 90
	select {
	case spm.peerMessages <- &peerRequestMessage{p, ks}:
	case <-spm.ctx.Done():
	}
91 92
}

93 94 95
// GetOptimizedPeers returns the best peers available for a session, along with
// a rating for how good they are, in comparison to the best peer.
func (spm *SessionPeerManager) GetOptimizedPeers() []bssd.OptimizedPeer {
96 97
	// right now this just returns all peers, but soon we might return peers
	// ordered by optimization, or only a subset
98
	resp := make(chan []bssd.OptimizedPeer, 1)
99
	select {
100
	case spm.peerMessages <- &getPeersMessage{resp}:
101 102 103 104 105 106 107 108 109 110 111 112
	case <-spm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-spm.ctx.Done():
		return nil
	}
}

113 114
// FindMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
115 116
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
117 118
		for p := range spm.providerFinder.FindProvidersAsync(ctx, k) {

119 120 121 122 123
			select {
			case spm.peerMessages <- &peerFoundMessage{p}:
			case <-ctx.Done():
			case <-spm.ctx.Done():
			}
124 125 126 127 128 129 130
		}
	}(c)
}

func (spm *SessionPeerManager) run(ctx context.Context) {
	for {
		select {
hannahhoward's avatar
hannahhoward committed
131 132
		case pm := <-spm.peerMessages:
			pm.handle(spm)
133 134 135 136 137 138
		case <-ctx.Done():
			spm.handleShutdown()
			return
		}
	}
}
hannahhoward's avatar
hannahhoward committed
139

140
func (spm *SessionPeerManager) tagPeer(p peer.ID, value int) {
141
	spm.tagger.TagPeer(p, spm.tag, value)
hannahhoward's avatar
hannahhoward committed
142 143
}

144 145 146 147 148 149 150 151 152
func (spm *SessionPeerManager) insertPeer(p peer.ID, data *peerData) {
	if data.hasLatency {
		insertPos := sort.Search(len(spm.optimizedPeersArr), func(i int) bool {
			return spm.activePeers[spm.optimizedPeersArr[i]].latency > data.latency
		})
		spm.optimizedPeersArr = append(spm.optimizedPeersArr[:insertPos],
			append([]peer.ID{p}, spm.optimizedPeersArr[insertPos:]...)...)
	} else {
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
hannahhoward's avatar
hannahhoward committed
153 154 155
	}
}

156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
	for i := 0; i < len(spm.optimizedPeersArr); i++ {
		if spm.optimizedPeersArr[i] == p {
			spm.optimizedPeersArr = append(spm.optimizedPeersArr[:i], spm.optimizedPeersArr[i+1:]...)
			return
		}
	}
}

func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
	for i := 0; i < len(spm.unoptimizedPeersArr); i++ {
		if spm.unoptimizedPeersArr[i] == p {
			spm.unoptimizedPeersArr[i] = spm.unoptimizedPeersArr[len(spm.unoptimizedPeersArr)-1]
			spm.unoptimizedPeersArr = spm.unoptimizedPeersArr[:len(spm.unoptimizedPeersArr)-1]
			return
		}
	}
}

hannahhoward's avatar
hannahhoward committed
175 176 177 178 179 180
type peerFoundMessage struct {
	p peer.ID
}

func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
	p := pfm.p
181
	if _, ok := spm.activePeers[p]; !ok {
182 183
		spm.activePeers[p] = newPeerData()
		spm.insertPeer(p, spm.activePeers[p])
184
		spm.tagPeer(p, unoptimizedTagValue)
hannahhoward's avatar
hannahhoward committed
185 186 187 188 189
	}
}

type peerResponseMessage struct {
	p peer.ID
190
	k cid.Cid
hannahhoward's avatar
hannahhoward committed
191 192 193 194
}

func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
	p := prm.p
195 196
	k := prm.k
	data, ok := spm.activePeers[p]
197 198 199
	wasOptimized := ok && data.hasLatency
	if wasOptimized {
		spm.removeOptimizedPeer(p)
hannahhoward's avatar
hannahhoward committed
200
	} else {
201
		if ok {
202
			spm.removeUnoptimizedPeer(p)
203 204 205
		} else {
			data = newPeerData()
			spm.activePeers[p] = data
hannahhoward's avatar
hannahhoward committed
206 207
		}
	}
208 209
	fallbackLatency, hasFallbackLatency := spm.broadcastLatency.CheckDuration(k)
	data.AdjustLatency(k, hasFallbackLatency, fallbackLatency)
210 211 212 213 214 215 216 217 218
	var tagValue int
	if data.hasLatency {
		tagValue = optimizedTagValue
	} else {
		tagValue = unoptimizedTagValue
	}
	if !ok || wasOptimized != data.hasLatency {
		spm.tagPeer(p, tagValue)
	}
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
	spm.insertPeer(p, data)
}

type peerRequestMessage struct {
	peers []peer.ID
	keys  []cid.Cid
}

func (spm *SessionPeerManager) makeTimeout(p peer.ID) afterTimeoutFunc {
	return func(k cid.Cid) {
		spm.RecordPeerResponse(p, k)
	}
}

func (prm *peerRequestMessage) handle(spm *SessionPeerManager) {
	if prm.peers == nil {
		spm.broadcastLatency.SetupRequests(prm.keys, func(k cid.Cid) {})
	} else {
		for _, p := range prm.peers {
			if data, ok := spm.activePeers[p]; ok {
				data.lt.SetupRequests(prm.keys, spm.makeTimeout(p))
			}
		}
	}
hannahhoward's avatar
hannahhoward committed
243 244
}

245
type getPeersMessage struct {
246
	resp chan<- []bssd.OptimizedPeer
hannahhoward's avatar
hannahhoward committed
247 248
}

249
func (prm *getPeersMessage) handle(spm *SessionPeerManager) {
hannahhoward's avatar
hannahhoward committed
250 251 252 253 254
	randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
	maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
	if maxPeers > maxOptimizedPeers {
		maxPeers = maxOptimizedPeers
	}
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
	var bestPeerLatency float64
	if len(spm.optimizedPeersArr) > 0 {
		bestPeerLatency = float64(spm.activePeers[spm.optimizedPeersArr[0]].latency)
	} else {
		bestPeerLatency = 0
	}
	optimizedPeers := make([]bssd.OptimizedPeer, 0, maxPeers)
	for i := 0; i < maxPeers; i++ {
		if i < len(spm.optimizedPeersArr) {
			p := spm.optimizedPeersArr[i]
			optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{
				Peer:               p,
				OptimizationRating: bestPeerLatency / float64(spm.activePeers[p].latency),
			})
		} else {
			p := spm.unoptimizedPeersArr[randomOrder[i-len(spm.optimizedPeersArr)]]
			optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: p, OptimizationRating: 0.0})
		}
273
	}
274
	prm.resp <- optimizedPeers
275 276 277
}

func (spm *SessionPeerManager) handleShutdown() {
278
	for p, data := range spm.activePeers {
279
		spm.tagger.UntagPeer(p, spm.tag)
280
		data.lt.Shutdown()
281 282
	}
}