// Package sessionpeermanager tracks and manages peers for a session, and
// provides the best ones to the session.
package sessionpeermanager

import (
	"context"
	"fmt"
	"math/rand"

	logging "github.com/ipfs/go-log"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-peer"
)

var log = logging.Logger("bitswap")

hannahhoward's avatar
hannahhoward committed
16
// Limits applied to the peer lists kept for a session.
const (
	// maxOptimizedPeers is the upper bound on the number of peers handed
	// back in response to a single peer request.
	maxOptimizedPeers = 32

	// reservePeers is subtracted from maxOptimizedPeers to get the maximum
	// length of the optimized peer list, leaving that many slots in a full
	// response for unoptimized peers.
	reservePeers = 2
)

// PeerTagger is implemented by anything that can attach named tags to peers
// and remove them again.
type PeerTagger interface {
	// TagPeer attaches tag to peer p together with an integer value; the
	// meaning of the value is up to the implementation.
	TagPeer(p peer.ID, tag string, value int)
	// UntagPeer removes tag from peer p.
	UntagPeer(p peer.ID, tag string)
}

// PeerProviderFinder is an interface for finding providers
type PeerProviderFinder interface {
29
	FindProvidersAsync(context.Context, cid.Cid) <-chan peer.ID
30 31
}

hannahhoward's avatar
hannahhoward committed
32 33 34 35
type peerMessage interface {
	handle(spm *SessionPeerManager)
}

36 37
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
38
type SessionPeerManager struct {
39 40 41 42 43
	ctx            context.Context
	tagger         PeerTagger
	providerFinder PeerProviderFinder
	tag            string
	id             uint64
44

hannahhoward's avatar
hannahhoward committed
45
	peerMessages chan peerMessage
46 47

	// do not touch outside of run loop
hannahhoward's avatar
hannahhoward committed
48 49 50
	activePeers         map[peer.ID]bool
	unoptimizedPeersArr []peer.ID
	optimizedPeersArr   []peer.ID
51 52
}

53
// New creates a new SessionPeerManager
54
func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerProviderFinder) *SessionPeerManager {
55
	spm := &SessionPeerManager{
56 57 58 59 60 61
		id:             id,
		ctx:            ctx,
		tagger:         tagger,
		providerFinder: providerFinder,
		peerMessages:   make(chan peerMessage, 16),
		activePeers:    make(map[peer.ID]bool),
62 63 64 65 66 67 68 69
	}

	spm.tag = fmt.Sprint("bs-ses-", id)

	go spm.run(ctx)
	return spm
}

70 71
// RecordPeerResponse records that a peer received a block, and adds to it
// the list of peers if it wasn't already added
72
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
73

74 75 76
	// at the moment, we're just adding peers here
	// in the future, we'll actually use this to record metrics
	select {
hannahhoward's avatar
hannahhoward committed
77
	case spm.peerMessages <- &peerResponseMessage{p}:
78 79 80 81
	case <-spm.ctx.Done():
	}
}

82
// RecordPeerRequests records that a given set of peers requested the given cids
83 84 85 86 87
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
	// at the moment, we're not doing anything here
	// soon we'll use this to track latency by peer
}

88
// GetOptimizedPeers returns the best peers available for a session
89 90 91
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
	// right now this just returns all peers, but soon we might return peers
	// ordered by optimization, or only a subset
92
	resp := make(chan []peer.ID, 1)
93
	select {
hannahhoward's avatar
hannahhoward committed
94
	case spm.peerMessages <- &peerReqMessage{resp}:
95 96 97 98 99 100 101 102 103 104 105 106
	case <-spm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-spm.ctx.Done():
		return nil
	}
}

107 108
// FindMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
109 110
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
111 112
		for p := range spm.providerFinder.FindProvidersAsync(ctx, k) {

113 114 115 116 117
			select {
			case spm.peerMessages <- &peerFoundMessage{p}:
			case <-ctx.Done():
			case <-spm.ctx.Done():
			}
118 119 120 121 122 123 124
		}
	}(c)
}

func (spm *SessionPeerManager) run(ctx context.Context) {
	for {
		select {
hannahhoward's avatar
hannahhoward committed
125 126
		case pm := <-spm.peerMessages:
			pm.handle(spm)
127 128 129 130 131 132
		case <-ctx.Done():
			spm.handleShutdown()
			return
		}
	}
}
hannahhoward's avatar
hannahhoward committed
133 134

func (spm *SessionPeerManager) tagPeer(p peer.ID) {
135
	spm.tagger.TagPeer(p, spm.tag, 10)
hannahhoward's avatar
hannahhoward committed
136 137 138 139 140 141 142 143 144 145 146 147
}

func (spm *SessionPeerManager) insertOptimizedPeer(p peer.ID) {
	if len(spm.optimizedPeersArr) >= (maxOptimizedPeers - reservePeers) {
		tailPeer := spm.optimizedPeersArr[len(spm.optimizedPeersArr)-1]
		spm.optimizedPeersArr = spm.optimizedPeersArr[:len(spm.optimizedPeersArr)-1]
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, tailPeer)
	}

	spm.optimizedPeersArr = append([]peer.ID{p}, spm.optimizedPeersArr...)
}

148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
// removeOptimizedPeer deletes the first occurrence of p from
// optimizedPeersArr, keeping the remaining peers in their existing order.
// It does nothing when p is not in the list.
func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
	for idx, candidate := range spm.optimizedPeersArr {
		if candidate != p {
			continue
		}
		// Shift everything after idx one slot to the left, then drop the
		// now-duplicated final element.
		copy(spm.optimizedPeersArr[idx:], spm.optimizedPeersArr[idx+1:])
		spm.optimizedPeersArr = spm.optimizedPeersArr[:len(spm.optimizedPeersArr)-1]
		return
	}
}

func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
	for i := 0; i < len(spm.unoptimizedPeersArr); i++ {
		if spm.unoptimizedPeersArr[i] == p {
			spm.unoptimizedPeersArr[i] = spm.unoptimizedPeersArr[len(spm.unoptimizedPeersArr)-1]
			spm.unoptimizedPeersArr = spm.unoptimizedPeersArr[:len(spm.unoptimizedPeersArr)-1]
			return
		}
	}
}

hannahhoward's avatar
hannahhoward committed
167 168 169 170 171 172
// peerFoundMessage tells the run loop that peer p was returned by a provider
// search.
type peerFoundMessage struct {
	p peer.ID
}

func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
	p := pfm.p
173
	if _, ok := spm.activePeers[p]; !ok {
hannahhoward's avatar
hannahhoward committed
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
		spm.activePeers[p] = false
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
		spm.tagPeer(p)
	}
}

// peerResponseMessage tells the run loop that a response was recorded for
// peer p.
type peerResponseMessage struct {
	p peer.ID
}

func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {

	p := prm.p
	isOptimized, ok := spm.activePeers[p]
	if !ok {
		spm.activePeers[p] = true
		spm.tagPeer(p)
	} else {
		if isOptimized {
193
			spm.removeOptimizedPeer(p)
hannahhoward's avatar
hannahhoward committed
194 195
		} else {
			spm.activePeers[p] = true
196
			spm.removeUnoptimizedPeer(p)
hannahhoward's avatar
hannahhoward committed
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
		}
	}
	spm.insertOptimizedPeer(p)
}

// peerReqMessage asks the run loop for the session's current peers; the
// answer is sent on resp.
type peerReqMessage struct {
	resp chan<- []peer.ID
}

func (prm *peerReqMessage) handle(spm *SessionPeerManager) {
	randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
	maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
	if maxPeers > maxOptimizedPeers {
		maxPeers = maxOptimizedPeers
	}
212

hannahhoward's avatar
hannahhoward committed
213 214 215
	extraPeers := make([]peer.ID, maxPeers-len(spm.optimizedPeersArr))
	for i := range extraPeers {
		extraPeers[i] = spm.unoptimizedPeersArr[randomOrder[i]]
216
	}
hannahhoward's avatar
hannahhoward committed
217
	prm.resp <- append(spm.optimizedPeersArr, extraPeers...)
218 219 220
}

func (spm *SessionPeerManager) handleShutdown() {
hannahhoward's avatar
hannahhoward committed
221
	for p := range spm.activePeers {
222
		spm.tagger.UntagPeer(p, spm.tag)
223 224
	}
}