sessionpeermanager.go 5.82 KB
Newer Older
1 2 3 4 5
package sessionpeermanager

import (
	"context"
	"fmt"
hannahhoward's avatar
hannahhoward committed
6
	"math/rand"
7 8

	logging "github.com/ipfs/go-log"
9 10 11 12 13 14

	cid "github.com/ipfs/go-cid"
	ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
	peer "github.com/libp2p/go-libp2p-peer"
)

15 16
// log is the package-level logger, created under the "bitswap" name.
var log = logging.Logger("bitswap")

hannahhoward's avatar
hannahhoward committed
17
const (
	// maxOptimizedPeers is the largest number of peers handed back for a
	// single peer request.
	maxOptimizedPeers = 32
	// reservePeers is subtracted from maxOptimizedPeers to cap the optimized
	// list, which leaves room for unoptimized peers in each response.
	reservePeers = 2
)

22
// PeerNetwork is an interface for finding providers and managing connections
23 24
type PeerNetwork interface {
	ConnectionManager() ifconnmgr.ConnManager
25
	ConnectTo(context.Context, peer.ID) error
26 27 28
	FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID
}

hannahhoward's avatar
hannahhoward committed
29 30 31 32
// peerMessage is a unit of work delivered over the peerMessages channel; the
// run loop calls handle on every message it receives, so implementations may
// touch the manager's loop-owned state.
type peerMessage interface {
	handle(spm *SessionPeerManager)
}

33 34
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
35 36 37 38 39
type SessionPeerManager struct {
	ctx     context.Context
	network PeerNetwork
	tag     string

hannahhoward's avatar
hannahhoward committed
40
	peerMessages chan peerMessage
41 42

	// do not touch outside of run loop
hannahhoward's avatar
hannahhoward committed
43 44 45
	activePeers         map[peer.ID]bool
	unoptimizedPeersArr []peer.ID
	optimizedPeersArr   []peer.ID
46 47
}

48
// New creates a new SessionPeerManager
49 50
func New(ctx context.Context, id uint64, network PeerNetwork) *SessionPeerManager {
	spm := &SessionPeerManager{
hannahhoward's avatar
hannahhoward committed
51 52 53 54
		ctx:          ctx,
		network:      network,
		peerMessages: make(chan peerMessage, 16),
		activePeers:  make(map[peer.ID]bool),
55 56 57 58 59 60 61 62
	}

	spm.tag = fmt.Sprint("bs-ses-", id)

	go spm.run(ctx)
	return spm
}

63 64
// RecordPeerResponse records that a peer received a block, and adds to it
// the list of peers if it wasn't already added
65
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
66

67 68 69
	// at the moment, we're just adding peers here
	// in the future, we'll actually use this to record metrics
	select {
hannahhoward's avatar
hannahhoward committed
70
	case spm.peerMessages <- &peerResponseMessage{p}:
71 72 73 74
	case <-spm.ctx.Done():
	}
}

75
// RecordPeerRequests records that a given set of peers requested the given cids
76 77 78 79 80
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
	// at the moment, we're not doing anything here
	// soon we'll use this to track latency by peer
}

81
// GetOptimizedPeers returns the best peers available for a session
82 83 84 85 86
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
	// right now this just returns all peers, but soon we might return peers
	// ordered by optimization, or only a subset
	resp := make(chan []peer.ID)
	select {
hannahhoward's avatar
hannahhoward committed
87
	case spm.peerMessages <- &peerReqMessage{resp}:
88 89 90 91 92 93 94 95 96 97 98 99
	case <-spm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-spm.ctx.Done():
		return nil
	}
}

100 101
// FindMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
102 103 104 105 106 107 108 109
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
		// TODO: have a task queue setup for this to:
		// - rate limit
		// - manage timeouts
		// - ensure two 'findprovs' calls for the same block don't run concurrently
		// - share peers between sessions based on interest set
		for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
110 111 112 113 114 115 116
			go func(p peer.ID) {
				err := spm.network.ConnectTo(ctx, p)
				if err != nil {
					log.Debugf("failed to connect to provider %s: %s", p, err)
				}
				spm.peerMessages <- &peerFoundMessage{p}
			}(p)
117 118 119 120 121 122 123
		}
	}(c)
}

func (spm *SessionPeerManager) run(ctx context.Context) {
	for {
		select {
hannahhoward's avatar
hannahhoward committed
124 125
		case pm := <-spm.peerMessages:
			pm.handle(spm)
126 127 128 129 130 131
		case <-ctx.Done():
			spm.handleShutdown()
			return
		}
	}
}
hannahhoward's avatar
hannahhoward committed
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147

// tagPeer tags p in the connection manager with this session's tag and a
// value of 10.
func (spm *SessionPeerManager) tagPeer(p peer.ID) {
	spm.network.ConnectionManager().TagPeer(p, spm.tag, 10)
}

// insertOptimizedPeer puts p at the front of the optimized list.
//
// The optimized list is capped at maxOptimizedPeers - reservePeers entries.
// When it is full, the peer at the tail is moved to the unoptimized list to
// make room. Must only be called from the run loop.
func (spm *SessionPeerManager) insertOptimizedPeer(p peer.ID) {
	if n := len(spm.optimizedPeersArr); n >= maxOptimizedPeers-reservePeers {
		tailPeer := spm.optimizedPeersArr[n-1]
		spm.optimizedPeersArr = spm.optimizedPeersArr[:n-1]
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, tailPeer)
		// Keep the bookkeeping in step with the move. If the flag stayed
		// true, the evicted peer's next response would skip the unoptimized
		// removal and leave it in both lists at once.
		spm.activePeers[tailPeer] = false
	}

	// Prepend into a fresh slice so the newest responder comes first.
	spm.optimizedPeersArr = append([]peer.ID{p}, spm.optimizedPeersArr...)
}

148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
// removeOptimizedPeer deletes the first occurrence of p from the optimized
// list, keeping the order of the remaining peers. It does nothing when p is
// not in the list.
func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
	for idx, candidate := range spm.optimizedPeersArr {
		if candidate != p {
			continue
		}
		// Close the gap by shifting the tail down one slot.
		spm.optimizedPeersArr = append(spm.optimizedPeersArr[:idx], spm.optimizedPeersArr[idx+1:]...)
		return
	}
}

// removeUnoptimizedPeer deletes the first occurrence of p from the
// unoptimized list. Order is not preserved: the last element is moved into
// the vacated slot. It does nothing when p is not in the list.
func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
	last := len(spm.unoptimizedPeersArr) - 1
	for idx, candidate := range spm.unoptimizedPeersArr {
		if candidate != p {
			continue
		}
		spm.unoptimizedPeersArr[idx] = spm.unoptimizedPeersArr[last]
		spm.unoptimizedPeersArr = spm.unoptimizedPeersArr[:last]
		return
	}
}

hannahhoward's avatar
hannahhoward committed
167 168 169 170 171 172
// peerFoundMessage tells the run loop that peer p was discovered as a
// provider.
type peerFoundMessage struct {
	p peer.ID
}

func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
	p := pfm.p
173
	if _, ok := spm.activePeers[p]; !ok {
hannahhoward's avatar
hannahhoward committed
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
		spm.activePeers[p] = false
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
		spm.tagPeer(p)
	}
}

// peerResponseMessage tells the run loop that peer p sent a response.
type peerResponseMessage struct {
	p peer.ID
}

func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {

	p := prm.p
	isOptimized, ok := spm.activePeers[p]
	if !ok {
		spm.activePeers[p] = true
		spm.tagPeer(p)
	} else {
		if isOptimized {
193
			spm.removeOptimizedPeer(p)
hannahhoward's avatar
hannahhoward committed
194 195
		} else {
			spm.activePeers[p] = true
196
			spm.removeUnoptimizedPeer(p)
hannahhoward's avatar
hannahhoward committed
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
		}
	}
	spm.insertOptimizedPeer(p)
}

// peerReqMessage asks the run loop for the session's current peers; the
// handler sends its result on resp.
type peerReqMessage struct {
	resp chan<- []peer.ID
}

func (prm *peerReqMessage) handle(spm *SessionPeerManager) {
	randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
	maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
	if maxPeers > maxOptimizedPeers {
		maxPeers = maxOptimizedPeers
	}
212

hannahhoward's avatar
hannahhoward committed
213 214 215
	extraPeers := make([]peer.ID, maxPeers-len(spm.optimizedPeersArr))
	for i := range extraPeers {
		extraPeers[i] = spm.unoptimizedPeersArr[randomOrder[i]]
216
	}
hannahhoward's avatar
hannahhoward committed
217
	prm.resp <- append(spm.optimizedPeersArr, extraPeers...)
218 219 220 221
}

func (spm *SessionPeerManager) handleShutdown() {
	cmgr := spm.network.ConnectionManager()
hannahhoward's avatar
hannahhoward committed
222
	for p := range spm.activePeers {
223 224 225
		cmgr.UntagPeer(p, spm.tag)
	}
}