sessionpeermanager.go 5.89 KB
Newer Older
1 2 3 4 5
package sessionpeermanager

import (
	"context"
	"fmt"
hannahhoward's avatar
hannahhoward committed
6
	"math/rand"
7 8 9
	"sync"

	logging "github.com/ipfs/go-log"
10 11 12 13 14 15

	cid "github.com/ipfs/go-cid"
	ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
	peer "github.com/libp2p/go-libp2p-peer"
)

16 17
// log is the package-level logger, registered under the "bitswap" name.
var log = logging.Logger("bitswap")

hannahhoward's avatar
hannahhoward committed
18
const (
	// maxOptimizedPeers caps the total number of peers handed back for a
	// single peer request (optimized and unoptimized combined).
	maxOptimizedPeers = 32
	// reservePeers is subtracted from maxOptimizedPeers to get the maximum
	// length of the optimized list, which leaves room in each response for
	// peers drawn from the unoptimized list.
	reservePeers = 2
)

23
// PeerNetwork is an interface for finding providers and managing connections
24 25
type PeerNetwork interface {
	ConnectionManager() ifconnmgr.ConnManager
26
	ConnectTo(context.Context, peer.ID) error
27 28 29
	FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID
}

hannahhoward's avatar
hannahhoward committed
30 31 32 33
// peerMessage is a unit of work sent over SessionPeerManager.peerMessages.
// The run loop receives each message and calls handle on it, which is where
// the manager's peer lists are read and modified.
type peerMessage interface {
	handle(spm *SessionPeerManager)
}

34 35
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
36 37 38 39 40
type SessionPeerManager struct {
	ctx     context.Context
	network PeerNetwork
	tag     string

hannahhoward's avatar
hannahhoward committed
41
	peerMessages chan peerMessage
42 43

	// do not touch outside of run loop
hannahhoward's avatar
hannahhoward committed
44 45 46
	activePeers         map[peer.ID]bool
	unoptimizedPeersArr []peer.ID
	optimizedPeersArr   []peer.ID
47 48
}

49
// New creates a new SessionPeerManager
50 51
func New(ctx context.Context, id uint64, network PeerNetwork) *SessionPeerManager {
	spm := &SessionPeerManager{
hannahhoward's avatar
hannahhoward committed
52 53 54 55
		ctx:          ctx,
		network:      network,
		peerMessages: make(chan peerMessage, 16),
		activePeers:  make(map[peer.ID]bool),
56 57 58 59 60 61 62 63
	}

	spm.tag = fmt.Sprint("bs-ses-", id)

	go spm.run(ctx)
	return spm
}

64 65
// RecordPeerResponse records that a peer received a block, and adds to it
// the list of peers if it wasn't already added
66
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
67

68 69 70
	// at the moment, we're just adding peers here
	// in the future, we'll actually use this to record metrics
	select {
hannahhoward's avatar
hannahhoward committed
71
	case spm.peerMessages <- &peerResponseMessage{p}:
72 73 74 75
	case <-spm.ctx.Done():
	}
}

76
// RecordPeerRequests records that a given set of peers requested the given cids
77 78 79 80 81
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
	// Intentionally a no-op for now: both arguments are ignored.
	// The plan is to use this hook to track latency per peer.
}

82
// GetOptimizedPeers returns the best peers available for a session
83 84 85 86 87
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
	// right now this just returns all peers, but soon we might return peers
	// ordered by optimization, or only a subset
	resp := make(chan []peer.ID)
	select {
hannahhoward's avatar
hannahhoward committed
88
	case spm.peerMessages <- &peerReqMessage{resp}:
89 90 91 92 93 94 95 96 97 98 99 100
	case <-spm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-spm.ctx.Done():
		return nil
	}
}

101 102
// FindMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
103 104 105 106 107 108 109
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
	go func(k cid.Cid) {
		// TODO: have a task queue setup for this to:
		// - rate limit
		// - manage timeouts
		// - ensure two 'findprovs' calls for the same block don't run concurrently
		// - share peers between sessions based on interest set
110
		wg := &sync.WaitGroup{}
111
		for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
112 113 114 115 116 117 118 119 120
			wg.Add(1)
			go func(p peer.ID) {
				defer wg.Done()
				err := spm.network.ConnectTo(ctx, p)
				if err != nil {
					log.Debugf("failed to connect to provider %s: %s", p, err)
				}
				spm.peerMessages <- &peerFoundMessage{p}
			}(p)
121
		}
122
		wg.Wait()
123 124 125 126 127 128
	}(c)
}

func (spm *SessionPeerManager) run(ctx context.Context) {
	for {
		select {
hannahhoward's avatar
hannahhoward committed
129 130
		case pm := <-spm.peerMessages:
			pm.handle(spm)
131 132 133 134 135 136
		case <-ctx.Done():
			spm.handleShutdown()
			return
		}
	}
}
hannahhoward's avatar
hannahhoward committed
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152

// tagPeer tags p in the network's connection manager with this session's
// tag and a value of 10.
func (spm *SessionPeerManager) tagPeer(p peer.ID) {
	spm.network.ConnectionManager().TagPeer(p, spm.tag, 10)
}

// insertOptimizedPeer puts p at the front of the optimized list. p must not
// already be in that list.
//
// The optimized list is capped at maxOptimizedPeers-reservePeers entries.
// When it is full, the peer at the tail is demoted to the unoptimized list
// to make room.
func (spm *SessionPeerManager) insertOptimizedPeer(p peer.ID) {
	if len(spm.optimizedPeersArr) >= (maxOptimizedPeers - reservePeers) {
		last := len(spm.optimizedPeersArr) - 1
		tailPeer := spm.optimizedPeersArr[last]
		spm.optimizedPeersArr = spm.optimizedPeersArr[:last]
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, tailPeer)
		// Keep the bookkeeping in step with the lists. If the flag stayed
		// true, the demoted peer's next response would be treated as coming
		// from an optimized peer: it would not be removed from the
		// unoptimized list and would end up in both lists at once.
		spm.activePeers[tailPeer] = false
	}

	spm.optimizedPeersArr = append([]peer.ID{p}, spm.optimizedPeersArr...)
}

153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
// removeOptimizedPeer deletes the first occurrence of p from the optimized
// list, keeping the remaining peers in order. It does nothing if p is not
// in the list.
func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
	peers := spm.optimizedPeersArr
	for idx, candidate := range peers {
		if candidate != p {
			continue
		}
		spm.optimizedPeersArr = append(peers[:idx], peers[idx+1:]...)
		return
	}
}

// removeUnoptimizedPeer deletes the first occurrence of p from the
// unoptimized list by overwriting it with the last element and shrinking
// the list by one, so order is not preserved. It does nothing if p is not
// in the list.
func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
	peers := spm.unoptimizedPeersArr
	for idx, candidate := range peers {
		if candidate != p {
			continue
		}
		last := len(peers) - 1
		peers[idx] = peers[last]
		spm.unoptimizedPeersArr = peers[:last]
		return
	}
}

hannahhoward's avatar
hannahhoward committed
172 173 174 175 176 177
// peerFoundMessage tells the run loop that p was found as a provider by
// FindMorePeers.
type peerFoundMessage struct {
	p peer.ID
}

func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
	p := pfm.p
178
	if _, ok := spm.activePeers[p]; !ok {
hannahhoward's avatar
hannahhoward committed
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
		spm.activePeers[p] = false
		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
		spm.tagPeer(p)
	}
}

// peerResponseMessage tells the run loop that a response from p was
// recorded via RecordPeerResponse.
type peerResponseMessage struct {
	p peer.ID
}

func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {

	p := prm.p
	isOptimized, ok := spm.activePeers[p]
	if !ok {
		spm.activePeers[p] = true
		spm.tagPeer(p)
	} else {
		if isOptimized {
198
			spm.removeOptimizedPeer(p)
hannahhoward's avatar
hannahhoward committed
199 200
		} else {
			spm.activePeers[p] = true
201
			spm.removeUnoptimizedPeer(p)
hannahhoward's avatar
hannahhoward committed
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
		}
	}
	spm.insertOptimizedPeer(p)
}

// peerReqMessage asks the run loop for the current list of session peers;
// the reply is sent on resp.
type peerReqMessage struct {
	resp chan<- []peer.ID
}

func (prm *peerReqMessage) handle(spm *SessionPeerManager) {
	randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
	maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
	if maxPeers > maxOptimizedPeers {
		maxPeers = maxOptimizedPeers
	}
217

hannahhoward's avatar
hannahhoward committed
218 219 220
	extraPeers := make([]peer.ID, maxPeers-len(spm.optimizedPeersArr))
	for i := range extraPeers {
		extraPeers[i] = spm.unoptimizedPeersArr[randomOrder[i]]
221
	}
hannahhoward's avatar
hannahhoward committed
222
	prm.resp <- append(spm.optimizedPeersArr, extraPeers...)
223 224 225 226
}

func (spm *SessionPeerManager) handleShutdown() {
	cmgr := spm.network.ConnectionManager()
hannahhoward's avatar
hannahhoward committed
227
	for p := range spm.activePeers {
228 229 230
		cmgr.UntagPeer(p, spm.tag)
	}
}