Unverified Commit 07ec9e84 authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Merge pull request #111 from ipfs/feat/improve-connmgr

connmgr: give peers more weight when actively participating in a session
parents 401b87dd 8d74ae26
......@@ -38,16 +38,8 @@ var log = logging.Logger("bitswap")
var _ exchange.SessionExchange = (*Bitswap)(nil)
const (
// maxProvidersPerRequest specifies the maximum number of providers desired
// from the network. This value is specified because the network streams
// results.
// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
maxProvidersPerRequest = 3
findProviderDelay = 1 * time.Second
providerRequestTimeout = time.Second * 10
// these requests take at _least_ two minutes at the moment.
provideTimeout = time.Minute * 3
sizeBatchRequestChan = 32
provideTimeout = time.Minute * 3
)
var (
......@@ -190,11 +182,6 @@ type counters struct {
messagesRecvd uint64
}
type blockRequest struct {
Cid cid.Cid
Ctx context.Context
}
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
......
......@@ -221,7 +221,7 @@ func (e *Engine) Peers() []peer.ID {
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
if m.Empty() {
log.Debugf("received empty message from %s", p)
}
......@@ -276,7 +276,6 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
log.Debugf("got block %s %d bytes", block, len(block.RawData()))
l.ReceivedBytes(len(block.RawData()))
}
return nil
}
func (e *Engine) addBlock(block blocks.Block) {
......@@ -309,7 +308,7 @@ func (e *Engine) AddBlock(block blocks.Block) {
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
l := e.findOrCreate(p)
l.lk.Lock()
defer l.lk.Unlock()
......@@ -320,7 +319,6 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
e.peerRequestQueue.Remove(block.Cid(), p)
}
return nil
}
func (e *Engine) PeerConnected(p peer.ID) {
......
......@@ -5,17 +5,10 @@ import (
bsmsg "github.com/ipfs/go-bitswap/message"
wantlist "github.com/ipfs/go-bitswap/wantlist"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
)
var log = logging.Logger("bitswap")
var (
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)
// PeerQueue provides a queer of messages to be sent for a single peer.
type PeerQueue interface {
AddMessage(entries []bsmsg.Entry, ses uint64)
......@@ -27,10 +20,6 @@ type PeerQueue interface {
// PeerQueueFactory provides a function that will create a PeerQueue.
type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue
type peerMessage interface {
handle(pm *PeerManager)
}
type peerQueueInstance struct {
refcnt int
pq PeerQueue
......
......@@ -5,17 +5,15 @@ import (
"fmt"
"math/rand"
logging "github.com/ipfs/go-log"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
)
var log = logging.Logger("bitswap")
const (
maxOptimizedPeers = 32
reservePeers = 2
maxOptimizedPeers = 32
reservePeers = 2
unoptimizedTagValue = 5 // tag value for "unoptimized" session peers.
optimizedTagValue = 10 // tag value for "optimized" session peers.
)
// PeerTagger is an interface for tagging peers with metadata
......@@ -131,8 +129,8 @@ func (spm *SessionPeerManager) run(ctx context.Context) {
}
}
func (spm *SessionPeerManager) tagPeer(p peer.ID) {
spm.tagger.TagPeer(p, spm.tag, 10)
func (spm *SessionPeerManager) tagPeer(p peer.ID, value int) {
spm.tagger.TagPeer(p, spm.tag, value)
}
func (spm *SessionPeerManager) insertOptimizedPeer(p peer.ID) {
......@@ -173,7 +171,7 @@ func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
if _, ok := spm.activePeers[p]; !ok {
spm.activePeers[p] = false
spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
spm.tagPeer(p)
spm.tagPeer(p, unoptimizedTagValue)
}
}
......@@ -182,17 +180,16 @@ type peerResponseMessage struct {
}
func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
p := prm.p
isOptimized, ok := spm.activePeers[p]
if !ok {
spm.activePeers[p] = true
spm.tagPeer(p)
if isOptimized {
spm.removeOptimizedPeer(p)
} else {
if isOptimized {
spm.removeOptimizedPeer(p)
} else {
spm.activePeers[p] = true
spm.activePeers[p] = true
spm.tagPeer(p, optimizedTagValue)
// transition from unoptimized.
if ok {
spm.removeUnoptimizedPeer(p)
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment