peermanager.go 4.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
package peermanager

import (
	"context"

	bsmsg "github.com/ipfs/go-bitswap/message"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	logging "github.com/ipfs/go-log"

	peer "github.com/libp2p/go-libp2p-peer"
)

var log = logging.Logger("bitswap")

var (
	metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)

19
// PeerQueue provides a queer of messages to be sent for a single peer.
20 21 22 23 24 25 26 27
type PeerQueue interface {
	RefIncrement()
	RefDecrement() bool
	AddMessage(entries []*bsmsg.Entry, ses uint64)
	Startup(ctx context.Context, initialEntries []*wantlist.Entry)
	Shutdown()
}

28
// PeerQueueFactory provides a function that will create a PeerQueue.
29 30
type PeerQueueFactory func(p peer.ID) PeerQueue

31 32 33 34
type peerMessage interface {
	handle(pm *PeerManager)
}

35
// PeerManager manages a pool of peers and sends messages to peers in the pool.
36 37 38 39 40 41 42 43 44 45 46 47
type PeerManager struct {
	// sync channel for Run loop
	peerMessages chan peerMessage

	// synchronized by Run loop, only touch inside there
	peerQueues map[peer.ID]PeerQueue

	createPeerQueue PeerQueueFactory
	ctx             context.Context
	cancel          func()
}

48
// New creates a new PeerManager, given a context and a peerQueueFactory.
49 50 51 52 53 54 55 56 57 58 59
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
	ctx, cancel := context.WithCancel(ctx)
	return &PeerManager{
		peerMessages:    make(chan peerMessage, 10),
		peerQueues:      make(map[peer.ID]PeerQueue),
		createPeerQueue: createPeerQueue,
		ctx:             ctx,
		cancel:          cancel,
	}
}

60
// ConnectedPeers returns a list of peers this PeerManager is managing.
61
func (pm *PeerManager) ConnectedPeers() []peer.ID {
62 63 64 65 66 67 68 69 70 71 72 73
	resp := make(chan []peer.ID, 1)
	select {
	case pm.peerMessages <- &getPeersMessage{resp}:
	case <-pm.ctx.Done():
		return nil
	}
	select {
	case peers := <-resp:
		return peers
	case <-pm.ctx.Done():
		return nil
	}
74 75
}

76
// Connected is called to add a new peer to the pool, and send it an initial set
77
// of wants.
78 79
func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
	select {
80
	case pm.peerMessages <- &connectPeerMessage{p, initialEntries}:
81 82 83 84
	case <-pm.ctx.Done():
	}
}

85
// Disconnected is called to remove a peer from the pool.
86 87
func (pm *PeerManager) Disconnected(p peer.ID) {
	select {
88
	case pm.peerMessages <- &disconnectPeerMessage{p}:
89 90 91 92
	case <-pm.ctx.Done():
	}
}

93 94
// SendMessage is called to send a message to all or some peers in the pool;
// if targets is nil, it sends to all.
95 96
func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
	select {
97
	case pm.peerMessages <- &sendPeerMessage{entries: entries, targets: targets, from: from}:
98 99 100 101
	case <-pm.ctx.Done():
	}
}

102
// Startup enables the run loop for the PeerManager - no processing will occur
103
// if startup is not called.
104 105 106 107
func (pm *PeerManager) Startup() {
	go pm.run()
}

108
// Shutdown shutsdown processing for the PeerManager.
109 110 111 112 113 114 115 116
func (pm *PeerManager) Shutdown() {
	pm.cancel()
}

func (pm *PeerManager) run() {
	for {
		select {
		case message := <-pm.peerMessages:
117
			message.handle(pm)
118 119 120 121 122 123
		case <-pm.ctx.Done():
			return
		}
	}
}

124 125 126 127 128
type sendPeerMessage struct {
	entries []*bsmsg.Entry
	targets []peer.ID
	from    uint64
}
129

130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
func (s *sendPeerMessage) handle(pm *PeerManager) {
	pm.sendMessage(s)
}

type connectPeerMessage struct {
	p              peer.ID
	initialEntries []*wantlist.Entry
}

func (c *connectPeerMessage) handle(pm *PeerManager) {
	pm.startPeerHandler(c.p, c.initialEntries)
}

type disconnectPeerMessage struct {
	p peer.ID
}

func (dc *disconnectPeerMessage) handle(pm *PeerManager) {
	pm.stopPeerHandler(dc.p)
}

type getPeersMessage struct {
	peerResp chan<- []peer.ID
}

func (gp *getPeersMessage) handle(pm *PeerManager) {
	pm.getPeers(gp.peerResp)
}

func (pm *PeerManager) getPeers(peerResp chan<- []peer.ID) {
	peers := make([]peer.ID, 0, len(pm.peerQueues))
	for p := range pm.peerQueues {
		peers = append(peers, p)
	}
	peerResp <- peers
}

func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue {
	mq, ok := pm.peerQueues[p]
	if ok {
		mq.RefIncrement()
		return nil
	}

	mq = pm.createPeerQueue(p)
	pm.peerQueues[p] = mq
	mq.Startup(pm.ctx, initialEntries)
	return mq
}

func (pm *PeerManager) stopPeerHandler(p peer.ID) {
	pq, ok := pm.peerQueues[p]
	if !ok {
		// TODO: log error?
		return
	}

	if pq.RefDecrement() {
		return
	}

	pq.Shutdown()
	delete(pm.peerQueues, p)
}

func (pm *PeerManager) sendMessage(ms *sendPeerMessage) {
	if len(ms.targets) == 0 {
		for _, p := range pm.peerQueues {
			p.AddMessage(ms.entries, ms.from)
199
		}
200 201 202 203 204 205 206 207
	} else {
		for _, t := range ms.targets {
			p, ok := pm.peerQueues[t]
			if !ok {
				log.Infof("tried sending wantlist change to non-partner peer: %s", t)
				continue
			}
			p.AddMessage(ms.entries, ms.from)
208 209 210
		}
	}
}