peermanager.go 3.17 KB
Newer Older
1 2 3 4
package peermanager

import (
	"context"
5
	"sync"
6 7 8 9 10 11 12 13 14 15 16 17 18 19

	bsmsg "github.com/ipfs/go-bitswap/message"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	logging "github.com/ipfs/go-log"

	peer "github.com/libp2p/go-libp2p-peer"
)

// log is the package-level logger, registered under the name "bitswap".
var log = logging.Logger("bitswap")

var (
	// metricsBuckets holds six ascending float64 values: 64, 1024, 16384,
	// 262144, 262159 and 4194304. Presumably these are histogram bucket
	// boundaries for a metric; nothing in this file reads the variable —
	// TODO confirm where (or whether) it is used.
	metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)

20
// PeerQueue provides a queer of messages to be sent for a single peer.
21 22
type PeerQueue interface {
	AddMessage(entries []*bsmsg.Entry, ses uint64)
23 24
	Startup()
	AddWantlist(initialWants *wantlist.SessionTrackedWantlist)
25 26 27
	Shutdown()
}

28
// PeerQueueFactory provides a function that will create a PeerQueue.
29
// It is invoked with the PeerManager's context and the ID of the peer that
// needs a queue, and returns the PeerQueue to use for that peer.
type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue
30

31 32 33 34
// peerMessage is an unexported interface for values that operate on a
// PeerManager through handle.
// NOTE(review): nothing in this file implements or calls it — it may be a
// leftover from an earlier design; confirm before removing.
type peerMessage interface {
	// handle receives the PeerManager the message should act on.
	handle(pm *PeerManager)
}

hannahhoward's avatar
hannahhoward committed
35 36 37 38 39
// peerQueueInstance pairs a peer's PeerQueue with a reference count so the
// queue can be kept while any reference to the peer remains.
type peerQueueInstance struct {
	// refcnt is incremented by Connected and decremented by Disconnected; the
	// entry is dropped once a decrement leaves it at zero or below.
	refcnt int
	// pq is the queue that receives wantlists and messages for the peer.
	pq     PeerQueue
}

40
// PeerManager manages a pool of peers and sends messages to peers in the pool.
41
type PeerManager struct {
42
	// peerQueues -- interact through internal utility functions get/set/remove/iterate
hannahhoward's avatar
hannahhoward committed
43
	peerQueues   map[peer.ID]*peerQueueInstance
44 45
	peerQueuesLk sync.RWMutex

46 47 48 49
	createPeerQueue PeerQueueFactory
	ctx             context.Context
}

50
// New creates a new PeerManager, given a context and a peerQueueFactory.
51 52
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
	return &PeerManager{
hannahhoward's avatar
hannahhoward committed
53
		peerQueues:      make(map[peer.ID]*peerQueueInstance),
54 55 56 57 58
		createPeerQueue: createPeerQueue,
		ctx:             ctx,
	}
}

59
// ConnectedPeers returns a list of peers this PeerManager is managing.
60
func (pm *PeerManager) ConnectedPeers() []peer.ID {
hannahhoward's avatar
hannahhoward committed
61 62
	pm.peerQueuesLk.RLock()
	defer pm.peerQueuesLk.RUnlock()
63
	peers := make([]peer.ID, 0, len(pm.peerQueues))
hannahhoward's avatar
hannahhoward committed
64
	for p := range pm.peerQueues {
65
		peers = append(peers, p)
hannahhoward's avatar
hannahhoward committed
66
	}
67
	return peers
68 69
}

70 71
// Connected is called to add a new peer to the pool, and send it an initial set
// of wants.
72
func (pm *PeerManager) Connected(p peer.ID, initialWants *wantlist.SessionTrackedWantlist) {
hannahhoward's avatar
hannahhoward committed
73 74 75
	pm.peerQueuesLk.Lock()

	pq := pm.getOrCreate(p)
76

hannahhoward's avatar
hannahhoward committed
77
	if pq.refcnt == 0 {
78
		pq.pq.AddWantlist(initialWants)
79
	}
hannahhoward's avatar
hannahhoward committed
80 81 82 83

	pq.refcnt++

	pm.peerQueuesLk.Unlock()
84 85
}

86 87
// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
88 89
	pm.peerQueuesLk.Lock()
	pq, ok := pm.peerQueues[p]
90

hannahhoward's avatar
hannahhoward committed
91 92 93 94 95 96 97
	if !ok {
		pm.peerQueuesLk.Unlock()
		return
	}

	pq.refcnt--
	if pq.refcnt > 0 {
98
		pm.peerQueuesLk.Unlock()
99 100 101
		return
	}

102 103
	delete(pm.peerQueues, p)
	pm.peerQueuesLk.Unlock()
104

hannahhoward's avatar
hannahhoward committed
105
	pq.pq.Shutdown()
106

107 108
}

109 110
// SendMessage is called to send a message to all or some peers in the pool;
// if targets is nil, it sends to all.
111
func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
112
	if len(targets) == 0 {
hannahhoward's avatar
hannahhoward committed
113 114
		pm.peerQueuesLk.RLock()
		for _, p := range pm.peerQueues {
hannahhoward's avatar
hannahhoward committed
115
			p.pq.AddMessage(entries, from)
hannahhoward's avatar
hannahhoward committed
116 117
		}
		pm.peerQueuesLk.RUnlock()
118
	} else {
119
		for _, t := range targets {
hannahhoward's avatar
hannahhoward committed
120 121 122 123
			pm.peerQueuesLk.Lock()
			pqi := pm.getOrCreate(t)
			pm.peerQueuesLk.Unlock()
			pqi.pq.AddMessage(entries, from)
124 125 126
		}
	}
}
127

hannahhoward's avatar
hannahhoward committed
128 129
func (pm *PeerManager) getOrCreate(p peer.ID) *peerQueueInstance {
	pqi, ok := pm.peerQueues[p]
130
	if !ok {
131 132
		pq := pm.createPeerQueue(pm.ctx, p)
		pq.Startup()
hannahhoward's avatar
hannahhoward committed
133 134
		pqi = &peerQueueInstance{0, pq}
		pm.peerQueues[p] = pqi
135
	}
hannahhoward's avatar
hannahhoward committed
136
	return pqi
137
}