Commit bff7f730 authored by Juan Batiz-Benet

Merge pull request #878 from jbenet/dontIgnoreCancelFuncs

context: Always call returned cancel funcs
parents 276f471c 1d5b9036
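
The fix applied in every hunk below is the same: context.WithTimeout and context.WithDeadline return a (Context, CancelFunc) pair, and discarding the cancel func with "ctx, _ :=" means the derived context's timer and bookkeeping are only released when the deadline expires, even if the work finished long before. Calling (usually deferring) the cancel func releases them immediately. A minimal before/after sketch follows; it uses the standard library's context package rather than the golang.org/x/net/context package vendored in this repo, and doWork is an illustrative stand-in, not code from this commit:

package main

import (
	"context"
	"fmt"
	"time"
)

// doWork stands in for any call that takes a context.
func doWork(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Millisecond): // finishes well before the timeout
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// before: the CancelFunc is discarded, so the context's timer keeps
// running for the full 5 seconds even though doWork returned early.
func lookupLeaky() error {
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
	return doWork(ctx)
}

// after: defer cancel() releases the timer as soon as we return.
// This is the shape applied in every hunk below.
func lookupFixed() error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return doWork(ctx)
}

func main() {
	fmt.Println(lookupLeaky(), lookupFixed())
}
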
@@ -7,10 +7,8 @@ import (
"errors"
"io"
"strings"
"time"
b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
cmds "github.com/jbenet/go-ipfs/commands"
core "github.com/jbenet/go-ipfs/core"
@@ -81,14 +79,13 @@ ipfs id supports the format option for output with the following keys:
return
}
- ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
// TODO handle offline mode with polymorphism instead of conditionals
if !node.OnlineMode() {
res.SetError(errors.New(offlineIdErrorMessage), cmds.ErrClient)
return
}
- p, err := node.Routing.FindPeer(ctx, id)
+ p, err := node.Routing.FindPeer(req.Context().Context, id)
if err == kb.ErrLookupFailure {
res.SetError(errors.New(offlineIdErrorMessage), cmds.ErrClient)
return
@@ -126,7 +126,8 @@ func pingPeer(ctx context.Context, n *core.IpfsNode, pid peer.ID, numPings int)
Text: fmt.Sprintf("Looking up peer %s", pid.Pretty()),
}
- ctx, _ := context.WithTimeout(ctx, kPingTimeout)
+ ctx, cancel := context.WithTimeout(ctx, kPingTimeout)
+ defer cancel()
p, err := n.Routing.FindPeer(ctx, pid)
if err != nil {
outChan <- &PingResult{Text: fmt.Sprintf("Peer lookup error: %s", err)}
@@ -147,7 +148,8 @@ func pingPeer(ctx context.Context, n *core.IpfsNode, pid peer.ID, numPings int)
default:
}
- ctx, _ := context.WithTimeout(ctx, kPingTimeout)
+ ctx, cancel := context.WithTimeout(ctx, kPingTimeout)
+ defer cancel()
took, err := n.Routing.Ping(ctx, pid)
if err != nil {
log.Debugf("Ping error: %s", err)
@@ -54,7 +54,8 @@ func Listen(nd *core.IpfsNode, protocol string) (*ipfsListener, error) {
}
func Dial(nd *core.IpfsNode, p peer.ID, protocol string) (net.Stream, error) {
- ctx, _ := context.WithTimeout(nd.Context(), time.Second*30)
+ ctx, cancel := context.WithTimeout(nd.Context(), time.Second*30)
+ defer cancel()
err := nd.PeerHost.Connect(ctx, peer.PeerInfo{ID: p})
if err != nil {
return nil, err
@@ -14,12 +14,12 @@ import (
ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ctxutil "github.com/jbenet/go-ipfs/util/ctx"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
host "github.com/jbenet/go-ipfs/p2p/host"
inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer"
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
ctxutil "github.com/jbenet/go-ipfs/util/ctx"
pb "github.com/jbenet/go-ipfs/diagnostics/internal/pb"
util "github.com/jbenet/go-ipfs/util"
@@ -138,7 +138,8 @@ func newID() string {
// GetDiagnostic runs a diagnostics request across the entire network
func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) {
log.Debug("Getting diagnostic.")
- ctx, _ := context.WithTimeout(context.TODO(), timeout)
+ ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+ defer cancel()
diagID := newID()
d.diagLock.Lock()
@@ -269,7 +269,8 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
go func(k u.Key) {
defer wg.Done()
- child, _ := context.WithTimeout(ctx, providerRequestTimeout)
+ child, cancel := context.WithTimeout(ctx, providerRequestTimeout)
+ defer cancel()
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers {
sendToPeers <- prov
@@ -311,10 +312,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
// Should only track *useful* messages in ledger
for _, block := range incoming.Blocks() {
- hasBlockCtx, _ := context.WithTimeout(ctx, hasBlockTimeout)
+ hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
if err := bs.HasBlock(hasBlockCtx, block); err != nil {
log.Debug(err)
}
+ cancel()
}
var keys []u.Key
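A variation worth noting in the hunk above and the two worker hunks below: inside a loop, defer cancel() would not run until the enclosing function returns, so one pending cancel would accumulate per iteration for the lifetime of the worker. The diff therefore calls cancel() explicitly at the end of each iteration (the swarm dial hunk further down uses the same immediate cancel() because the context is only needed for the duration of the dial call). A sketch of that per-iteration shape, with illustrative names (processAll, handle, and perItemTimeout are stand-ins, not bitswap's API):

package main

import (
	"context"
	"log"
	"time"
)

const perItemTimeout = time.Second // stand-in for hasBlockTimeout

// processAll gives each item its own timeout context and releases it
// at the end of the iteration. Using defer cancel() here would keep
// every iteration's timer alive until processAll itself returned.
func processAll(parent context.Context, items []string, handle func(context.Context, string) error) {
	for _, item := range items {
		ctx, cancel := context.WithTimeout(parent, perItemTimeout)
		if err := handle(ctx, item); err != nil {
			log.Println(err)
		}
		cancel() // release this iteration's context immediately
	}
}

func main() {
	handle := func(ctx context.Context, s string) error { return nil }
	processAll(context.Background(), []string{"a", "b"}, handle)
}
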
@@ -68,11 +68,12 @@ func (bs *Bitswap) provideWorker(ctx context.Context) {
log.Debug("provideKeys channel closed")
return
}
- ctx, _ := context.WithTimeout(ctx, provideTimeout)
+ ctx, cancel := context.WithTimeout(ctx, provideTimeout)
err := bs.network.Provide(ctx, k)
if err != nil {
log.Error(err)
}
+ cancel()
case <-ctx.Done():
return
}
@@ -136,12 +137,13 @@ func (bs *Bitswap) clientWorker(parent context.Context) {
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
- child, _ := context.WithTimeout(req.ctx, providerRequestTimeout)
+ child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
err := bs.sendWantlistToPeers(req.ctx, providers)
if err != nil {
log.Debugf("error sending wantlist: %s", err)
}
+ cancel()
// Wait for wantNewBlocks to finish
<-done
@@ -88,7 +88,8 @@ func (n *dagService) Get(k u.Key) (*Node, error) {
return nil, fmt.Errorf("dagService is nil")
}
- ctx, _ := context.WithTimeout(context.TODO(), time.Minute)
+ ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
+ defer cancel()
// we shouldn't use an arbitrary timeout here.
// since Get doesnt take in a context yet, we give a large upper bound.
// think of an http request. we want it to go on as long as the client requests it.
@@ -60,9 +60,11 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value u.Key)
nameb := u.Hash(pkbytes)
namekey := u.Key("/pk/" + string(nameb))
+ timectx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*10))
+ defer cancel()
log.Debugf("Storing pubkey at: %s", namekey)
// Store associated public key
- timectx, _ := context.WithDeadline(ctx, time.Now().Add(time.Second*10))
err = p.routing.PutValue(timectx, namekey, pkbytes)
if err != nil {
return err
@@ -227,8 +227,9 @@ func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error)
// if it succeeds, dial will add the conn to the swarm itself.
defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
- ctxT, _ := context.WithTimeout(ctx, s.dialT)
+ ctxT, cancel := context.WithTimeout(ctx, s.dialT)
conn, err := s.dial(ctxT, p)
+ cancel()
s.dsync.Unlock(p)
log.Debugf("dial end %s", conn)
if err != nil {
@@ -172,7 +172,9 @@ func (p *pinner) pinIndirectRecurse(node *mdag.Node) error {
}
func (p *pinner) pinLinks(node *mdag.Node) error {
- ctx, _ := context.WithTimeout(context.Background(), time.Second*60)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
+ defer cancel()
for _, ng := range p.dserv.GetDAG(ctx, node) {
subnode, err := ng.Get()
if err != nil {
@@ -357,11 +357,12 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
rand.Read(id)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(u.Key(id)), 5)
for _, p := range peers {
- ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
+ ctx, cancel := context.WithTimeout(dht.Context(), time.Second*5)
_, err := dht.Ping(ctx, p)
if err != nil {
log.Debugf("Ping error: %s", err)
}
+ cancel()
}
case <-dht.Closing():
return
@@ -28,7 +28,8 @@ func (dht *IpfsDHT) getPublicKeyOnline(ctx context.Context, p peer.ID) (ci.PubKe
}
// ok, try the node itself. if they're overwhelmed or slow we can move on.
- ctxT, _ := ctxutil.WithDeadlineFraction(ctx, 0.3)
+ ctxT, cancelFunc := ctxutil.WithDeadlineFraction(ctx, 0.3)
+ defer cancelFunc()
if pk, err := dht.getPublicKeyFromNode(ctx, p); err == nil {
return pk, nil
}
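The hunk above uses ctxutil.WithDeadlineFraction, a go-ipfs helper from util/ctx rather than the context package; from the call site it evidently derives a child deadline some fraction of the way to the parent's deadline, so a single slow peer can only consume part of the caller's time budget. (Note that the call in that hunk's context line still passes ctx, not ctxT, to getPublicKeyFromNode, so the fractional deadline is not actually applied there; the cancel fix is correct either way.) A hedged sketch of what such a helper might look like, assuming that behavior rather than quoting the util/ctx source:

package main

import (
	"context"
	"fmt"
	"time"
)

// withDeadlineFraction returns a child context whose deadline is the
// given fraction of the time remaining until the parent's deadline.
// If the parent has no deadline, the parent is returned with a no-op
// cancel. (Assumed behavior, sketched from the call site above.)
func withDeadlineFraction(parent context.Context, frac float64) (context.Context, context.CancelFunc) {
	d, ok := parent.Deadline()
	if !ok {
		return parent, func() {}
	}
	remaining := time.Until(d)
	return context.WithDeadline(parent, time.Now().Add(time.Duration(float64(remaining)*frac)))
}

func main() {
	parent, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	child, cancelChild := withDeadlineFraction(parent, 0.3)
	defer cancelChild()
	cd, _ := child.Deadline()
	fmt.Println("child deadline in:", time.Until(cd).Round(time.Second)) // ~3s
}
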
@@ -86,7 +86,8 @@ func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
}
r.flush()
- ctx, _ := context.WithTimeout(context.TODO(), time.Second*60)
+ ctx, cancel := context.WithTimeout(context.TODO(), time.Second*60)
+ defer cancel()
for i, ng := range r.dag.GetDAG(ctx, dagnode) {
childNode, err := ng.Get()