peermanager.go 4.09 KB
Newer Older

package peermanager

import (
	"context"

	bsmsg "github.com/ipfs/go-bitswap/message"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	logging "github.com/ipfs/go-log"

	peer "github.com/libp2p/go-libp2p-peer"
)

var log = logging.Logger("bitswap")

var (
	metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)

type sendMessageParams struct {
	entries []*bsmsg.Entry
	targets []peer.ID
	from    uint64
}

type connectParams struct {
	peer           peer.ID
	initialEntries []*wantlist.Entry
}

type peerMessageType int

const (
	connect peerMessageType = iota + 1
	disconnect
	getPeers
	sendMessage
)

type peerMessage struct {
	messageType peerMessageType
	params      interface{}
	resultsChan chan interface{}
}

type PeerQueue interface {
	RefIncrement()
	RefDecrement() bool
	AddMessage(entries []*bsmsg.Entry, ses uint64)
	Startup(ctx context.Context, initialEntries []*wantlist.Entry)
	Shutdown()
}

type PeerQueueFactory func(p peer.ID) PeerQueue

type PeerManager struct {
	// sync channel for Run loop
	peerMessages chan peerMessage

	// synchronized by Run loop, only touch inside there
	peerQueues map[peer.ID]PeerQueue

	createPeerQueue PeerQueueFactory
	ctx             context.Context
	cancel          func()
}

func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
	ctx, cancel := context.WithCancel(ctx)
	return &PeerManager{
		peerMessages:    make(chan peerMessage, 10),
		peerQueues:      make(map[peer.ID]PeerQueue),
		createPeerQueue: createPeerQueue,
		ctx:             ctx,
		cancel:          cancel,
	}
}

func (pm *PeerManager) ConnectedPeers() []peer.ID {
	resp := make(chan interface{})
	pm.peerMessages <- peerMessage{getPeers, nil, resp}
	peers := <-resp
	return peers.([]peer.ID)
}

func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue {
	mq, ok := pm.peerQueues[p]
	if ok {
		mq.RefIncrement()
		return nil
	}

	mq = pm.createPeerQueue(p)
	pm.peerQueues[p] = mq
	mq.Startup(pm.ctx, initialEntries)
	return mq
}

func (pm *PeerManager) stopPeerHandler(p peer.ID) {
	pq, ok := pm.peerQueues[p]
	if !ok {
		// TODO: log error?
		return
	}

	if pq.RefDecrement() {
		return
	}

	pq.Shutdown()
	delete(pm.peerQueues, p)
}

func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
	select {
	case pm.peerMessages <- peerMessage{connect, connectParams{peer: p, initialEntries: initialEntries}, nil}:
	case <-pm.ctx.Done():
	}
}

func (pm *PeerManager) Disconnected(p peer.ID) {
	select {
	case pm.peerMessages <- peerMessage{disconnect, p, nil}:
	case <-pm.ctx.Done():
	}
}

func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
	select {
	case pm.peerMessages <- peerMessage{
		sendMessage,
		&sendMessageParams{entries: entries, targets: targets, from: from},
		nil,
	}:
	case <-pm.ctx.Done():
	}
}

func (pm *PeerManager) Startup() {
	go pm.run()
}

func (pm *PeerManager) Shutdown() {
	pm.cancel()
}

// TODO: use goprocess here once i trust it
func (pm *PeerManager) run() {
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	for {
		select {
		case message := <-pm.peerMessages:
			pm.handleMessage(message)
		case <-pm.ctx.Done():
			return
		}
	}
}

func (pm *PeerManager) handleMessage(message peerMessage) {

	switch message.messageType {
	case sendMessage:
		ms := message.params.(*sendMessageParams)
		if len(ms.targets) == 0 {
			for _, p := range pm.peerQueues {
				p.AddMessage(ms.entries, ms.from)
			}
		} else {
			for _, t := range ms.targets {
				p, ok := pm.peerQueues[t]
				if !ok {
					log.Infof("tried sending wantlist change to non-partner peer: %s", t)
					continue
				}
				p.AddMessage(ms.entries, ms.from)
			}
		}
	case connect:
		p := message.params.(connectParams)
		pm.startPeerHandler(p.peer, p.initialEntries)
	case disconnect:
		disconnectPeer := message.params.(peer.ID)
		pm.stopPeerHandler(disconnectPeer)
	case getPeers:
		peers := make([]peer.ID, 0, len(pm.peerQueues))
		for p := range pm.peerQueues {
			peers = append(peers, p)
		}
		message.resultsChan <- peers
	}
}