Commit c3aed70f authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

clarify synhronization constructs

parent 77e81da9
......@@ -14,23 +14,17 @@ import (
)
type WantManager struct {
receiver bsnet.Receiver
incoming chan []*bsmsg.Entry
// notification channel for new peers connecting
connect chan peer.ID
// notification channel for peers disconnecting
disconnect chan peer.ID
// sync channels for Run loop
incoming chan []*bsmsg.Entry
connect chan peer.ID // notification channel for new peers connecting
disconnect chan peer.ID // notification channel for peers disconnecting
// synchronized by Run loop, only touch inside there
peers map[peer.ID]*msgQueue
wl *wantlist.Wantlist
wl *wantlist.Wantlist
network bsnet.BitSwapNetwork
ctx context.Context
ctx context.Context
}
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
......@@ -58,8 +52,9 @@ type cancellation struct {
type msgQueue struct {
p peer.ID
outlk sync.Mutex
out bsmsg.BitSwapMessage
outlk sync.Mutex
out bsmsg.BitSwapMessage
network bsnet.BitSwapNetwork
work chan struct{}
done chan struct{}
......@@ -112,7 +107,7 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
return nil
}
mq := newMsgQueue(p)
mq := pm.newMsgQueue(p)
// new peer, we will want to give them our full wantlist
fullwantlist := bsmsg.New(true)
......@@ -123,7 +118,7 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
mq.work <- struct{}{}
pm.peers[p] = mq
go pm.runQueue(mq)
go mq.runQueue(pm.ctx)
return mq
}
......@@ -138,12 +133,12 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) {
delete(pm.peers, p)
}
func (pm *WantManager) runQueue(mq *msgQueue) {
func (mq *msgQueue) runQueue(ctx context.Context) {
for {
select {
case <-mq.work: // there is work to be done
err := pm.network.ConnectTo(pm.ctx, mq.p)
err := mq.network.ConnectTo(ctx, mq.p)
if err != nil {
log.Errorf("cant connect to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
......@@ -161,7 +156,7 @@ func (pm *WantManager) runQueue(mq *msgQueue) {
mq.outlk.Unlock()
// send wantlist updates
err = pm.network.SendMessage(pm.ctx, mq.p, wlm)
err = mq.network.SendMessage(ctx, mq.p, wlm)
if err != nil {
log.Error("bitswap send error: ", err)
// TODO: what do we do if this fails?
......@@ -224,10 +219,11 @@ func (pm *WantManager) Run() {
}
}
func newMsgQueue(p peer.ID) *msgQueue {
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
mq := new(msgQueue)
mq.done = make(chan struct{})
mq.work = make(chan struct{}, 1)
mq.network = wm.network
mq.p = p
return mq
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment