Commit 440377e2 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

contextify peermanager

parent 16e05fc4
......@@ -316,8 +316,6 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
// TODO(brian): handle errors
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error {
//defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
bs.engine.MessageReceived(p, incoming)
......
......@@ -206,7 +206,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
l.CancelWant(entry.Key)
e.peerRequestQueue.Remove(entry.Key, p)
} else {
log.Debugf("wants %s", entry.Key, entry.Priority)
log.Debugf("wants %s - %d", entry.Key, entry.Priority)
l.Wants(entry.Key, entry.Priority)
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
e.peerRequestQueue.Push(entry.Entry, p)
......
......@@ -53,24 +53,24 @@ type msgQueue struct {
done chan struct{}
}
func (pm *PeerManager) SendBlock(env *engine.Envelope) {
func (pm *PeerManager) SendBlock(ctx context.Context, env *engine.Envelope) {
// Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack
defer env.Sent()
msg := bsmsg.New()
msg.AddBlock(env.Block)
err := pm.network.SendMessage(context.TODO(), env.Peer, msg)
err := pm.network.SendMessage(ctx, env.Peer, msg)
if err != nil {
log.Error(err)
}
}
func (pm *PeerManager) startPeerHandler(p peer.ID) {
func (pm *PeerManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue {
_, ok := pm.peers[p]
if ok {
// TODO: log an error?
return
return nil
}
mq := new(msgQueue)
......@@ -79,7 +79,8 @@ func (pm *PeerManager) startPeerHandler(p peer.ID) {
mq.p = p
pm.peers[p] = mq
go pm.runQueue(mq)
go pm.runQueue(ctx, mq)
return mq
}
func (pm *PeerManager) stopPeerHandler(p peer.ID) {
......@@ -93,14 +94,14 @@ func (pm *PeerManager) stopPeerHandler(p peer.ID) {
delete(pm.peers, p)
}
func (pm *PeerManager) runQueue(mq *msgQueue) {
func (pm *PeerManager) runQueue(ctx context.Context, mq *msgQueue) {
for {
select {
case <-mq.work: // there is work to be done
// TODO: this might not need to be done every time, figure out
// a good heuristic
err := pm.network.ConnectTo(context.TODO(), mq.p)
err := pm.network.ConnectTo(ctx, mq.p)
if err != nil {
log.Error(err)
// TODO: cant connect, what now?
......@@ -114,7 +115,7 @@ func (pm *PeerManager) runQueue(mq *msgQueue) {
if wlm != nil && !wlm.Empty() {
// send wantlist updates
err = pm.network.SendMessage(context.TODO(), mq.p, wlm)
err = pm.network.SendMessage(ctx, mq.p, wlm)
if err != nil {
log.Error("bitswap send error: ", err)
// TODO: what do we do if this fails?
......@@ -162,13 +163,12 @@ func (pm *PeerManager) Run(ctx context.Context) {
p, ok := pm.peers[msgp.to]
if !ok {
//TODO: decide, drop message? or dial?
pm.startPeerHandler(msgp.to)
p = pm.peers[msgp.to]
p = pm.startPeerHandler(ctx, msgp.to)
}
p.addMessage(msgp.msg)
case p := <-pm.connect:
pm.startPeerHandler(p)
pm.startPeerHandler(ctx, p)
case p := <-pm.disconnect:
pm.stopPeerHandler(p)
case <-ctx.Done():
......
......@@ -46,6 +46,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
bs.rebroadcastWorker(ctx)
})
// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
})
......@@ -71,8 +72,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
continue
}
//log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
bs.pm.SendBlock(envelope)
bs.pm.SendBlock(ctx, envelope)
case <-ctx.Done():
return
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment