Unverified Commit 401b87dd authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #78 from ipfs/fix/lockless

make the WantlistManager own the PeerHandler
parents bb948444 3a24fa2c
......@@ -93,7 +93,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return bsmq.New(ctx, p, network)
}
wm := bswm.New(ctx)
wm := bswm.New(ctx, bspm.New(ctx, peerQueueFactory))
pqm := bspqm.New(ctx, network)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session {
......@@ -115,7 +115,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pqm: pqm,
pm: bspm.New(ctx, peerQueueFactory),
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
......@@ -123,7 +122,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sentHistogram: sentHistogram,
}
bs.wm.SetDelegate(bs.pm)
bs.wm.Startup()
bs.pqm.Startup()
network.SetDelegate(bs)
......@@ -144,10 +142,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
// the peermanager manages sending messages to peers in a way that
// wont block bitswap operation
pm *bspm.PeerManager
// the wantlist tracks global wants for bitswap
wm *bswm.WantManager
......
......@@ -199,19 +199,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.Log("Give the blocks to the first instance")
nump := len(instances) - 1
// assert we're properly connected
for _, inst := range instances {
peers := inst.Exchange.pm.ConnectedPeers()
for i := 0; i < 10 && len(peers) != nump; i++ {
time.Sleep(time.Millisecond * 50)
peers = inst.Exchange.pm.ConnectedPeers()
}
if len(peers) != nump {
t.Fatal("not enough peers connected to instance")
}
}
var blkeys []cid.Cid
first := instances[0]
for _, b := range blocks {
......
......@@ -2,7 +2,6 @@ package peermanager
import (
"context"
"sync"
bsmsg "github.com/ipfs/go-bitswap/message"
wantlist "github.com/ipfs/go-bitswap/wantlist"
......@@ -40,8 +39,7 @@ type peerQueueInstance struct {
// PeerManager manages a pool of peers and sends messages to peers in the pool.
type PeerManager struct {
// peerQueues -- interact through internal utility functions get/set/remove/iterate
peerQueues map[peer.ID]*peerQueueInstance
peerQueuesLk sync.RWMutex
peerQueues map[peer.ID]*peerQueueInstance
createPeerQueue PeerQueueFactory
ctx context.Context
......@@ -58,8 +56,6 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
// ConnectedPeers returns a list of peers this PeerManager is managing.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
pm.peerQueuesLk.RLock()
defer pm.peerQueuesLk.RUnlock()
peers := make([]peer.ID, 0, len(pm.peerQueues))
for p := range pm.peerQueues {
peers = append(peers, p)
......@@ -70,8 +66,6 @@ func (pm *PeerManager) ConnectedPeers() []peer.ID {
// Connected is called to add a new peer to the pool, and send it an initial set
// of wants.
func (pm *PeerManager) Connected(p peer.ID, initialWants *wantlist.SessionTrackedWantlist) {
pm.peerQueuesLk.Lock()
pq := pm.getOrCreate(p)
if pq.refcnt == 0 {
......@@ -79,47 +73,35 @@ func (pm *PeerManager) Connected(p peer.ID, initialWants *wantlist.SessionTracke
}
pq.refcnt++
pm.peerQueuesLk.Unlock()
}
// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
pm.peerQueuesLk.Lock()
pq, ok := pm.peerQueues[p]
if !ok {
pm.peerQueuesLk.Unlock()
return
}
pq.refcnt--
if pq.refcnt > 0 {
pm.peerQueuesLk.Unlock()
return
}
delete(pm.peerQueues, p)
pm.peerQueuesLk.Unlock()
pq.pq.Shutdown()
}
// SendMessage is called to send a message to all or some peers in the pool;
// if targets is nil, it sends to all.
func (pm *PeerManager) SendMessage(entries []bsmsg.Entry, targets []peer.ID, from uint64) {
if len(targets) == 0 {
pm.peerQueuesLk.RLock()
for _, p := range pm.peerQueues {
p.pq.AddMessage(entries, from)
}
pm.peerQueuesLk.RUnlock()
} else {
for _, t := range targets {
pm.peerQueuesLk.Lock()
pqi := pm.getOrCreate(t)
pm.peerQueuesLk.Unlock()
pqi.pq.AddMessage(entries, from)
}
}
......
......@@ -53,7 +53,7 @@ type WantManager struct {
}
// New initializes a new WantManager for a given context.
func New(ctx context.Context) *WantManager {
func New(ctx context.Context, peerHandler PeerHandler) *WantManager {
ctx, cancel := context.WithCancel(ctx)
wantlistGauge := metrics.NewCtx(ctx, "wantlist_total",
"Number of items in wantlist.").Gauge()
......@@ -63,15 +63,11 @@ func New(ctx context.Context) *WantManager {
bcwl: wantlist.NewSessionTrackedWantlist(),
ctx: ctx,
cancel: cancel,
peerHandler: peerHandler,
wantlistGauge: wantlistGauge,
}
}
// SetDelegate specifies who will send want changes out to the internet.
func (wm *WantManager) SetDelegate(peerHandler PeerHandler) {
wm.peerHandler = peerHandler
}
// WantBlocks adds the given cids to the wantlist, tracked by the given session.
func (wm *WantManager) WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) {
log.Infof("want blocks: %s", ks)
......
......@@ -40,7 +40,7 @@ func setupTestFixturesAndInitialWantList() (
// setup fixtures
wantSender := &fakePeerHandler{}
wantManager := New(ctx)
wantManager := New(ctx, wantSender)
keys := testutil.GenerateCids(10)
otherKeys := testutil.GenerateCids(5)
peers := testutil.GeneratePeers(10)
......@@ -48,7 +48,6 @@ func setupTestFixturesAndInitialWantList() (
otherSession := testutil.GenerateSessionID()
// startup wantManager
wantManager.SetDelegate(wantSender)
wantManager.Startup()
// add initial wants
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment