sessionpeermanager.go 3.52 KB
Newer Older
1 2 3 4
package sessionpeermanager

import (
	"fmt"
5
	"sync"
6

7
	logging "gitlab.dms3.io/dms3/go-log"
8

9
	peer "gitlab.dms3.io/p2p/go-p2p-core/peer"
10 11
)

dirkmc's avatar
dirkmc committed
12 13
// log is the package-level logger, created under the name "bs:sprmgr".
var log = logging.Logger("bs:sprmgr")

hannahhoward's avatar
hannahhoward committed
14
// sessionPeerTagValue is the Connection Manager tag value for session peers.
// It indicates to the connection manager that it should keep the connection
// to the peer.
const sessionPeerTagValue = 5

20 21 22 23
// PeerTagger is an interface for tagging peers with metadata
type PeerTagger interface {
	TagPeer(peer.ID, string, int)
	UntagPeer(p peer.ID, tag string)
24 25
	Protect(peer.ID, string)
	Unprotect(peer.ID, string) bool
26 27
}

28 29
// SessionPeerManager keeps track of peers for a session, and takes care of
// ConnectionManager tagging.
30
type SessionPeerManager struct {
31 32
	tagger PeerTagger
	tag    string
33

Dirk McCormick's avatar
Dirk McCormick committed
34
	id              uint64
35 36 37
	plk             sync.RWMutex
	peers           map[peer.ID]struct{}
	peersDiscovered bool
38 39
}

40
// New creates a new SessionPeerManager
41 42
func New(id uint64, tagger PeerTagger) *SessionPeerManager {
	return &SessionPeerManager{
Dirk McCormick's avatar
Dirk McCormick committed
43
		id:     id,
44 45 46
		tag:    fmt.Sprint("bs-ses-", id),
		tagger: tagger,
		peers:  make(map[peer.ID]struct{}),
47
	}
hannahhoward's avatar
hannahhoward committed
48 49
}

50 51 52 53 54
// AddPeer adds the peer to the SessionPeerManager.
// Returns true if the peer is a new peer, false if it already existed.
func (spm *SessionPeerManager) AddPeer(p peer.ID) bool {
	spm.plk.Lock()
	defer spm.plk.Unlock()
dirkmc's avatar
dirkmc committed
55

56 57 58
	// Check if the peer is a new peer
	if _, ok := spm.peers[p]; ok {
		return false
dirkmc's avatar
dirkmc committed
59
	}
hannahhoward's avatar
hannahhoward committed
60

61 62
	spm.peers[p] = struct{}{}
	spm.peersDiscovered = true
63

64 65 66
	// Tag the peer with the ConnectionManager so it doesn't discard the
	// connection
	spm.tagger.TagPeer(p, spm.tag, sessionPeerTagValue)
67

Dirk McCormick's avatar
Dirk McCormick committed
68
	log.Debugw("Bitswap: Added peer to session", "session", spm.id, "peer", p, "peerCount", len(spm.peers))
69
	return true
70 71
}

72 73 74 75 76 77 78 79 80
// Protect connection to this peer from being pruned by the connection manager
func (spm *SessionPeerManager) ProtectConnection(p peer.ID) {
	spm.plk.Lock()
	defer spm.plk.Unlock()

	if _, ok := spm.peers[p]; !ok {
		return
	}

81
	spm.tagger.Protect(p, spm.tag)
82 83
}

84 85 86 87 88
// RemovePeer removes the peer from the SessionPeerManager.
// Returns true if the peer was removed, false if it did not exist.
func (spm *SessionPeerManager) RemovePeer(p peer.ID) bool {
	spm.plk.Lock()
	defer spm.plk.Unlock()
hannahhoward's avatar
hannahhoward committed
89

90 91
	if _, ok := spm.peers[p]; !ok {
		return false
hannahhoward's avatar
hannahhoward committed
92 93
	}

94 95
	delete(spm.peers, p)
	spm.tagger.UntagPeer(p, spm.tag)
96
	spm.tagger.Unprotect(p, spm.tag)
97

Dirk McCormick's avatar
Dirk McCormick committed
98
	log.Debugw("Bitswap: removed peer from session", "session", spm.id, "peer", p, "peerCount", len(spm.peers))
99
	return true
hannahhoward's avatar
hannahhoward committed
100 101
}

102 103 104 105 106 107
// PeersDiscovered indicates whether peers have been discovered yet.
// Returns true once a peer has been discovered by the session (even if all
// peers are later removed from the session).
func (spm *SessionPeerManager) PeersDiscovered() bool {
	spm.plk.RLock()
	defer spm.plk.RUnlock()
108

109
	return spm.peersDiscovered
110 111
}

112 113 114
func (spm *SessionPeerManager) Peers() []peer.ID {
	spm.plk.RLock()
	defer spm.plk.RUnlock()
115

116 117 118
	peers := make([]peer.ID, 0, len(spm.peers))
	for p := range spm.peers {
		peers = append(peers, p)
119
	}
hannahhoward's avatar
hannahhoward committed
120

121
	return peers
hannahhoward's avatar
hannahhoward committed
122 123
}

124 125 126
func (spm *SessionPeerManager) HasPeers() bool {
	spm.plk.RLock()
	defer spm.plk.RUnlock()
127

128
	return len(spm.peers) > 0
129 130
}

Dirk McCormick's avatar
Dirk McCormick committed
131 132 133 134 135 136 137 138
// HasPeer reports whether the given peer is currently in the session.
func (spm *SessionPeerManager) HasPeer(p peer.ID) bool {
	spm.plk.RLock()
	_, found := spm.peers[p]
	spm.plk.RUnlock()

	return found
}

139 140 141 142
// Shutdown untags all the peers
func (spm *SessionPeerManager) Shutdown() {
	spm.plk.Lock()
	defer spm.plk.Unlock()
143

144 145 146
	// Untag the peers with the ConnectionManager so that it can release
	// connections to those peers
	for p := range spm.peers {
147
		spm.tagger.UntagPeer(p, spm.tag)
148
		spm.tagger.Unprotect(p, spm.tag)
149 150
	}
}