sessionpeermanager.go 3.09 KB
Newer Older
1 2 3 4
package sessionpeermanager

import (
	"fmt"
5
	"sync"
6

7
	lu "github.com/ipfs/go-bitswap/internal/logutil"
dirkmc's avatar
dirkmc committed
8
	logging "github.com/ipfs/go-log"
9

Raúl Kripalani's avatar
Raúl Kripalani committed
10
	peer "github.com/libp2p/go-libp2p-core/peer"
11 12
)

dirkmc's avatar
dirkmc committed
13 14
// log is the package-level logger, created with the name "bs:sprmgr".
var log = logging.Logger("bs:sprmgr")

hannahhoward's avatar
hannahhoward committed
15
// sessionPeerTagValue is the Connection Manager tag value applied to session
// peers. It signals to the connection manager that the connection to the peer
// should be kept.
const sessionPeerTagValue = 5

21 22 23 24 25 26
// PeerTagger is the interface used to attach metadata tags to peers and to
// remove them again.
type PeerTagger interface {
	// TagPeer tags peer p with the given tag name and integer value.
	TagPeer(p peer.ID, tag string, value int)
	// UntagPeer removes the given tag from peer p.
	UntagPeer(p peer.ID, tag string)
}

27 28
// SessionPeerManager keeps track of peers for a session, and takes care of
// ConnectionManager tagging.
29
type SessionPeerManager struct {
30 31
	tagger PeerTagger
	tag    string
32

33 34 35
	plk             sync.RWMutex
	peers           map[peer.ID]struct{}
	peersDiscovered bool
36 37
}

38
// New creates a new SessionPeerManager
39 40 41 42 43
func New(id uint64, tagger PeerTagger) *SessionPeerManager {
	return &SessionPeerManager{
		tag:    fmt.Sprint("bs-ses-", id),
		tagger: tagger,
		peers:  make(map[peer.ID]struct{}),
44
	}
hannahhoward's avatar
hannahhoward committed
45 46
}

47 48 49 50 51
// AddPeer adds the peer to the SessionPeerManager.
// Returns true if the peer is a new peer, false if it already existed.
func (spm *SessionPeerManager) AddPeer(p peer.ID) bool {
	spm.plk.Lock()
	defer spm.plk.Unlock()
dirkmc's avatar
dirkmc committed
52

53 54 55
	// Check if the peer is a new peer
	if _, ok := spm.peers[p]; ok {
		return false
dirkmc's avatar
dirkmc committed
56
	}
hannahhoward's avatar
hannahhoward committed
57

58 59
	spm.peers[p] = struct{}{}
	spm.peersDiscovered = true
60

61 62 63
	// Tag the peer with the ConnectionManager so it doesn't discard the
	// connection
	spm.tagger.TagPeer(p, spm.tag, sessionPeerTagValue)
64

65
	log.Debugf("Added peer %s to session (%d peers)\n", p, len(spm.peers))
66
	return true
67 68
}

69 70 71 72 73
// RemovePeer removes the peer from the SessionPeerManager.
// Returns true if the peer was removed, false if it did not exist.
func (spm *SessionPeerManager) RemovePeer(p peer.ID) bool {
	spm.plk.Lock()
	defer spm.plk.Unlock()
hannahhoward's avatar
hannahhoward committed
74

75 76
	if _, ok := spm.peers[p]; !ok {
		return false
hannahhoward's avatar
hannahhoward committed
77 78
	}

79 80
	delete(spm.peers, p)
	spm.tagger.UntagPeer(p, spm.tag)
81 82

	log.Debugf("Removed peer %s from session (%d peers)", lu.P(p), len(spm.peers))
83
	return true
hannahhoward's avatar
hannahhoward committed
84 85
}

86 87 88 89 90 91
// PeersDiscovered indicates whether peers have been discovered yet.
// Returns true once a peer has been discovered by the session (even if all
// peers are later removed from the session).
func (spm *SessionPeerManager) PeersDiscovered() bool {
	spm.plk.RLock()
	defer spm.plk.RUnlock()
92

93
	return spm.peersDiscovered
94 95
}

96 97 98
func (spm *SessionPeerManager) Peers() []peer.ID {
	spm.plk.RLock()
	defer spm.plk.RUnlock()
99

100 101 102
	peers := make([]peer.ID, 0, len(spm.peers))
	for p := range spm.peers {
		peers = append(peers, p)
103
	}
hannahhoward's avatar
hannahhoward committed
104

105
	return peers
hannahhoward's avatar
hannahhoward committed
106 107
}

108 109 110
func (spm *SessionPeerManager) HasPeers() bool {
	spm.plk.RLock()
	defer spm.plk.RUnlock()
111

112
	return len(spm.peers) > 0
113 114
}

Dirk McCormick's avatar
Dirk McCormick committed
115 116 117 118 119 120 121 122
// HasPeer reports whether the given peer is currently part of the session.
func (spm *SessionPeerManager) HasPeer(p peer.ID) bool {
	spm.plk.RLock()
	_, present := spm.peers[p]
	spm.plk.RUnlock()

	return present
}

123 124 125 126
// Shutdown untags all the peers
func (spm *SessionPeerManager) Shutdown() {
	spm.plk.Lock()
	defer spm.plk.Unlock()
127

128 129 130
	// Untag the peers with the ConnectionManager so that it can release
	// connections to those peers
	for p := range spm.peers {
131
		spm.tagger.UntagPeer(p, spm.tag)
132 133
	}
}