peermanager.go 2.31 KB
Newer Older
1 2 3 4 5 6 7 8 9
package peermanager

import (
	"context"
	"sync"

	peer "github.com/libp2p/go-libp2p-peer"
)

10 11
// PeerProcess is any process that provides services for a peer
type PeerProcess interface {
12 13 14 15
	Startup()
	Shutdown()
}

16 17
// PeerProcessFactory provides a function that will create a PeerQueue.
type PeerProcessFactory func(ctx context.Context, p peer.ID) PeerProcess
18

19 20 21
type peerProcessInstance struct {
	refcnt  int
	process PeerProcess
22 23 24 25
}

// PeerManager manages a pool of peers and sends messages to peers in the pool.
type PeerManager struct {
26 27
	peerProcesses   map[peer.ID]*peerProcessInstance
	peerProcessesLk sync.RWMutex
28

29 30
	createPeerProcess PeerProcessFactory
	ctx               context.Context
31 32 33
}

// New creates a new PeerManager, given a context and a peerQueueFactory.
34
func New(ctx context.Context, createPeerQueue PeerProcessFactory) *PeerManager {
35
	return &PeerManager{
36 37 38
		peerProcesses:     make(map[peer.ID]*peerProcessInstance),
		createPeerProcess: createPeerQueue,
		ctx:               ctx,
39 40 41 42 43
	}
}

// ConnectedPeers returns a list of peers this PeerManager is managing.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
44 45 46 47
	pm.peerProcessesLk.RLock()
	defer pm.peerProcessesLk.RUnlock()
	peers := make([]peer.ID, 0, len(pm.peerProcesses))
	for p := range pm.peerProcesses {
48 49 50 51 52 53 54
		peers = append(peers, p)
	}
	return peers
}

// Connected is called to add a new peer to the pool
func (pm *PeerManager) Connected(p peer.ID) {
55
	pm.peerProcessesLk.Lock()
56 57
	pq := pm.getOrCreate(p)
	pq.refcnt++
58
	pm.peerProcessesLk.Unlock()
59 60 61 62
}

// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
63 64
	pm.peerProcessesLk.Lock()
	pq, ok := pm.peerProcesses[p]
65
	if !ok {
66
		pm.peerProcessesLk.Unlock()
67 68 69 70 71
		return
	}

	pq.refcnt--
	if pq.refcnt > 0 {
72
		pm.peerProcessesLk.Unlock()
73 74 75
		return
	}

76 77
	delete(pm.peerProcesses, p)
	pm.peerProcessesLk.Unlock()
78

79
	pq.process.Shutdown()
80 81 82

}

83 84 85 86
// GetProcess returns the process for the given peer
func (pm *PeerManager) GetProcess(
	p peer.ID) PeerProcess {
	pm.peerProcessesLk.Lock()
87
	pqi := pm.getOrCreate(p)
88 89
	pm.peerProcessesLk.Unlock()
	return pqi.process
90 91
}

92 93
func (pm *PeerManager) getOrCreate(p peer.ID) *peerProcessInstance {
	pqi, ok := pm.peerProcesses[p]
94
	if !ok {
95
		pq := pm.createPeerProcess(pm.ctx, p)
96
		pq.Startup()
97 98
		pqi = &peerProcessInstance{0, pq}
		pm.peerProcesses[p] = pqi
99 100 101
	}
	return pqi
}