From ef967ceeef50995877c0245ccbde1bd87a0fe9c9 Mon Sep 17 00:00:00 2001 From: Jeromy <jeromyj@gmail.com> Date: Wed, 13 May 2015 16:35:08 -0700 Subject: [PATCH] contextify peermanager --- exchange/bitswap/bitswap.go | 2 -- exchange/bitswap/decision/engine.go | 2 +- exchange/bitswap/peermanager.go | 22 +++++++++++----------- exchange/bitswap/workers.go | 4 ++-- 4 files changed, 14 insertions(+), 16 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index b8dcdab1e..a05ea8091 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -316,8 +316,6 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli // TODO(brian): handle errors func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error { - //defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done() - // This call records changes to wantlists, blocks received, // and number of bytes transfered. bs.engine.MessageReceived(p, incoming) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 0b08a55fb..2644885d3 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -206,7 +206,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { l.CancelWant(entry.Key) e.peerRequestQueue.Remove(entry.Key, p) } else { - log.Debugf("wants %s", entry.Key, entry.Priority) + log.Debugf("wants %s - %d", entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority) if exists, err := e.bs.Has(entry.Key); err == nil && exists { e.peerRequestQueue.Push(entry.Entry, p) diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/peermanager.go index a91acd45b..a1ce7c7a8 100644 --- a/exchange/bitswap/peermanager.go +++ b/exchange/bitswap/peermanager.go @@ -53,24 +53,24 @@ type msgQueue struct { done chan struct{} } -func (pm *PeerManager) SendBlock(env *engine.Envelope) { +func (pm *PeerManager) SendBlock(ctx context.Context, env *engine.Envelope) { // Blocks need to be sent synchronously to maintain proper backpressure // throughout the network stack defer env.Sent() msg := bsmsg.New() msg.AddBlock(env.Block) - err := pm.network.SendMessage(context.TODO(), env.Peer, msg) + err := pm.network.SendMessage(ctx, env.Peer, msg) if err != nil { log.Error(err) } } -func (pm *PeerManager) startPeerHandler(p peer.ID) { +func (pm *PeerManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue { _, ok := pm.peers[p] if ok { // TODO: log an error? - return + return nil } mq := new(msgQueue) @@ -79,7 +79,8 @@ func (pm *PeerManager) startPeerHandler(p peer.ID) { mq.p = p pm.peers[p] = mq - go pm.runQueue(mq) + go pm.runQueue(ctx, mq) + return mq } func (pm *PeerManager) stopPeerHandler(p peer.ID) { @@ -93,14 +94,14 @@ func (pm *PeerManager) stopPeerHandler(p peer.ID) { delete(pm.peers, p) } -func (pm *PeerManager) runQueue(mq *msgQueue) { +func (pm *PeerManager) runQueue(ctx context.Context, mq *msgQueue) { for { select { case <-mq.work: // there is work to be done // TODO: this might not need to be done every time, figure out // a good heuristic - err := pm.network.ConnectTo(context.TODO(), mq.p) + err := pm.network.ConnectTo(ctx, mq.p) if err != nil { log.Error(err) // TODO: cant connect, what now? @@ -114,7 +115,7 @@ func (pm *PeerManager) runQueue(mq *msgQueue) { if wlm != nil && !wlm.Empty() { // send wantlist updates - err = pm.network.SendMessage(context.TODO(), mq.p, wlm) + err = pm.network.SendMessage(ctx, mq.p, wlm) if err != nil { log.Error("bitswap send error: ", err) // TODO: what do we do if this fails? @@ -162,13 +163,12 @@ func (pm *PeerManager) Run(ctx context.Context) { p, ok := pm.peers[msgp.to] if !ok { //TODO: decide, drop message? or dial? - pm.startPeerHandler(msgp.to) - p = pm.peers[msgp.to] + p = pm.startPeerHandler(ctx, msgp.to) } p.addMessage(msgp.msg) case p := <-pm.connect: - pm.startPeerHandler(p) + pm.startPeerHandler(ctx, p) case p := <-pm.disconnect: pm.stopPeerHandler(p) case <-ctx.Done(): diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index c6c2bbb25..ba9a77549 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -46,6 +46,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { bs.rebroadcastWorker(ctx) }) + // Start up a worker to manage sending out provides messages px.Go(func(px process.Process) { bs.provideCollector(ctx) }) @@ -71,8 +72,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { continue } - //log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer) - bs.pm.SendBlock(envelope) + bs.pm.SendBlock(ctx, envelope) case <-ctx.Done(): return } -- GitLab