From b71a0aced018909ca2d56797394e4ae3b02516b8 Mon Sep 17 00:00:00 2001 From: Jeromy <jeromyj@gmail.com> Date: Tue, 19 May 2015 11:26:50 -0700 Subject: [PATCH] clarify synhronization constructs --- exchange/bitswap/wantmanager.go | 38 +++++++++++++++------------------ 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 74372f7f0..4efd120ef 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -14,23 +14,17 @@ import ( ) type WantManager struct { - receiver bsnet.Receiver - - incoming chan []*bsmsg.Entry - - // notification channel for new peers connecting - connect chan peer.ID - - // notification channel for peers disconnecting - disconnect chan peer.ID + // sync channels for Run loop + incoming chan []*bsmsg.Entry + connect chan peer.ID // notification channel for new peers connecting + disconnect chan peer.ID // notification channel for peers disconnecting + // synchronized by Run loop, only touch inside there peers map[peer.ID]*msgQueue - - wl *wantlist.Wantlist + wl *wantlist.Wantlist network bsnet.BitSwapNetwork - - ctx context.Context + ctx context.Context } func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager { @@ -58,8 +52,9 @@ type cancellation struct { type msgQueue struct { p peer.ID - outlk sync.Mutex - out bsmsg.BitSwapMessage + outlk sync.Mutex + out bsmsg.BitSwapMessage + network bsnet.BitSwapNetwork work chan struct{} done chan struct{} @@ -112,7 +107,7 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { return nil } - mq := newMsgQueue(p) + mq := pm.newMsgQueue(p) // new peer, we will want to give them our full wantlist fullwantlist := bsmsg.New(true) @@ -123,7 +118,7 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { mq.work <- struct{}{} pm.peers[p] = mq - go pm.runQueue(mq) + go mq.runQueue(pm.ctx) return mq } @@ -138,12 +133,12 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) { delete(pm.peers, p) } -func (pm *WantManager) runQueue(mq *msgQueue) { +func (mq *msgQueue) runQueue(ctx context.Context) { for { select { case <-mq.work: // there is work to be done - err := pm.network.ConnectTo(pm.ctx, mq.p) + err := mq.network.ConnectTo(ctx, mq.p) if err != nil { log.Errorf("cant connect to peer %s: %s", mq.p, err) // TODO: cant connect, what now? @@ -161,7 +156,7 @@ func (pm *WantManager) runQueue(mq *msgQueue) { mq.outlk.Unlock() // send wantlist updates - err = pm.network.SendMessage(pm.ctx, mq.p, wlm) + err = mq.network.SendMessage(ctx, mq.p, wlm) if err != nil { log.Error("bitswap send error: ", err) // TODO: what do we do if this fails? @@ -224,10 +219,11 @@ func (pm *WantManager) Run() { } } -func newMsgQueue(p peer.ID) *msgQueue { +func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue { mq := new(msgQueue) mq.done = make(chan struct{}) mq.work = make(chan struct{}, 1) + mq.network = wm.network mq.p = p return mq -- GitLab