peermanager.go 4.57 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
package peermanager

import (
	"context"

	bsmsg "github.com/ipfs/go-bitswap/message"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	logging "github.com/ipfs/go-log"

	peer "github.com/libp2p/go-libp2p-peer"
)

// log is the package-level logger, created under the name "bitswap".
var log = logging.Logger("bitswap")

var (
	// metricsBuckets holds the boundaries 64, 1024, 16384, 262144, 262159 and
	// 4194304 (`1<<18 + 15` parses as (1<<18)+15). Nothing in the visible part
	// of this file reads it; presumably histogram buckets used elsewhere in
	// the package — TODO confirm.
	metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)

19
// PeerQueue provides a queue of messages to be sent for a single peer.
20 21 22 23 24 25 26 27
type PeerQueue interface {
	// RefIncrement is called by the PeerManager when a peer that already has
	// a queue is reported as connected again.
	RefIncrement()
	// RefDecrement is called by the PeerManager when the peer is reported as
	// disconnected. When it returns true the manager keeps the queue; when it
	// returns false the manager calls Shutdown and forgets the queue.
	RefDecrement() bool
	// AddMessage hands entries to the queue. The PeerManager passes the
	// `from` value given to SendMessage as ses (presumably a session
	// identifier — TODO confirm).
	AddMessage(entries []*bsmsg.Entry, ses uint64)
	// Startup is called by the PeerManager right after the queue is created,
	// with the manager's context and the initial entries supplied for the
	// peer.
	Startup(ctx context.Context, initialEntries []*wantlist.Entry)
	// Shutdown is called by the PeerManager just before it removes the queue
	// from its pool.
	Shutdown()
}

28
// PeerQueueFactory provides a function that will create a PeerQueue.
29 30
// The PeerManager calls a PeerQueueFactory whenever a peer connects and no
// queue is currently tracked for that peer.
type PeerQueueFactory func(p peer.ID) PeerQueue

31 32 33 34
// peerMessage is a request sent over PeerManager.peerMessages and executed by
// the manager's run loop.
type peerMessage interface {
	// handle is invoked by run with the manager that received the message.
	handle(pm *PeerManager)
}

35
// PeerManager manages a pool of peers and sends messages to peers in the pool.
36 37 38 39 40 41 42 43 44 45 46 47
type PeerManager struct {
	// sync channel for Run loop: the public methods enqueue requests here and
	// run handles them one at a time.
	peerMessages chan peerMessage

	// synchronized by Run loop, only touch inside there (no lock guards this
	// map; it holds one queue per currently tracked peer).
	peerQueues map[peer.ID]PeerQueue

	// createPeerQueue builds the queue for a peer that has none yet. ctx is
	// the manager's own cancellable context, derived in New from the caller's
	// context, and cancel cancels it.
	createPeerQueue PeerQueueFactory
	ctx             context.Context
	cancel          func()
}

48
// New creates a new PeerManager, given a context and a peerQueueFactory.
49 50 51 52 53 54 55 56 57 58 59
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
	ctx, cancel := context.WithCancel(ctx)
	return &PeerManager{
		peerMessages:    make(chan peerMessage, 10),
		peerQueues:      make(map[peer.ID]PeerQueue),
		createPeerQueue: createPeerQueue,
		ctx:             ctx,
		cancel:          cancel,
	}
}

60
// ConnectedPeers returns a list of peers this PeerManager is managing.
61
func (pm *PeerManager) ConnectedPeers() []peer.ID {
62 63 64
	resp := make(chan []peer.ID)
	pm.peerMessages <- &getPeersMessage{resp}
	return <-resp
65 66
}

67
// Connected is called to add a new peer to the pool, and send it an initial set
68
// of wants.
69 70
func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
	select {
71
	case pm.peerMessages <- &connectPeerMessage{p, initialEntries}:
72 73 74 75
	case <-pm.ctx.Done():
	}
}

76
// Disconnected is called to remove a peer from the pool.
77 78
func (pm *PeerManager) Disconnected(p peer.ID) {
	select {
79
	case pm.peerMessages <- &disconnectPeerMessage{p}:
80 81 82 83
	case <-pm.ctx.Done():
	}
}

84 85
// SendMessage is called to send a message to all or some peers in the pool;
// if targets is nil, it sends to all.
86 87
func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
	select {
88
	case pm.peerMessages <- &sendPeerMessage{entries: entries, targets: targets, from: from}:
89 90 91 92
	case <-pm.ctx.Done():
	}
}

93
// Startup enables the run loop for the PeerManager - no processing will occur
94
// if startup is not called.
95 96 97 98
func (pm *PeerManager) Startup() {
	// Every call launches another run goroutine, and run touches peerQueues
	// without any locking, so this is expected to be called only once per
	// manager — NOTE(review): confirm callers honor that.
	go pm.run()
}

99
// Shutdown shuts down processing for the PeerManager.
100 101 102 103 104 105 106 107
func (pm *PeerManager) Shutdown() {
	// Cancelling the manager's context makes run return and unblocks any
	// Connected/Disconnected/SendMessage call still waiting to enqueue.
	pm.cancel()
}

func (pm *PeerManager) run() {
	for {
		select {
		case message := <-pm.peerMessages:
108
			message.handle(pm)
109 110 111 112 113 114
		case <-pm.ctx.Done():
			return
		}
	}
}

115 116 117 118 119
// sendPeerMessage is the run-loop request built by SendMessage. entries and
// from are passed to AddMessage on each selected queue; targets names the
// peers to address, and an empty targets selects every queue in the pool.
type sendPeerMessage struct {
	entries []*bsmsg.Entry
	targets []peer.ID
	from    uint64
}
120

121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
// handle implements peerMessage by delegating to pm.sendMessage.
func (s *sendPeerMessage) handle(pm *PeerManager) {
	pm.sendMessage(s)
}

// connectPeerMessage is the run-loop request built by Connected: p is the
// peer that connected and initialEntries are handed to its queue's Startup
// when a new queue has to be created.
type connectPeerMessage struct {
	p              peer.ID
	initialEntries []*wantlist.Entry
}

// handle implements peerMessage by delegating to pm.startPeerHandler; the
// returned queue is not used here.
func (c *connectPeerMessage) handle(pm *PeerManager) {
	pm.startPeerHandler(c.p, c.initialEntries)
}

// disconnectPeerMessage is the run-loop request built by Disconnected; p is
// the peer that disconnected.
type disconnectPeerMessage struct {
	p peer.ID
}

// handle implements peerMessage by delegating to pm.stopPeerHandler.
func (dc *disconnectPeerMessage) handle(pm *PeerManager) {
	pm.stopPeerHandler(dc.p)
}

// getPeersMessage is the run-loop request built by ConnectedPeers; the list
// of peers is delivered on peerResp.
type getPeersMessage struct {
	peerResp chan<- []peer.ID
}

// handle implements peerMessage by delegating to pm.getPeers.
func (gp *getPeersMessage) handle(pm *PeerManager) {
	pm.getPeers(gp.peerResp)
}

// getPeers sends the IDs of all peers that currently have a queue on
// peerResp. The slice is freshly allocated and always non-nil; its order
// follows map iteration and is therefore unspecified. The send blocks until
// the channel accepts the value.
func (pm *PeerManager) getPeers(peerResp chan<- []peer.ID) {
	ids := make([]peer.ID, len(pm.peerQueues))
	next := 0
	for id := range pm.peerQueues {
		ids[next] = id
		next++
	}
	peerResp <- ids
}

// startPeerHandler registers a connection for peer p. It must only run on the
// run loop because it reads and writes peerQueues.
//
// If p already has a queue, RefIncrement is called on it and nil is returned;
// initialEntries are ignored in that case. Otherwise a queue is created with
// createPeerQueue, stored in the pool, started with the manager's context and
// initialEntries, and returned.
func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue {
	if existing, found := pm.peerQueues[p]; found {
		existing.RefIncrement()
		return nil
	}

	queue := pm.createPeerQueue(p)
	// Store the queue in the pool before starting it.
	pm.peerQueues[p] = queue
	queue.Startup(pm.ctx, initialEntries)
	return queue
}

// stopPeerHandler releases one connection for peer p. It must only run on the
// run loop because it reads and writes peerQueues.
//
// A peer without a queue is ignored. Otherwise RefDecrement is called; when
// it returns false the queue is shut down and removed from the pool, and when
// it returns true the queue is kept.
func (pm *PeerManager) stopPeerHandler(p peer.ID) {
	queue, found := pm.peerQueues[p]
	if !found {
		// TODO: log error?
		return
	}

	if stillReferenced := queue.RefDecrement(); !stillReferenced {
		queue.Shutdown()
		delete(pm.peerQueues, p)
	}
}

func (pm *PeerManager) sendMessage(ms *sendPeerMessage) {
	if len(ms.targets) == 0 {
		for _, p := range pm.peerQueues {
			p.AddMessage(ms.entries, ms.from)
190
		}
191 192 193 194 195 196 197 198
	} else {
		for _, t := range ms.targets {
			p, ok := pm.peerQueues[t]
			if !ok {
				log.Infof("tried sending wantlist change to non-partner peer: %s", t)
				continue
			}
			p.AddMessage(ms.entries, ms.from)
199 200 201
		}
	}
}