peermanager.go 6.53 KB
Newer Older
1 2 3 4
package peermanager

import (
	"context"
dirkmc's avatar
dirkmc committed
5
	"sync"
6

dirkmc's avatar
dirkmc committed
7
	"github.com/ipfs/go-metrics-interface"
8

dirkmc's avatar
dirkmc committed
9
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
10
	peer "github.com/libp2p/go-libp2p-core/peer"
11 12
)

ZenGround0's avatar
ZenGround0 committed
13
// PeerQueue provides a queue of messages to be sent for a single peer.
14
type PeerQueue interface {
dirkmc's avatar
dirkmc committed
15 16 17
	AddBroadcastWantHaves([]cid.Cid)
	AddWants([]cid.Cid, []cid.Cid)
	AddCancels([]cid.Cid)
18
	Startup()
19 20 21
	Shutdown()
}

dirkmc's avatar
dirkmc committed
22 23 24 25 26
// Session is what the PeerManager needs from a session that registers for
// peer connectivity events.
type Session interface {
	// ID returns the identifier under which the PeerManager tracks the
	// session.
	ID() uint64
	// SignalAvailability is called with isConnected set to true when peer p
	// is added to the pool and false when it is removed.
	SignalAvailability(p peer.ID, isConnected bool)
}

27
// PeerQueueFactory provides a function that will create a PeerQueue.
28
type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue
29

hannahhoward's avatar
hannahhoward committed
30 31 32 33 34
// peerQueueInstance is the entry stored in PeerManager.peerQueues for a peer.
// refcnt is incremented by Connected and decremented by Disconnected; once it
// is no longer positive the entry is removed and pq is shut down. pq is the
// queue produced by the PeerQueueFactory for that peer.
type peerQueueInstance struct {
	refcnt int
	pq     PeerQueue
}

35
// PeerManager manages a pool of peers and sends messages to peers in the pool.
36
type PeerManager struct {
dirkmc's avatar
dirkmc committed
37 38
	// sync access to peerQueues and peerWantManager
	pqLk sync.RWMutex
39
	// peerQueues -- interact through internal utility functions get/set/remove/iterate
40
	peerQueues map[peer.ID]*peerQueueInstance
dirkmc's avatar
dirkmc committed
41
	pwm        *peerWantManager
42

43 44
	createPeerQueue PeerQueueFactory
	ctx             context.Context
dirkmc's avatar
dirkmc committed
45 46 47 48 49 50

	psLk         sync.RWMutex
	sessions     map[uint64]Session
	peerSessions map[peer.ID]map[uint64]struct{}

	self peer.ID
51 52
}

53
// New creates a new PeerManager, given a context and a peerQueueFactory.
dirkmc's avatar
dirkmc committed
54 55
func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *PeerManager {
	wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge()
56
	return &PeerManager{
hannahhoward's avatar
hannahhoward committed
57
		peerQueues:      make(map[peer.ID]*peerQueueInstance),
dirkmc's avatar
dirkmc committed
58
		pwm:             newPeerWantManager(wantGauge),
59 60
		createPeerQueue: createPeerQueue,
		ctx:             ctx,
dirkmc's avatar
dirkmc committed
61 62 63 64
		self:            self,

		sessions:     make(map[uint64]Session),
		peerSessions: make(map[peer.ID]map[uint64]struct{}),
65 66 67
	}
}

dirkmc's avatar
dirkmc committed
68 69 70 71 72
// AvailablePeers currently returns exactly what ConnectedPeers returns; the
// TODO below records that rate limiting is meant to be applied here.
func (pm *PeerManager) AvailablePeers() []peer.ID {
	// TODO: Rate-limit peers
	return pm.ConnectedPeers()
}

73
// ConnectedPeers returns a list of peers this PeerManager is managing.
74
func (pm *PeerManager) ConnectedPeers() []peer.ID {
dirkmc's avatar
dirkmc committed
75 76 77
	pm.pqLk.RLock()
	defer pm.pqLk.RUnlock()

78
	peers := make([]peer.ID, 0, len(pm.peerQueues))
hannahhoward's avatar
hannahhoward committed
79
	for p := range pm.peerQueues {
80
		peers = append(peers, p)
hannahhoward's avatar
hannahhoward committed
81
	}
82
	return peers
83 84
}

85 86
// Connected is called to add a new peer to the pool, and send it an initial set
// of wants.
dirkmc's avatar
dirkmc committed
87 88 89 90
func (pm *PeerManager) Connected(p peer.ID, initialWantHaves []cid.Cid) {
	pm.pqLk.Lock()
	defer pm.pqLk.Unlock()

hannahhoward's avatar
hannahhoward committed
91
	pq := pm.getOrCreate(p)
dirkmc's avatar
dirkmc committed
92
	pq.refcnt++
93

dirkmc's avatar
dirkmc committed
94 95 96
	// If this is the first connection to the peer
	if pq.refcnt == 1 {
		// Inform the peer want manager that there's a new peer
97
		pm.pwm.addPeer(p)
dirkmc's avatar
dirkmc committed
98
		// Record that the want-haves are being sent to the peer
99
		_, wantHaves := pm.pwm.prepareSendWants(p, nil, initialWantHaves)
dirkmc's avatar
dirkmc committed
100
		// Broadcast any live want-haves to the newly connected peers
101
		pq.pq.AddBroadcastWantHaves(wantHaves)
dirkmc's avatar
dirkmc committed
102 103
		// Inform the sessions that the peer has connected
		pm.signalAvailability(p, true)
104 105 106
	}
}

107 108
// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
dirkmc's avatar
dirkmc committed
109 110 111
	pm.pqLk.Lock()
	defer pm.pqLk.Unlock()

112
	pq, ok := pm.peerQueues[p]
113

hannahhoward's avatar
hannahhoward committed
114 115 116 117 118 119
	if !ok {
		return
	}

	pq.refcnt--
	if pq.refcnt > 0 {
120 121 122
		return
	}

dirkmc's avatar
dirkmc committed
123 124 125 126
	// Inform the sessions that the peer has disconnected
	pm.signalAvailability(p, false)

	// Clean up the peer
127
	delete(pm.peerQueues, p)
hannahhoward's avatar
hannahhoward committed
128
	pq.pq.Shutdown()
129
	pm.pwm.removePeer(p)
130 131
}

132 133 134 135
// BroadcastWantHaves broadcasts want-haves to all peers (used by the session
// to discover seeds).
// For each peer it filters out want-haves that have previously been sent to
// the peer.
dirkmc's avatar
dirkmc committed
136 137 138 139
func (pm *PeerManager) BroadcastWantHaves(ctx context.Context, wantHaves []cid.Cid) {
	pm.pqLk.Lock()
	defer pm.pqLk.Unlock()

140
	for p, ks := range pm.pwm.prepareBroadcastWantHaves(wantHaves) {
dirkmc's avatar
dirkmc committed
141 142
		if pqi, ok := pm.peerQueues[p]; ok {
			pqi.pq.AddBroadcastWantHaves(ks)
hannahhoward's avatar
hannahhoward committed
143
		}
dirkmc's avatar
dirkmc committed
144 145 146
	}
}

147 148
// SendWants sends the given want-blocks and want-haves to the given peer.
// It filters out wants that have previously been sent to the peer.
dirkmc's avatar
dirkmc committed
149 150 151 152 153
func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	pm.pqLk.Lock()
	defer pm.pqLk.Unlock()

	if pqi, ok := pm.peerQueues[p]; ok {
154
		wblks, whvs := pm.pwm.prepareSendWants(p, wantBlocks, wantHaves)
dirkmc's avatar
dirkmc committed
155 156 157 158
		pqi.pq.AddWants(wblks, whvs)
	}
}

159 160
// SendCancels sends cancels for the given keys to all peers who had previously
// received a want for those keys.
dirkmc's avatar
dirkmc committed
161 162 163 164 165
func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
	pm.pqLk.Lock()
	defer pm.pqLk.Unlock()

	// Send a CANCEL to each peer that has been sent a want-block or want-have
166
	for p, ks := range pm.pwm.prepareSendCancels(cancelKs) {
dirkmc's avatar
dirkmc committed
167 168
		if pqi, ok := pm.peerQueues[p]; ok {
			pqi.pq.AddCancels(ks)
169 170 171
		}
	}
}
172

173
// CurrentWants returns the list of pending wants (both want-haves and want-blocks).
dirkmc's avatar
dirkmc committed
174 175 176 177
func (pm *PeerManager) CurrentWants() []cid.Cid {
	pm.pqLk.RLock()
	defer pm.pqLk.RUnlock()

178
	return pm.pwm.getWants()
179 180 181 182 183 184 185
}

// CurrentWantBlocks returns the list of pending want-blocks
func (pm *PeerManager) CurrentWantBlocks() []cid.Cid {
	pm.pqLk.RLock()
	defer pm.pqLk.RUnlock()

186
	return pm.pwm.getWantBlocks()
dirkmc's avatar
dirkmc committed
187 188
}

189
// CurrentWantHaves returns the list of pending want-haves
dirkmc's avatar
dirkmc committed
190 191 192 193
func (pm *PeerManager) CurrentWantHaves() []cid.Cid {
	pm.pqLk.RLock()
	defer pm.pqLk.RUnlock()

194
	return pm.pwm.getWantHaves()
dirkmc's avatar
dirkmc committed
195 196
}

hannahhoward's avatar
hannahhoward committed
197 198
func (pm *PeerManager) getOrCreate(p peer.ID) *peerQueueInstance {
	pqi, ok := pm.peerQueues[p]
199
	if !ok {
200 201
		pq := pm.createPeerQueue(pm.ctx, p)
		pq.Startup()
hannahhoward's avatar
hannahhoward committed
202 203
		pqi = &peerQueueInstance{0, pq}
		pm.peerQueues[p] = pqi
204
	}
hannahhoward's avatar
hannahhoward committed
205
	return pqi
206
}
dirkmc's avatar
dirkmc committed
207

208 209
// RegisterSession tells the PeerManager that the given session is interested
// in events about the given peer.
dirkmc's avatar
dirkmc committed
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
func (pm *PeerManager) RegisterSession(p peer.ID, s Session) bool {
	pm.psLk.Lock()
	defer pm.psLk.Unlock()

	if _, ok := pm.sessions[s.ID()]; !ok {
		pm.sessions[s.ID()] = s
	}

	if _, ok := pm.peerSessions[p]; !ok {
		pm.peerSessions[p] = make(map[uint64]struct{})
	}
	pm.peerSessions[p][s.ID()] = struct{}{}

	_, ok := pm.peerQueues[p]
	return ok
}

227 228
// UnregisterSession tells the PeerManager that the given session is no longer
// interested in PeerManager events.
dirkmc's avatar
dirkmc committed
229 230 231 232 233 234 235 236 237 238 239 240 241 242
func (pm *PeerManager) UnregisterSession(ses uint64) {
	pm.psLk.Lock()
	defer pm.psLk.Unlock()

	for p := range pm.peerSessions {
		delete(pm.peerSessions[p], ses)
		if len(pm.peerSessions[p]) == 0 {
			delete(pm.peerSessions, p)
		}
	}

	delete(pm.sessions, ses)
}

243 244
// signalAvailability is called when a peer's connectivity changes.
// It informs interested sessions.
dirkmc's avatar
dirkmc committed
245
func (pm *PeerManager) signalAvailability(p peer.ID, isConnected bool) {
246 247 248 249 250 251 252
	sesIds, ok := pm.peerSessions[p]
	if !ok {
		return
	}
	for sesId := range sesIds {
		if s, ok := pm.sessions[sesId]; ok {
			s.SignalAvailability(p, isConnected)
dirkmc's avatar
dirkmc committed
253 254 255
		}
	}
}