// Package peermanager keeps one message queue per peer and hands GraphSync
// requests to the queue of the peer they are addressed to.
package peermanager

import (
	"context"
	"sync"
	"time"

	gsmsg "github.com/ipfs/go-graphsync/message"
	logging "github.com/ipfs/go-log"

	peer "github.com/libp2p/go-libp2p-peer"
)

// defaultCleanupInterval is one minute. It is not referenced in the visible
// part of this file. NOTE(review): presumably the period of a cleanup pass
// defined elsewhere; confirm before relying on it.
const defaultCleanupInterval = time.Minute

var (
	// log is the logger registered under the "graphsync" name.
	log = logging.Logger("graphsync")

	// metricsBuckets holds six ascending boundaries:
	// 64, 1024, 16384, 262144, 262159 and 4194304.
	// NOTE(review): looks like histogram bucket limits, but nothing in the
	// visible code consumes them; verify the unit and the purpose of the
	// 1<<18 + 15 entry against the caller.
	metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)

// PeerQueue provides a queer of messages to be sent for a single peer.
type PeerQueue interface {
26
	AddRequest(graphSyncRequest gsmsg.GraphSyncRequest)
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
	Startup()
	Shutdown()
}

// PeerQueueFactory provides a function that will create a PeerQueue.
// It receives a context and the ID of the peer the queue is for. In this file
// it is invoked only from PeerManager.getOrCreate, which passes the context
// the manager was constructed with and then calls Startup on the result.
type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue

// peerQueueInstance pairs a PeerQueue with a reference count.
//
// refcnt is incremented by Connected and decremented by Disconnected; when it
// is no longer positive after a decrement the entry is removed from the pool
// and pq is shut down. pq is the queue produced by the PeerQueueFactory.
//
// getOrCreate builds this struct with a positional literal, so the field
// order must stay as it is.
type peerQueueInstance struct {
	refcnt int
	pq     PeerQueue
}

// PeerManager manages a pool of peers and sends messages to peers in the pool.
//
// peerQueues maps each known peer to its queue and reference count, and
// peerQueuesLk guards every access to that map made in this file.
// createPeerQueue is the factory used to build a queue the first time a peer
// is seen, and ctx is the context handed to that factory.
type PeerManager struct {
	peerQueues   map[peer.ID]*peerQueueInstance
	peerQueuesLk sync.RWMutex

	createPeerQueue PeerQueueFactory
	ctx             context.Context
}

// New builds a PeerManager with an empty peer pool. The supplied context is
// stored and later passed to createPeerQueue whenever a queue has to be
// created for a peer that is not yet in the pool.
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
	pm := new(PeerManager)
	pm.ctx = ctx
	pm.createPeerQueue = createPeerQueue
	pm.peerQueues = map[peer.ID]*peerQueueInstance{}
	return pm
}

// ConnectedPeers reports the IDs of every peer that currently has a queue in
// the pool. The result is a fresh, non-nil slice built under the read lock;
// its order follows map iteration and is therefore unspecified.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
	pm.peerQueuesLk.RLock()
	defer pm.peerQueuesLk.RUnlock()

	ids := make([]peer.ID, len(pm.peerQueues))
	next := 0
	for id := range pm.peerQueues {
		ids[next] = id
		next++
	}
	return ids
}

// Connected registers one more connection for peer p. The peer's queue is
// created and started if it is not in the pool yet, and its reference count
// is bumped by one, all under the write lock.
func (pm *PeerManager) Connected(p peer.ID) {
	pm.peerQueuesLk.Lock()
	pm.getOrCreate(p).refcnt++
	pm.peerQueuesLk.Unlock()
}

// Disconnected drops one connection for peer p. Unknown peers are ignored.
// When the reference count is no longer positive after the decrement, the
// peer is removed from the pool and its queue is shut down.
func (pm *PeerManager) Disconnected(p peer.ID) {
	// Set only when this call removed the peer from the pool.
	var removed *peerQueueInstance

	pm.peerQueuesLk.Lock()
	if entry, known := pm.peerQueues[p]; known {
		entry.refcnt--
		if entry.refcnt <= 0 {
			delete(pm.peerQueues, p)
			removed = entry
		}
	}
	pm.peerQueuesLk.Unlock()

	// Shutdown runs after the lock has been released, so the pool is not
	// blocked while the queue shuts down.
	if removed != nil {
		removed.pq.Shutdown()
	}
}

// SendRequest sends the given request to the given peer.
func (pm *PeerManager) SendRequest(
	p peer.ID,
101
	graphSyncRequest gsmsg.GraphSyncRequest) {
102 103 104
	pm.peerQueuesLk.Lock()
	pqi := pm.getOrCreate(p)
	pm.peerQueuesLk.Unlock()
105
	pqi.pq.AddRequest(graphSyncRequest)
106 107 108 109 110 111 112 113 114 115 116 117
}

// getOrCreate returns the pool entry for peer p. If the peer has no entry, a
// queue is built with createPeerQueue, started, and stored with a reference
// count of zero.
//
// It reads and writes peerQueues without taking any lock itself; both callers
// in this file hold peerQueuesLk for writing while calling it.
func (pm *PeerManager) getOrCreate(p peer.ID) *peerQueueInstance {
	if existing, found := pm.peerQueues[p]; found {
		return existing
	}

	queue := pm.createPeerQueue(pm.ctx, p)
	queue.Startup()

	created := &peerQueueInstance{refcnt: 0, pq: queue}
	pm.peerQueues[p] = created
	return created
}