sessionpeermanager.go 5.43 KB
Newer Older
1 2 3 4 5
package sessionpeermanager

import (
	"context"
	"fmt"
hannahhoward's avatar
hannahhoward committed
6
	"math/rand"
7 8 9 10 11 12

	cid "github.com/ipfs/go-cid"
	ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
	peer "github.com/libp2p/go-libp2p-peer"
)

hannahhoward's avatar
hannahhoward committed
13 14 15 16 17
const (
	// maxOptimizedPeers caps the total number of peers (optimized plus
	// randomly chosen unoptimized ones) handed back for a single peer request.
	maxOptimizedPeers = 25
	// reservePeers is subtracted from maxOptimizedPeers to bound the length of
	// the optimized list, so a full response still has room for this many
	// unoptimized peers.
	reservePeers      = 2
)

18
// PeerNetwork is an interface for finding providers and managing connections
19 20 21 22 23
type PeerNetwork interface {
	// ConnectionManager returns the connection manager that this package
	// uses to tag and untag session peers.
	ConnectionManager() ifconnmgr.ConnManager
	// FindProvidersAsync streams peers found for the given cid on the
	// returned channel. The int is presumably the maximum number of
	// providers to look for — TODO confirm against the implementation.
	FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID
}

hannahhoward's avatar
hannahhoward committed
24 25 26 27
// peerMessage is a unit of work sent over SessionPeerManager.peerMessages.
// The run loop calls handle on each message it receives, so implementations
// may touch the manager's loop-owned state.
type peerMessage interface {
	handle(spm *SessionPeerManager)
}

28 29
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
30 31 32 33 34
type SessionPeerManager struct {
	ctx     context.Context
	network PeerNetwork
	tag     string

hannahhoward's avatar
hannahhoward committed
35
	peerMessages chan peerMessage
36 37

	// do not touch outside of run loop
hannahhoward's avatar
hannahhoward committed
38 39 40
	activePeers         map[peer.ID]bool
	unoptimizedPeersArr []peer.ID
	optimizedPeersArr   []peer.ID
41 42
}

43
// New creates a new SessionPeerManager
44 45
func New(ctx context.Context, id uint64, network PeerNetwork) *SessionPeerManager {
	spm := &SessionPeerManager{
hannahhoward's avatar
hannahhoward committed
46 47 48 49
		ctx:          ctx,
		network:      network,
		peerMessages: make(chan peerMessage, 16),
		activePeers:  make(map[peer.ID]bool),
50 51 52 53 54 55 56 57
	}

	spm.tag = fmt.Sprint("bs-ses-", id)

	go spm.run(ctx)
	return spm
}

58 59
// RecordPeerResponse records that a peer received a block, and adds to it
// the list of peers if it wasn't already added
60
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
61

62 63 64
	// at the moment, we're just adding peers here
	// in the future, we'll actually use this to record metrics
	select {
hannahhoward's avatar
hannahhoward committed
65
	case spm.peerMessages <- &peerResponseMessage{p}:
66 67 68 69
	case <-spm.ctx.Done():
	}
}

70
// RecordPeerRequests records that a given set of peers requested the given cids
71 72 73 74 75
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
	// Intentionally a no-op: both arguments are ignored for now.
	// at the moment, we're not doing anything here
	// soon we'll use this to track latency by peer
}

76
// GetOptimizedPeers returns the best peers available for a session
77 78 79 80 81
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
	// right now this just returns all peers, but soon we might return peers
	// ordered by optimization, or only a subset
	resp := make(chan []peer.ID)
	select {
hannahhoward's avatar
hannahhoward committed
82
	case spm.peerMessages <- &peerReqMessage{resp}:
83 84 85 86 87 88 89 90 91 92 93 94
	case <-spm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-spm.ctx.Done():
		return nil
	}
}

95 96
// FindMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
97 98 99 100 101 102 103 104
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
		// TODO: have a task queue setup for this to:
		// - rate limit
		// - manage timeouts
		// - ensure two 'findprovs' calls for the same block don't run concurrently
		// - share peers between sessions based on interest set
		for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
hannahhoward's avatar
hannahhoward committed
105
			spm.peerMessages <- &peerFoundMessage{p}
106 107 108 109 110 111 112
		}
	}(c)
}

func (spm *SessionPeerManager) run(ctx context.Context) {
	for {
		select {
hannahhoward's avatar
hannahhoward committed
113 114
		case pm := <-spm.peerMessages:
			pm.handle(spm)
115 116 117 118 119 120
		case <-ctx.Done():
			spm.handleShutdown()
			return
		}
	}
}
hannahhoward's avatar
hannahhoward committed
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142

// tagPeer tags p with this session's tag in the connection manager.
// The value 10 is passed as the tag's value; presumably a weight used by the
// connection manager — TODO confirm.
func (spm *SessionPeerManager) tagPeer(p peer.ID) {
	spm.network.ConnectionManager().TagPeer(p, spm.tag, 10)
}

// insertOptimizedPeer puts p at the front of the optimized list. When the
// list is already at its limit (maxOptimizedPeers - reservePeers), the peer
// at the tail is demoted to the unoptimized list first.
//
// The caller must make sure p is not already in the optimized list. Must
// only be called from the run loop.
func (spm *SessionPeerManager) insertOptimizedPeer(p peer.ID) {
	if len(spm.optimizedPeersArr) >= (maxOptimizedPeers - reservePeers) {
		last := len(spm.optimizedPeersArr) - 1
		tailPeer := spm.optimizedPeersArr[last]
		spm.optimizedPeersArr = spm.optimizedPeersArr[:last]
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, tailPeer)
		// Keep activePeers in step with the lists: the demoted peer is no
		// longer optimized. Without this its next response would be treated
		// as coming from an optimized peer and it would end up in both lists.
		spm.activePeers[tailPeer] = false
	}

	spm.optimizedPeersArr = append([]peer.ID{p}, spm.optimizedPeersArr...)
}

// peerFoundMessage tells the run loop that peer p was found as a provider.
type peerFoundMessage struct {
	p peer.ID
}

func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
	p := pfm.p
143
	if _, ok := spm.activePeers[p]; !ok {
hannahhoward's avatar
hannahhoward committed
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
		spm.activePeers[p] = false
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
		spm.tagPeer(p)
	}
}

// peerResponseMessage tells the run loop that a response was recorded for
// peer p.
type peerResponseMessage struct {
	p peer.ID
}

// handle moves the responding peer to the front of the optimized list.
//
//   - An unknown peer is registered as optimized and tagged.
//   - A peer already at the front of the optimized list is left as is.
//   - Any other optimized peer is removed from its current position.
//   - An unoptimized peer is flagged optimized and removed from the
//     unoptimized list (swap-with-last, so that list's order is not kept).
//
// In every case but the second, the peer is then inserted at the front.
func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
	p := prm.p
	isOptimized, known := spm.activePeers[p]

	switch {
	case !known:
		spm.activePeers[p] = true
		spm.tagPeer(p)

	case isOptimized:
		if spm.optimizedPeersArr[0] == p {
			// already the most recent responder
			return
		}
		for i, candidate := range spm.optimizedPeersArr {
			if candidate == p {
				spm.optimizedPeersArr = append(spm.optimizedPeersArr[:i], spm.optimizedPeersArr[i+1:]...)
				break
			}
		}

	default:
		spm.activePeers[p] = true
		last := len(spm.unoptimizedPeersArr) - 1
		for i, candidate := range spm.unoptimizedPeersArr {
			if candidate == p {
				spm.unoptimizedPeersArr[i] = spm.unoptimizedPeersArr[last]
				spm.unoptimizedPeersArr = spm.unoptimizedPeersArr[:last]
				break
			}
		}
	}

	spm.insertOptimizedPeer(p)
}

// peerReqMessage asks the run loop for the session's current peers; the
// resulting list is sent on resp.
type peerReqMessage struct {
	resp chan<- []peer.ID
}

func (prm *peerReqMessage) handle(spm *SessionPeerManager) {
	randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
	maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
	if maxPeers > maxOptimizedPeers {
		maxPeers = maxOptimizedPeers
	}
196

hannahhoward's avatar
hannahhoward committed
197 198 199
	extraPeers := make([]peer.ID, maxPeers-len(spm.optimizedPeersArr))
	for i := range extraPeers {
		extraPeers[i] = spm.unoptimizedPeersArr[randomOrder[i]]
200
	}
hannahhoward's avatar
hannahhoward committed
201
	prm.resp <- append(spm.optimizedPeersArr, extraPeers...)
202 203 204 205
}

func (spm *SessionPeerManager) handleShutdown() {
	cmgr := spm.network.ConnectionManager()
hannahhoward's avatar
hannahhoward committed
206
	for p := range spm.activePeers {
207 208 209
		cmgr.UntagPeer(p, spm.tag)
	}
}