Commit de374226 authored by Brian Tiger Chow

fix(dht/routing) make GetProviders respect context

This commit makes the synchronous GetProviders respect the request
context. It also updates all of GetProviders' call sites to pass a
context in, which meant changing the signature of the dht's handler
func (dhtHandler).
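
For illustration, a hypothetical call site (a sketch, not code from this commit) can now bound the lookup with a deadline simply by threading its context through:

    // sketch of a caller with a 5-second budget; GetProviders returns nil
    // if the context expires before the provider manager accepts the request
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    provs := dht.providers.GetProviders(ctx, u.Key("example-key"))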

I think I'll start referring to the request context as Vito Corleone.

cc @whyrusleeping @jbenet

License: MIT
Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
parent f756088d
@@ -161,7 +161,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.NetMessage {
     }

     // dispatch handler.
-    rpmes, err := handler(mPeer, pmes)
+    rpmes, err := handler(ctx, mPeer, pmes)
     if err != nil {
         log.Errorf("handle message error: %s", err)
         return nil
@@ -5,20 +5,19 @@ import (
     "fmt"
     "time"

-    "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
+    context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+    proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
+    ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
     peer "github.com/jbenet/go-ipfs/peer"
     pb "github.com/jbenet/go-ipfs/routing/dht/pb"
     u "github.com/jbenet/go-ipfs/util"
-    ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
 )

 // The number of closer peers to send on requests.
 var CloserPeerCount = 4

 // dhthandler specifies the signature of functions that handle DHT messages.
-type dhtHandler func(peer.Peer, *pb.Message) (*pb.Message, error)
+type dhtHandler func(context.Context, peer.Peer, *pb.Message) (*pb.Message, error)

 func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
     switch t {
@@ -39,7 +38,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
     }
 }

-func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
+func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
     log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())

     // setup response
@@ -85,7 +84,7 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
     }

     // if we know any providers for the requested value, return those.
-    provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
+    provs := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey()))
     if len(provs) > 0 {
         log.Debugf("handleGetValue returning %d provider[s]", len(provs))
         resp.ProviderPeers = pb.PeersToPBPeers(provs)
@@ -107,7 +106,7 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
 }

 // Store a value in this peer local storage
-func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
+func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
     dht.dslock.Lock()
     defer dht.dslock.Unlock()
     dskey := u.Key(pmes.GetKey()).DsKey()
@@ -129,12 +128,12 @@ func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
     return pmes, err
 }

-func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
+func (dht *IpfsDHT) handlePing(_ context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
     log.Debugf("%s Responding to ping from %s!\n", dht.self, p)
     return pmes, nil
 }

-func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
+func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
     resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
     var closest []peer.Peer
@@ -164,7 +163,7 @@ func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
     return resp, nil
 }

-func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
+func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
     resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

     // check if we have this value, to add ourselves as provider.
@@ -177,7 +176,7 @@ func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
     }

     // setup providers
-    providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
+    providers := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey()))
     if has {
         providers = append(providers, dht.self)
     }
@@ -201,7 +200,7 @@ type providerInfo struct {
     Value peer.Peer
 }

-func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
+func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
     key := u.Key(pmes.GetKey())
     log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key))
@@ -101,12 +101,16 @@ func (pm *ProviderManager) AddProvider(k u.Key, val peer.Peer) {
     }
 }

-func (pm *ProviderManager) GetProviders(k u.Key) []peer.Peer {
+func (pm *ProviderManager) GetProviders(ctx context.Context, k u.Key) []peer.Peer {
     gp := new(getProv)
     gp.k = k
     gp.resp = make(chan []peer.Peer)
-    pm.getprovs <- gp
-    return <-gp.resp
+    select {
+    case pm.getprovs <- gp:
+        return <-gp.resp
+    case <-ctx.Done():
+        return nil
+    }
 }

 func (pm *ProviderManager) GetLocal() []u.Key {
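
The manager's receiving loop is not part of this diff. As a rough sketch of the other end of pm.getprovs (the loop structure and the providersFor helper are assumptions; only the getprovs, resp, and k names come from the hunk above):

    // assumed shape of the ProviderManager run loop, not taken from this commit
    func (pm *ProviderManager) run(ctx context.Context) {
        for {
            select {
            case gp := <-pm.getprovs:
                gp.resp <- pm.providersFor(gp.k) // hypothetical lookup helper
            case <-ctx.Done():
                return
            }
        }
    }

Note that the new select in GetProviders only guards the hand-off: once the request has been accepted on pm.getprovs, the caller still blocks on <-gp.resp until the loop answers.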
@@ -15,7 +15,7 @@ func TestProviderManager(t *testing.T) {
     p := NewProviderManager(ctx, mid)
     a := u.Key("test")
     p.AddProvider(a, peer.WithIDString("testingprovider"))
-    resp := p.GetProviders(a)
+    resp := p.GetProviders(ctx, a)
     if len(resp) != 1 {
         t.Fatal("Could not retrieve provider.")
     }
@@ -133,7 +133,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
     ps := newPeerSet()
     // TODO may want to make this function async to hide latency
-    provs := dht.providers.GetProviders(key)
+    provs := dht.providers.GetProviders(ctx, key)
     for _, p := range provs {
         count--
         // NOTE: assuming that this list of peers is unique