Commit bdd00c9d authored by Jeromy Johnson's avatar Jeromy Johnson

Merge pull request #986 from ipfs/feat/dht-bw-usage

reduce dht bandwidth consumption
parents 0f9082aa ad432fe8
......@@ -226,5 +226,5 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
dht.providers.AddProvider(ctx, key, p)
}
return pmes, nil // send back same msg as confirmation.
return nil, nil
}
......@@ -31,6 +31,10 @@ func (dht *IpfsDHT) getPublicKeyOnline(ctx context.Context, p peer.ID) (ci.PubKe
ctxT, cancelFunc := ctxutil.WithDeadlineFraction(ctx, 0.3)
defer cancelFunc()
if pk, err := dht.getPublicKeyFromNode(ctx, p); err == nil {
err := dht.peerstore.AddPubKey(p, pk)
if err != nil {
return pk, err
}
return pk, nil
}
......@@ -38,7 +42,7 @@ func (dht *IpfsDHT) getPublicKeyOnline(ctx context.Context, p peer.ID) (ci.PubKe
log.Debugf("pk for %s not in peerstore, and peer failed. trying dht.", p)
pkkey := KeyForPublicKey(p)
// ok, try the node itself. if they're overwhelmed or slow we can move on.
// ok, now try the dht. Anyone who has previously fetched the key should have it
val, err := dht.GetValue(ctxT, pkkey)
if err != nil {
log.Warning("Failed to find requested public key.")
......@@ -50,7 +54,8 @@ func (dht *IpfsDHT) getPublicKeyOnline(ctx context.Context, p peer.ID) (ci.PubKe
log.Debugf("Failed to unmarshal public key: %s", err)
return nil, err
}
return pk, nil
return pk, dht.peerstore.AddPubKey(p, pk)
}
func (dht *IpfsDHT) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.PubKey, error) {
......
......@@ -91,7 +91,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
}
// get closest peers in the routing table
rtp := dht.routingTable.ListPeers()
rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
log.Debugf("peers in rt: %s", len(rtp), rtp)
if len(rtp) == 0 {
log.Warning("No peers from routing table!")
......@@ -256,7 +256,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
return &dhtQueryResult{closerPeers: clpeers}, nil
})
peers := dht.routingTable.ListPeers()
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
_, err := query.Run(ctx, peers)
if err != nil {
log.Debugf("Query error: %s", err)
......@@ -276,7 +276,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er
return pi, nil
}
peers := dht.routingTable.ListPeers()
peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if len(peers) == 0 {
return peer.PeerInfo{}, errors.Wrap(kb.ErrLookupFailure)
}
......@@ -342,7 +342,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
peerchan := make(chan peer.PeerInfo, asyncQueryBuffer)
peersSeen := peer.Set{}
peers := dht.routingTable.ListPeers()
peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if len(peers) == 0 {
return nil, errors.Wrap(kb.ErrLookupFailure)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment