sessionpeermanager.go 5.73 KB
Newer Older
1 2 3 4 5
package sessionpeermanager

import (
	"context"
	"fmt"
hannahhoward's avatar
hannahhoward committed
6
	"math/rand"
7

8
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
9
	peer "github.com/libp2p/go-libp2p-core/peer"
10 11
)

hannahhoward's avatar
hannahhoward committed
12
const (
13 14 15 16
	maxOptimizedPeers   = 32
	reservePeers        = 2
	unoptimizedTagValue = 5  // tag value for "unoptimized" session peers.
	optimizedTagValue   = 10 // tag value for "optimized" session peers.
hannahhoward's avatar
hannahhoward committed
17 18
)

19 20 21 22 23 24 25 26
// PeerTagger is an interface for tagging peers with metadata
type PeerTagger interface {
	TagPeer(peer.ID, string, int)
	UntagPeer(p peer.ID, tag string)
}

// PeerProviderFinder is an interface for finding providers
type PeerProviderFinder interface {
27
	FindProvidersAsync(context.Context, cid.Cid) <-chan peer.ID
28 29
}

hannahhoward's avatar
hannahhoward committed
30 31 32 33
type peerMessage interface {
	handle(spm *SessionPeerManager)
}

34 35
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
36
type SessionPeerManager struct {
37 38 39 40 41
	ctx            context.Context
	tagger         PeerTagger
	providerFinder PeerProviderFinder
	tag            string
	id             uint64
42

hannahhoward's avatar
hannahhoward committed
43
	peerMessages chan peerMessage
44 45

	// do not touch outside of run loop
hannahhoward's avatar
hannahhoward committed
46 47 48
	activePeers         map[peer.ID]bool
	unoptimizedPeersArr []peer.ID
	optimizedPeersArr   []peer.ID
49 50
}

51
// New creates a new SessionPeerManager
52
func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerProviderFinder) *SessionPeerManager {
53
	spm := &SessionPeerManager{
54 55 56 57 58 59
		id:             id,
		ctx:            ctx,
		tagger:         tagger,
		providerFinder: providerFinder,
		peerMessages:   make(chan peerMessage, 16),
		activePeers:    make(map[peer.ID]bool),
60 61 62 63 64 65 66 67
	}

	spm.tag = fmt.Sprint("bs-ses-", id)

	go spm.run(ctx)
	return spm
}

68 69
// RecordPeerResponse records that a peer received a block, and adds to it
// the list of peers if it wasn't already added
70
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
71

72 73 74
	// at the moment, we're just adding peers here
	// in the future, we'll actually use this to record metrics
	select {
hannahhoward's avatar
hannahhoward committed
75
	case spm.peerMessages <- &peerResponseMessage{p}:
76 77 78 79
	case <-spm.ctx.Done():
	}
}

80
// RecordPeerRequests records that a given set of peers requested the given cids
81 82 83 84 85
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
	// at the moment, we're not doing anything here
	// soon we'll use this to track latency by peer
}

86
// GetOptimizedPeers returns the best peers available for a session
87 88 89
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
	// right now this just returns all peers, but soon we might return peers
	// ordered by optimization, or only a subset
90
	resp := make(chan []peer.ID, 1)
91
	select {
hannahhoward's avatar
hannahhoward committed
92
	case spm.peerMessages <- &peerReqMessage{resp}:
93 94 95 96 97 98 99 100 101 102 103 104
	case <-spm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-spm.ctx.Done():
		return nil
	}
}

105 106
// FindMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
107 108
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
109 110
		for p := range spm.providerFinder.FindProvidersAsync(ctx, k) {

111 112 113 114 115
			select {
			case spm.peerMessages <- &peerFoundMessage{p}:
			case <-ctx.Done():
			case <-spm.ctx.Done():
			}
116 117 118 119 120 121 122
		}
	}(c)
}

func (spm *SessionPeerManager) run(ctx context.Context) {
	for {
		select {
hannahhoward's avatar
hannahhoward committed
123 124
		case pm := <-spm.peerMessages:
			pm.handle(spm)
125 126 127 128 129 130
		case <-ctx.Done():
			spm.handleShutdown()
			return
		}
	}
}
hannahhoward's avatar
hannahhoward committed
131

132
func (spm *SessionPeerManager) tagPeer(p peer.ID, value int) {
133
	spm.tagger.TagPeer(p, spm.tag, value)
hannahhoward's avatar
hannahhoward committed
134 135 136 137 138 139 140 141 142 143 144 145
}

func (spm *SessionPeerManager) insertOptimizedPeer(p peer.ID) {
	if len(spm.optimizedPeersArr) >= (maxOptimizedPeers - reservePeers) {
		tailPeer := spm.optimizedPeersArr[len(spm.optimizedPeersArr)-1]
		spm.optimizedPeersArr = spm.optimizedPeersArr[:len(spm.optimizedPeersArr)-1]
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, tailPeer)
	}

	spm.optimizedPeersArr = append([]peer.ID{p}, spm.optimizedPeersArr...)
}

146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
	for i := 0; i < len(spm.optimizedPeersArr); i++ {
		if spm.optimizedPeersArr[i] == p {
			spm.optimizedPeersArr = append(spm.optimizedPeersArr[:i], spm.optimizedPeersArr[i+1:]...)
			return
		}
	}
}

func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
	for i := 0; i < len(spm.unoptimizedPeersArr); i++ {
		if spm.unoptimizedPeersArr[i] == p {
			spm.unoptimizedPeersArr[i] = spm.unoptimizedPeersArr[len(spm.unoptimizedPeersArr)-1]
			spm.unoptimizedPeersArr = spm.unoptimizedPeersArr[:len(spm.unoptimizedPeersArr)-1]
			return
		}
	}
}

hannahhoward's avatar
hannahhoward committed
165 166 167 168 169 170
type peerFoundMessage struct {
	p peer.ID
}

func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
	p := pfm.p
171
	if _, ok := spm.activePeers[p]; !ok {
hannahhoward's avatar
hannahhoward committed
172 173
		spm.activePeers[p] = false
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
174
		spm.tagPeer(p, unoptimizedTagValue)
hannahhoward's avatar
hannahhoward committed
175 176 177 178 179 180 181 182 183 184
	}
}

type peerResponseMessage struct {
	p peer.ID
}

func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
	p := prm.p
	isOptimized, ok := spm.activePeers[p]
185 186
	if isOptimized {
		spm.removeOptimizedPeer(p)
hannahhoward's avatar
hannahhoward committed
187
	} else {
188 189 190 191 192
		spm.activePeers[p] = true
		spm.tagPeer(p, optimizedTagValue)

		// transition from unoptimized.
		if ok {
193
			spm.removeUnoptimizedPeer(p)
hannahhoward's avatar
hannahhoward committed
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
		}
	}
	spm.insertOptimizedPeer(p)
}

type peerReqMessage struct {
	resp chan<- []peer.ID
}

func (prm *peerReqMessage) handle(spm *SessionPeerManager) {
	randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
	maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
	if maxPeers > maxOptimizedPeers {
		maxPeers = maxOptimizedPeers
	}
209

hannahhoward's avatar
hannahhoward committed
210 211 212
	extraPeers := make([]peer.ID, maxPeers-len(spm.optimizedPeersArr))
	for i := range extraPeers {
		extraPeers[i] = spm.unoptimizedPeersArr[randomOrder[i]]
213
	}
hannahhoward's avatar
hannahhoward committed
214
	prm.resp <- append(spm.optimizedPeersArr, extraPeers...)
215 216 217
}

func (spm *SessionPeerManager) handleShutdown() {
hannahhoward's avatar
hannahhoward committed
218
	for p := range spm.activePeers {
219
		spm.tagger.UntagPeer(p, spm.tag)
220 221
	}
}