sessionpeermanager.go 6.58 KB
Newer Older
1 2 3 4 5
package sessionpeermanager

import (
	"context"
	"fmt"
hannahhoward's avatar
hannahhoward committed
6
	"math/rand"
7
	"sort"
8

9
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
10
	peer "github.com/libp2p/go-libp2p-core/peer"
11 12
)

hannahhoward's avatar
hannahhoward committed
13
// Sizing limit and peer tag values used by the session peer manager.
const (
	// maxOptimizedPeers is the upper bound applied to the number of peers
	// assembled for a getPeersMessage reply.
	maxOptimizedPeers = 32
	// unoptimizedTagValue is the tag value for "unoptimized" session peers.
	unoptimizedTagValue = 5
	// optimizedTagValue is the tag value for "optimized" session peers.
	optimizedTagValue = 10
)

19 20 21 22 23 24 25 26
// PeerTagger is the tagging dependency of the session peer manager: it lets
// the manager attach a named integer tag to a peer and remove it again.
type PeerTagger interface {
	// TagPeer is called with the peer, the tag name and the tag value.
	TagPeer(p peer.ID, tag string, value int)
	// UntagPeer is called with the peer and the tag name to remove.
	UntagPeer(peer.ID, string)
}

// PeerProviderFinder is an interface for finding providers
type PeerProviderFinder interface {
27
	FindProvidersAsync(context.Context, cid.Cid) <-chan peer.ID
28 29
}

hannahhoward's avatar
hannahhoward committed
30 31 32 33
// peerMessage is a unit of work for the manager's run loop: every value
// received from the peerMessages channel has handle invoked on it with the
// owning SessionPeerManager.
type peerMessage interface {
	handle(mgr *SessionPeerManager)
}

34 35
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
36
type SessionPeerManager struct {
37 38 39 40 41
	ctx            context.Context
	tagger         PeerTagger
	providerFinder PeerProviderFinder
	tag            string
	id             uint64
42

hannahhoward's avatar
hannahhoward committed
43
	peerMessages chan peerMessage
44 45

	// do not touch outside of run loop
46
	activePeers         map[peer.ID]*peerData
hannahhoward's avatar
hannahhoward committed
47 48
	unoptimizedPeersArr []peer.ID
	optimizedPeersArr   []peer.ID
49
	broadcastLatency    *latencyTracker
50 51
}

52
// New creates a new SessionPeerManager
53
func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerProviderFinder) *SessionPeerManager {
54
	spm := &SessionPeerManager{
55
		ctx:              ctx,
56 57
		tagger:         tagger,
		providerFinder: providerFinder,
58 59 60
		peerMessages:     make(chan peerMessage, 16),
		activePeers:      make(map[peer.ID]*peerData),
		broadcastLatency: newLatencyTracker(),
61 62 63 64 65 66 67 68
	}

	spm.tag = fmt.Sprint("bs-ses-", id)

	go spm.run(ctx)
	return spm
}

69 70
// RecordPeerResponse records that a peer received a block, and adds to it
// the list of peers if it wasn't already added
71
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
72

73 74 75
	// at the moment, we're just adding peers here
	// in the future, we'll actually use this to record metrics
	select {
76
	case spm.peerMessages <- &peerResponseMessage{p, k}:
77 78 79 80
	case <-spm.ctx.Done():
	}
}

81
// RecordPeerRequests records that a given set of peers requested the given cids
82 83 84
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
	// at the moment, we're not doing anything here
	// soon we'll use this to track latency by peer
85 86 87 88
	select {
	case spm.peerMessages <- &peerRequestMessage{p, ks}:
	case <-spm.ctx.Done():
	}
89 90
}

91
// GetOptimizedPeers returns the best peers available for a session
92 93 94
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
	// right now this just returns all peers, but soon we might return peers
	// ordered by optimization, or only a subset
95
	resp := make(chan []peer.ID, 1)
96
	select {
97
	case spm.peerMessages <- &getPeersMessage{resp}:
98 99 100 101 102 103 104 105 106 107 108 109
	case <-spm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-spm.ctx.Done():
		return nil
	}
}

110 111
// FindMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
112 113
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
114 115
		for p := range spm.providerFinder.FindProvidersAsync(ctx, k) {

116 117 118 119 120
			select {
			case spm.peerMessages <- &peerFoundMessage{p}:
			case <-ctx.Done():
			case <-spm.ctx.Done():
			}
121 122 123 124 125 126 127
		}
	}(c)
}

func (spm *SessionPeerManager) run(ctx context.Context) {
	for {
		select {
hannahhoward's avatar
hannahhoward committed
128 129
		case pm := <-spm.peerMessages:
			pm.handle(spm)
130 131 132 133 134 135
		case <-ctx.Done():
			spm.handleShutdown()
			return
		}
	}
}
hannahhoward's avatar
hannahhoward committed
136

137
func (spm *SessionPeerManager) tagPeer(p peer.ID, value int) {
138
	spm.tagger.TagPeer(p, spm.tag, value)
hannahhoward's avatar
hannahhoward committed
139 140
}

141 142 143 144 145 146 147 148 149
func (spm *SessionPeerManager) insertPeer(p peer.ID, data *peerData) {
	if data.hasLatency {
		insertPos := sort.Search(len(spm.optimizedPeersArr), func(i int) bool {
			return spm.activePeers[spm.optimizedPeersArr[i]].latency > data.latency
		})
		spm.optimizedPeersArr = append(spm.optimizedPeersArr[:insertPos],
			append([]peer.ID{p}, spm.optimizedPeersArr[insertPos:]...)...)
	} else {
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
hannahhoward's avatar
hannahhoward committed
150 151 152
	}
}

153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
// removeOptimizedPeer deletes the first occurrence of p from
// optimizedPeersArr, keeping the remaining entries in their existing order.
// It does nothing if p is not in the list.
func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
	for idx, candidate := range spm.optimizedPeersArr {
		if candidate != p {
			continue
		}
		spm.optimizedPeersArr = append(spm.optimizedPeersArr[:idx], spm.optimizedPeersArr[idx+1:]...)
		return
	}
}

// removeUnoptimizedPeer deletes the first occurrence of p from
// unoptimizedPeersArr by overwriting it with the last entry and shrinking
// the list by one; the order of that list is therefore not preserved.
// It does nothing if p is not in the list.
func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
	peers := spm.unoptimizedPeersArr
	last := len(peers) - 1
	for idx := range peers {
		if peers[idx] != p {
			continue
		}
		peers[idx] = peers[last]
		spm.unoptimizedPeersArr = peers[:last]
		return
	}
}

hannahhoward's avatar
hannahhoward committed
172 173 174 175 176 177
type peerFoundMessage struct {
	p peer.ID
}

func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
	p := pfm.p
178
	if _, ok := spm.activePeers[p]; !ok {
179 180
		spm.activePeers[p] = newPeerData()
		spm.insertPeer(p, spm.activePeers[p])
181
		spm.tagPeer(p, unoptimizedTagValue)
hannahhoward's avatar
hannahhoward committed
182 183 184 185 186
	}
}

type peerResponseMessage struct {
	p peer.ID
187
	k cid.Cid
hannahhoward's avatar
hannahhoward committed
188 189 190 191
}

func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
	p := prm.p
192 193 194 195 196 197
	k := prm.k
	data, ok := spm.activePeers[p]
	if !ok {
		data = newPeerData()
		spm.activePeers[p] = data
		spm.tagPeer(p)
hannahhoward's avatar
hannahhoward committed
198
	} else {
199 200 201
                if data.hasLatency {
			spm.removeOptimizedPeer(p)
		} else {
202
			spm.removeUnoptimizedPeer(p)
hannahhoward's avatar
hannahhoward committed
203 204
		}
	}
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
	fallbackLatency, hasFallbackLatency := spm.broadcastLatency.CheckDuration(k)
	data.AdjustLatency(k, hasFallbackLatency, fallbackLatency)
	spm.insertPeer(p, data)
}

// peerRequestMessage tells the run loop that keys were requested from peers.
// A nil peers slice is handled as a request tracked by the manager's
// broadcast latency tracker rather than by any individual peer.
type peerRequestMessage struct {
	peers []peer.ID
	keys  []cid.Cid
}

// makeTimeout builds the callback handed to a peer's latency tracker for
// requests sent to p. When invoked with a cid, the callback reports it
// through RecordPeerResponse as if p had responded for that cid.
func (spm *SessionPeerManager) makeTimeout(p peer.ID) afterTimeoutFunc {
	onTimeout := func(c cid.Cid) {
		spm.RecordPeerResponse(p, c)
	}
	return onTimeout
}

func (prm *peerRequestMessage) handle(spm *SessionPeerManager) {
	if prm.peers == nil {
		spm.broadcastLatency.SetupRequests(prm.keys, func(k cid.Cid) {})
	} else {
		for _, p := range prm.peers {
			if data, ok := spm.activePeers[p]; ok {
				data.lt.SetupRequests(prm.keys, spm.makeTimeout(p))
			}
		}
	}
hannahhoward's avatar
hannahhoward committed
231 232
}

233
type getPeersMessage struct {
hannahhoward's avatar
hannahhoward committed
234 235 236
	resp chan<- []peer.ID
}

237
func (prm *getPeersMessage) handle(spm *SessionPeerManager) {
hannahhoward's avatar
hannahhoward committed
238 239 240 241 242
	randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
	maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
	if maxPeers > maxOptimizedPeers {
		maxPeers = maxOptimizedPeers
	}
243

hannahhoward's avatar
hannahhoward committed
244 245 246
	extraPeers := make([]peer.ID, maxPeers-len(spm.optimizedPeersArr))
	for i := range extraPeers {
		extraPeers[i] = spm.unoptimizedPeersArr[randomOrder[i]]
247
	}
hannahhoward's avatar
hannahhoward committed
248
	prm.resp <- append(spm.optimizedPeersArr, extraPeers...)
249 250 251
}

func (spm *SessionPeerManager) handleShutdown() {
252
	for p, data := range spm.activePeers {
253
		spm.tagger.UntagPeer(p, spm.tag)
254
		data.lt.Shutdown()
255 256
	}
}