Commit 1de9b905 authored by Steven Allen's avatar Steven Allen

fix: improve context deadline handling

1. Continue to best-effort provide, but still return an error when we fail to
send provider records to the _best_ peers.
2. Continue returning the best peer's we've found in GetClosestPeers, but also
return an error to indicate that we didn't find the closest ones.

And fix the hang test.
parent 509c0bce
......@@ -21,7 +21,9 @@ import (
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)
func TestHang(t *testing.T) {
// Test that one hung request to a peer doesn't prevent another request
// using that same peer from obeying its context.
func TestHungRequest(t *testing.T) {
ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
if err != nil {
......@@ -44,22 +46,27 @@ func TestHang(t *testing.T) {
ctx1, cancel1 := context.WithTimeout(ctx, 1*time.Second)
defer cancel1()
peers, err := d.GetClosestPeers(ctx1, testCaseCids[0].KeyString())
if err != nil {
t.Fatal(err)
}
done := make(chan error, 1)
go func() {
_, err := d.GetClosestPeers(ctx1, testCaseCids[0].KeyString())
done <- err
}()
time.Sleep(100 * time.Millisecond)
ctx2, cancel2 := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel2()
_ = d.Provide(ctx2, testCaseCids[0], true)
if ctx2.Err() != context.DeadlineExceeded {
err = d.Provide(ctx2, testCaseCids[0], true)
if err != context.DeadlineExceeded {
t.Errorf("expected to fail with deadline exceeded, got: %s", ctx2.Err())
}
select {
case <-peers:
t.Error("GetClosestPeers should not have returned yet")
case <-done:
t.Errorf("GetClosestPeers should not have returned yet")
default:
err = <-done
if err != context.DeadlineExceeded {
t.Errorf("expected the deadline to be exceeded, got %s", err)
}
}
}
......
......@@ -68,6 +68,9 @@ func multihashLoggableKey(mh multihash.Multihash) logging.LoggableMap {
// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
//
// If the context is canceled, this function will return the context error along
// with the closest K peers it has found so far.
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
//TODO: I can break the interface! return []peer.ID
e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key))
......@@ -121,5 +124,5 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now())
}
return out, nil
return out, ctx.Err()
}
......@@ -461,7 +461,20 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
defer cancel()
}
var exceededDeadline bool
peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
switch err {
case context.DeadlineExceeded:
// If the _inner_ deadline has been exceeded but the _outer_
// context is still fine, provide the value to the closest peers
// we managed to find, even if they're not the _actual_ closest peers.
if ctx.Err() != nil {
return ctx.Err()
}
exceededDeadline = true
case nil:
default:
}
if err != nil {
return err
}
......@@ -484,7 +497,10 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
}(p)
}
wg.Wait()
return nil
if exceededDeadline {
return context.DeadlineExceeded
}
return ctx.Err()
}
func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
pi := peer.AddrInfo{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment