peermanager.go 4.09 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
package peermanager

import (
	"context"

	bsmsg "github.com/ipfs/go-bitswap/message"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	logging "github.com/ipfs/go-log"

	peer "github.com/libp2p/go-libp2p-peer"
)

// log is the package-level logger, registered under the name "bitswap".
var log = logging.Logger("bitswap")

var (
	// metricsBuckets holds six ascending float64 boundaries: 64, 1024, 16384,
	// 262144, 262159 and 4194304. Nothing in the visible part of this file
	// reads it.
	// NOTE(review): presumably histogram bucket bounds for sizes in bytes —
	// confirm against whatever consumes it, or remove it if it is unused.
	metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)

// sendMessageParams is the payload carried by a sendMessage peerMessage.
type sendMessageParams struct {
	// entries is handed unchanged to AddMessage on every selected queue.
	entries []*bsmsg.Entry
	// targets lists the peers to deliver to; when it is empty the entries go
	// to every queue the manager currently holds.
	targets []peer.ID
	// from is forwarded as the second (ses) argument of AddMessage.
	from    uint64
}

// connectParams is the payload carried by a connect peerMessage.
type connectParams struct {
	// peer is the ID of the peer that connected.
	peer           peer.ID
	// initialEntries is passed to the queue's Startup when a new queue has to
	// be created for the peer; it is not used if a queue already exists.
	initialEntries []*wantlist.Entry
}

// peerMessageType tags a peerMessage so the run loop knows how to handle it.
type peerMessageType int

// Message types understood by handleMessage. Numbering starts at 1, so the
// zero value of peerMessageType does not match any of them.
const (
	// connect: params is a connectParams value.
	connect peerMessageType = iota + 1
	// disconnect: params is the peer.ID of the departing peer.
	disconnect
	// getPeers: params is unused; the reply is sent on resultsChan.
	getPeers
	// sendMessage: params is a *sendMessageParams.
	sendMessage
)

// peerMessage is a single request sent over PeerManager.peerMessages to be
// processed by the run loop.
type peerMessage struct {
	// messageType selects the branch taken in handleMessage.
	messageType peerMessageType
	// params is the request payload; its concrete type depends on messageType
	// (connectParams, peer.ID, *sendMessageParams, or nil for getPeers).
	params      interface{}
	// resultsChan receives the reply for getPeers (a []peer.ID); the other
	// message types leave it nil.
	resultsChan chan interface{}
}

// PeerQueue is the per-peer message queue driven by PeerManager. The notes
// below describe how PeerManager calls each method; what an implementation
// does internally is not visible from this file.
type PeerQueue interface {
	// RefIncrement is called when a connect arrives for a peer that already
	// has a queue.
	RefIncrement()
	// RefDecrement is called on disconnect. When it returns true the manager
	// keeps the queue; when it returns false the manager calls Shutdown and
	// forgets the queue.
	// NOTE(review): presumably true means other references remain — confirm
	// against the implementation.
	RefDecrement() bool
	// AddMessage receives the entries and sender value of a sendMessage
	// request.
	AddMessage(entries []*bsmsg.Entry, ses uint64)
	// Startup is called once, right after the queue is created, with the
	// manager's context and the entries supplied by the connect request.
	Startup(ctx context.Context, initialEntries []*wantlist.Entry)
	// Shutdown is called just before the manager drops the queue.
	Shutdown()
}

// PeerQueueFactory creates the PeerQueue for peer p. PeerManager calls it the
// first time a peer without an existing queue connects.
type PeerQueueFactory func(p peer.ID) PeerQueue

// PeerManager keeps one PeerQueue per connected peer and routes messages to
// them. All mutations of its state happen on a single goroutine (run); the
// exported methods only enqueue requests for that goroutine.
type PeerManager struct {
	// sync channel for Run loop
	// (buffered; see New for the capacity)
	peerMessages chan peerMessage

	// synchronized by Run loop, only touch inside there
	peerQueues map[peer.ID]PeerQueue

	// createPeerQueue builds the queue for a newly connected peer.
	createPeerQueue PeerQueueFactory
	// ctx bounds the run loop and is handed to every queue's Startup.
	ctx             context.Context
	// cancel cancels ctx; it is invoked by Shutdown.
	cancel          func()
}

// New builds a PeerManager bound to a cancellable child of ctx.
// createPeerQueue is used to make a queue for each peer that has none yet.
// No requests are processed until Startup is called.
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
	childCtx, stop := context.WithCancel(ctx)

	pm := new(PeerManager)
	pm.ctx = childCtx
	pm.cancel = stop
	pm.createPeerQueue = createPeerQueue
	pm.peerQueues = map[peer.ID]PeerQueue{}
	// A small buffer lets callers enqueue a few requests without waiting for
	// the run loop to pick them up.
	pm.peerMessages = make(chan peerMessage, 10)
	return pm
}

// ConnectedPeers returns the IDs of all peers that currently have a queue.
// The lookup is carried out by the run loop. If the manager's context is
// cancelled before the request is accepted or answered, nil is returned
// instead of blocking forever.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
	// Buffered so the run loop's reply can never block, even when this caller
	// has already given up because the context was cancelled.
	resp := make(chan interface{}, 1)

	select {
	case pm.peerMessages <- peerMessage{getPeers, nil, resp}:
	case <-pm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers.([]peer.ID)
	case <-pm.ctx.Done():
		return nil
	}
}

// startPeerHandler records a connection from p. If p already has a queue, the
// queue's reference count is bumped and nil is returned. Otherwise a queue is
// created through the factory, stored, started with the manager's context and
// initialEntries, and returned. Must only be called from the run loop.
func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue {
	if existing, found := pm.peerQueues[p]; found {
		existing.RefIncrement()
		return nil
	}

	queue := pm.createPeerQueue(p)
	pm.peerQueues[p] = queue
	queue.Startup(pm.ctx, initialEntries)
	return queue
}

// stopPeerHandler records a disconnect from p. Unknown peers are ignored. For
// a known peer the queue's reference count is decremented; the queue is shut
// down and forgotten only when RefDecrement returns false. Must only be called
// from the run loop.
func (pm *PeerManager) stopPeerHandler(p peer.ID) {
	queue, known := pm.peerQueues[p]
	if !known {
		// TODO: log error?
		return
	}

	if keep := queue.RefDecrement(); !keep {
		queue.Shutdown()
		delete(pm.peerQueues, p)
	}
}

// Connected tells the manager that peer p has connected, supplying the entries
// its queue should start with. The request is queued for the run loop; if the
// manager's context is cancelled first, the call returns without queuing it.
func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
	msg := peerMessage{
		messageType: connect,
		params:      connectParams{peer: p, initialEntries: initialEntries},
	}

	select {
	case <-pm.ctx.Done():
	case pm.peerMessages <- msg:
	}
}

// Disconnected tells the manager that peer p has disconnected. The request is
// queued for the run loop; if the manager's context is cancelled first, the
// call returns without queuing it.
func (pm *PeerManager) Disconnected(p peer.ID) {
	msg := peerMessage{
		messageType: disconnect,
		params:      p,
	}

	select {
	case <-pm.ctx.Done():
	case pm.peerMessages <- msg:
	}
}

// SendMessage asks the run loop to deliver entries to the queues of targets,
// or to every known queue when targets is empty. from is forwarded to each
// queue alongside the entries. If the manager's context is cancelled before
// the request can be queued, the call returns without queuing it.
func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
	request := &sendMessageParams{
		entries: entries,
		targets: targets,
		from:    from,
	}
	msg := peerMessage{messageType: sendMessage, params: request}

	select {
	case <-pm.ctx.Done():
	case pm.peerMessages <- msg:
	}
}

// Startup launches the run loop on a new goroutine. Requests queued through
// the other methods are only processed once this has been called.
func (pm *PeerManager) Startup() {
	go pm.run()
}

// Shutdown cancels the manager's context. That makes the run loop return and
// releases any Connected, Disconnected or SendMessage call that is still
// waiting to enqueue its request.
func (pm *PeerManager) Shutdown() {
	pm.cancel()
}

// run is the manager's event loop: it handles queued requests one at a time
// until the manager's context is cancelled.
//
// TODO: use goprocess here once i trust it
func (pm *PeerManager) run() {
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	done := pm.ctx.Done()
	for {
		select {
		case <-done:
			return
		case msg := <-pm.peerMessages:
			pm.handleMessage(msg)
		}
	}
}

// handleMessage executes a single request on the run loop, dispatching on its
// messageType. Messages with an unrecognised type are ignored.
func (pm *PeerManager) handleMessage(message peerMessage) {
	switch message.messageType {
	case connect:
		req := message.params.(connectParams)
		pm.startPeerHandler(req.peer, req.initialEntries)

	case disconnect:
		pm.stopPeerHandler(message.params.(peer.ID))

	case getPeers:
		// Snapshot the keys of the queue map and hand them to the requester.
		ids := make([]peer.ID, 0, len(pm.peerQueues))
		for id := range pm.peerQueues {
			ids = append(ids, id)
		}
		message.resultsChan <- ids

	case sendMessage:
		req := message.params.(*sendMessageParams)

		// No explicit targets: deliver to every queue we hold.
		if len(req.targets) == 0 {
			for _, queue := range pm.peerQueues {
				queue.AddMessage(req.entries, req.from)
			}
			return
		}

		// Explicit targets: deliver to the ones we know, log the rest.
		for _, target := range req.targets {
			queue, known := pm.peerQueues[target]
			if !known {
				log.Infof("tried sending wantlist change to non-partner peer: %s", target)
				continue
			}
			queue.AddMessage(req.entries, req.from)
		}
	}
}