sessionpeermanager.go 5.54 KB
Newer Older
1 2 3 4 5
package sessionpeermanager

import (
	"context"
	"fmt"
hannahhoward's avatar
hannahhoward committed
6
	"math/rand"
7 8 9 10 11 12

	cid "github.com/ipfs/go-cid"
	ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
	peer "github.com/libp2p/go-libp2p-peer"
)

hannahhoward's avatar
hannahhoward committed
13
const (
	// maxOptimizedPeers is the upper bound on how many peers a single
	// peer request returns to the session.
	maxOptimizedPeers = 32

	// reservePeers is how far below maxOptimizedPeers the optimized list is
	// held: the list is trimmed to maxOptimizedPeers-reservePeers entries,
	// which leaves room for unoptimized peers in a full response.
	reservePeers = 2
)

18
// PeerNetwork is an interface for finding providers and managing connections
19 20 21 22 23
type PeerNetwork interface {
	ConnectionManager() ifconnmgr.ConnManager
	FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID
}

hannahhoward's avatar
hannahhoward committed
24 25 26 27
// peerMessage is a unit of work delivered to the manager's run loop. The run
// loop calls handle on its own goroutine, passing the manager whose
// peer-tracking state the message may read or update.
type peerMessage interface {
	handle(mgr *SessionPeerManager)
}

28 29
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
30 31 32 33 34
type SessionPeerManager struct {
	ctx     context.Context
	network PeerNetwork
	tag     string

hannahhoward's avatar
hannahhoward committed
35
	peerMessages chan peerMessage
36 37

	// do not touch outside of run loop
hannahhoward's avatar
hannahhoward committed
38 39 40
	activePeers         map[peer.ID]bool
	unoptimizedPeersArr []peer.ID
	optimizedPeersArr   []peer.ID
41 42
}

43
// New creates a new SessionPeerManager
44 45
func New(ctx context.Context, id uint64, network PeerNetwork) *SessionPeerManager {
	spm := &SessionPeerManager{
hannahhoward's avatar
hannahhoward committed
46 47 48 49
		ctx:          ctx,
		network:      network,
		peerMessages: make(chan peerMessage, 16),
		activePeers:  make(map[peer.ID]bool),
50 51 52 53 54 55 56 57
	}

	spm.tag = fmt.Sprint("bs-ses-", id)

	go spm.run(ctx)
	return spm
}

58 59
// RecordPeerResponse records that a peer received a block, and adds to it
// the list of peers if it wasn't already added
60
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
61

62 63 64
	// at the moment, we're just adding peers here
	// in the future, we'll actually use this to record metrics
	select {
hannahhoward's avatar
hannahhoward committed
65
	case spm.peerMessages <- &peerResponseMessage{p}:
66 67 68 69
	case <-spm.ctx.Done():
	}
}

70
// RecordPeerRequests records that a given set of peers requested the given cids
71 72 73 74 75
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
	// at the moment, we're not doing anything here
	// soon we'll use this to track latency by peer
}

76
// GetOptimizedPeers returns the best peers available for a session
77 78 79 80 81
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
	// right now this just returns all peers, but soon we might return peers
	// ordered by optimization, or only a subset
	resp := make(chan []peer.ID)
	select {
hannahhoward's avatar
hannahhoward committed
82
	case spm.peerMessages <- &peerReqMessage{resp}:
83 84 85 86 87 88 89 90 91 92 93 94
	case <-spm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-spm.ctx.Done():
		return nil
	}
}

95 96
// FindMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
97 98 99 100 101 102 103 104
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
		// TODO: have a task queue setup for this to:
		// - rate limit
		// - manage timeouts
		// - ensure two 'findprovs' calls for the same block don't run concurrently
		// - share peers between sessions based on interest set
		for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
hannahhoward's avatar
hannahhoward committed
105
			spm.peerMessages <- &peerFoundMessage{p}
106 107 108 109 110 111 112
		}
	}(c)
}

func (spm *SessionPeerManager) run(ctx context.Context) {
	for {
		select {
hannahhoward's avatar
hannahhoward committed
113 114
		case pm := <-spm.peerMessages:
			pm.handle(spm)
115 116 117 118 119 120
		case <-ctx.Done():
			spm.handleShutdown()
			return
		}
	}
}
hannahhoward's avatar
hannahhoward committed
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136

// tagPeer tags p in the network's connection manager with this session's tag
// and a value of 10.
func (spm *SessionPeerManager) tagPeer(p peer.ID) {
	spm.network.ConnectionManager().TagPeer(p, spm.tag, 10)
}

// insertOptimizedPeer puts p at the front of the optimized list. When the
// list already holds maxOptimizedPeers-reservePeers entries, the peer at the
// tail is demoted to the unoptimized list first to make room.
//
// It must only be called from the run loop, and p must not already be in the
// optimized list.
func (spm *SessionPeerManager) insertOptimizedPeer(p peer.ID) {
	if n := len(spm.optimizedPeersArr); n >= maxOptimizedPeers-reservePeers {
		tailPeer := spm.optimizedPeersArr[n-1]
		spm.optimizedPeersArr = spm.optimizedPeersArr[:n-1]
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, tailPeer)
		// Keep the bookkeeping in step with the move. If the flag stayed
		// true, the demoted peer's next response would look for it in the
		// optimized list, find nothing, and insert it again, leaving it in
		// both lists at once.
		spm.activePeers[tailPeer] = false
	}

	// Prepend into a freshly allocated slice.
	spm.optimizedPeersArr = append([]peer.ID{p}, spm.optimizedPeersArr...)
}

137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
// removeOptimizedPeer deletes the first occurrence of p from the optimized
// list, keeping the remaining peers in order. It does nothing if p is not in
// the list.
func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
	for idx, candidate := range spm.optimizedPeersArr {
		if candidate != p {
			continue
		}
		// Shift the tail down over the removed entry.
		spm.optimizedPeersArr = append(spm.optimizedPeersArr[:idx], spm.optimizedPeersArr[idx+1:]...)
		return
	}
}

// removeUnoptimizedPeer deletes the first occurrence of p from the
// unoptimized list. The list's order is not preserved: the last entry is moved
// into the vacated slot. It does nothing if p is not in the list.
func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
	for idx, candidate := range spm.unoptimizedPeersArr {
		if candidate != p {
			continue
		}
		last := len(spm.unoptimizedPeersArr) - 1
		spm.unoptimizedPeersArr[idx] = spm.unoptimizedPeersArr[last]
		spm.unoptimizedPeersArr = spm.unoptimizedPeersArr[:last]
		return
	}
}

hannahhoward's avatar
hannahhoward committed
156 157 158 159 160 161
type peerFoundMessage struct {
	p peer.ID
}

func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
	p := pfm.p
162
	if _, ok := spm.activePeers[p]; !ok {
hannahhoward's avatar
hannahhoward committed
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
		spm.activePeers[p] = false
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
		spm.tagPeer(p)
	}
}

type peerResponseMessage struct {
	p peer.ID
}

func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {

	p := prm.p
	isOptimized, ok := spm.activePeers[p]
	if !ok {
		spm.activePeers[p] = true
		spm.tagPeer(p)
	} else {
		if isOptimized {
182
			spm.removeOptimizedPeer(p)
hannahhoward's avatar
hannahhoward committed
183 184
		} else {
			spm.activePeers[p] = true
185
			spm.removeUnoptimizedPeer(p)
hannahhoward's avatar
hannahhoward committed
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
		}
	}
	spm.insertOptimizedPeer(p)
}

type peerReqMessage struct {
	resp chan<- []peer.ID
}

func (prm *peerReqMessage) handle(spm *SessionPeerManager) {
	randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
	maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
	if maxPeers > maxOptimizedPeers {
		maxPeers = maxOptimizedPeers
	}
201

hannahhoward's avatar
hannahhoward committed
202 203 204
	extraPeers := make([]peer.ID, maxPeers-len(spm.optimizedPeersArr))
	for i := range extraPeers {
		extraPeers[i] = spm.unoptimizedPeersArr[randomOrder[i]]
205
	}
hannahhoward's avatar
hannahhoward committed
206
	prm.resp <- append(spm.optimizedPeersArr, extraPeers...)
207 208 209 210
}

func (spm *SessionPeerManager) handleShutdown() {
	cmgr := spm.network.ConnectionManager()
hannahhoward's avatar
hannahhoward committed
211
	for p := range spm.activePeers {
212 213 214
		cmgr.UntagPeer(p, spm.tag)
	}
}