Commit 00b46e0c authored by Jeromy's avatar Jeromy

Fixes for dht findpeer queries

First, we use Alpha instead of K as the number of peers we grab from the
routing table (as per the kademlia paper).

Second, we don't use a size limited set for the 'GetClosestPeers' query.
We're going to process more than K peers before we find the K closest
peers.

Third, Change GetClosestPeers to actually return the K Closest peers,
not a hodge podge of peers that it found on the way to finding the
closest peers.
parent eafc461a
......@@ -4,12 +4,16 @@ os:
language: go
env:
- IPFS_REUSEPORT=false
go:
- 1.7
install: true
script:
- ulimit -n 2048
- make deps
- go vet
- go test ./...
......
......@@ -145,7 +145,7 @@ var errInvalidRecord = errors.New("received invalid record")
// key. It returns either the value or a list of closer peers.
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []pstore.PeerInfo, error) {
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*pstore.PeerInfo, error) {
pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
......@@ -320,12 +320,12 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [
for _, clp := range closer {
// == to self? thats bad
if p == dht.self {
if clp == dht.self {
log.Warning("attempted to return self! this shouldn't happen...")
return nil
}
// Dont send a peer back themselves
if p == clp {
if clp == p {
continue
}
......
......@@ -679,7 +679,7 @@ func TestFindPeersConnectedToPeer(t *testing.T) {
}
// shouldFind := []peer.ID{peers[1], peers[3]}
var found []pstore.PeerInfo
var found []*pstore.PeerInfo
for nextp := range pchan {
found = append(found, nextp)
}
......@@ -831,7 +831,7 @@ func TestFindPeerQuery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nDHTs := 121
nDHTs := 101
_, allpeers, dhts := setupDHTS(ctx, nDHTs, t)
defer func() {
for i := 0; i < nDHTs; i++ {
......@@ -840,11 +840,13 @@ func TestFindPeerQuery(t *testing.T) {
}
}()
mrand := rand.New(rand.NewSource(42))
guy := dhts[0]
others := dhts[1:]
for i := 0; i < 20; i++ {
for j := 0; j < 10; j++ {
connect(t, ctx, others[i], others[20+(i*4)+j])
for j := 0; j < 16; j++ { // 16, high enough to probably not have any partitions
v := mrand.Intn(80)
connect(t, ctx, others[i], others[20+v])
}
}
......@@ -855,9 +857,9 @@ func TestFindPeerQuery(t *testing.T) {
val := "foobar"
rtval := kb.ConvertKey(val)
rtablePeers := guy.routingTable.NearestPeers(rtval, KValue)
if len(rtablePeers) != 20 {
t.Fatalf("expected 20 peers back from routing table, got %d", len(rtablePeers))
rtablePeers := guy.routingTable.NearestPeers(rtval, AlphaValue)
if len(rtablePeers) != 3 {
t.Fatalf("expected 3 peers back from routing table, got %d", len(rtablePeers))
}
netpeers := guy.host.Network().Peers()
......@@ -890,15 +892,36 @@ func TestFindPeerQuery(t *testing.T) {
t.Fatal("got entirely peers from our routing table")
}
if count != 20 {
t.Fatal("should have only gotten 20 peers from getclosestpeers call")
}
sort.Sort(peer.IDSlice(allpeers[1:]))
sort.Sort(peer.IDSlice(outpeers))
fmt.Println("counts: ", count, notfromrtable)
actualclosest := kb.SortClosestPeers(allpeers[1:], rtval)
exp := actualclosest[:20]
got := kb.SortClosestPeers(outpeers, rtval)
for i, v := range exp {
if v != got[i] {
t.Fatal("mismatch in expected closest peers:", exp, got)
diffp := countDiffPeers(exp, got)
if diffp > 0 {
// could be a partition created during setup
t.Fatal("didnt get expected closest peers")
}
}
func countDiffPeers(a, b []peer.ID) int {
s := make(map[peer.ID]bool)
for _, p := range a {
s[p] = true
}
var out int
for _, p := range b {
if !s[p] {
out++
}
}
return out
}
func TestFindClosestPeers(t *testing.T) {
......
......@@ -6,7 +6,6 @@ import (
logging "github.com/ipfs/go-log"
kb "github.com/libp2p/go-libp2p-kbucket"
peer "github.com/libp2p/go-libp2p-peer"
pset "github.com/libp2p/go-libp2p-peer/peerset"
pstore "github.com/libp2p/go-libp2p-peerstore"
notif "github.com/libp2p/go-libp2p-routing/notifications"
)
......@@ -21,6 +20,14 @@ func pointerizePeerInfos(pis []pstore.PeerInfo) []*pstore.PeerInfo {
return out
}
func toPeerInfos(ps []peer.ID) []*pstore.PeerInfo {
out := make([]*pstore.PeerInfo, len(ps))
for i, p := range ps {
out[i] = &pstore.PeerInfo{ID: p}
}
return out
}
func loggableKey(k string) logging.LoggableMap {
return logging.LoggableMap{
"key": k,
......@@ -37,16 +44,6 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
}
out := make(chan peer.ID, KValue)
peerset := pset.NewLimited(KValue)
for _, p := range tablepeers {
select {
case out <- p:
case <-ctx.Done():
return nil, ctx.Err()
}
peerset.Add(p)
}
// since the query doesnt actually pass our context down
// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
......@@ -64,36 +61,37 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
return nil, err
}
var filtered []pstore.PeerInfo
for _, clp := range closer {
if peerset.TryAdd(clp) {
select {
case out <- clp:
case <-ctx.Done():
return nil, ctx.Err()
}
filtered = append(filtered, dht.peerstore.PeerInfo(clp))
}
}
peerinfos := toPeerInfos(closer)
// For DHT query command
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.PeerResponse,
ID: p,
Responses: pointerizePeerInfos(filtered),
Responses: peerinfos, // todo: remove need for this pointerize thing
})
return &dhtQueryResult{closerPeers: filtered}, nil
return &dhtQueryResult{closerPeers: peerinfos}, nil
})
go func() {
defer close(out)
defer e.Done()
// run it!
_, err := query.Run(ctx, tablepeers)
res, err := query.Run(ctx, tablepeers)
if err != nil {
log.Debugf("closestPeers query run error: %s", err)
}
if res != nil && res.finalSet != nil {
sorted := kb.SortClosestPeers(res.finalSet.Peers(), kb.ConvertKey(key))
if len(sorted) > KValue {
sorted = sorted[:KValue]
}
for _, p := range sorted {
out <- p
}
}
}()
return out, nil
......
......@@ -14,15 +14,15 @@
},
{
"author": "whyrusleeping",
"hash": "QmNiCwBNA8MWDADTFVq1BonUEJbS2SvjAoNkZZrhEwcuUi",
"hash": "QmPGxZ1DP2w45WcogpW1h43BvseXbfke9N91qotpoQcUeS",
"name": "go-libp2p-crypto",
"version": "1.3.1"
"version": "1.4.0"
},
{
"author": "whyrusleeping",
"hash": "QmZcUPvPhD1Xvk6mwijYF8AfR3mG31S1YsEfHG4khrFPRr",
"hash": "QmWUswjn261LSyVxWAEpMVtPdy8zmKBJJfBpG3Qdpa8ZsE",
"name": "go-libp2p-peer",
"version": "2.1.0"
"version": "2.1.5"
},
{
"hash": "QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP",
......@@ -60,9 +60,9 @@
},
{
"author": "whyrusleeping",
"hash": "QmQMQ2RUjnaEEX8ybmrhuFFGhAwPjyL1Eo6ZoJGD7aAccM",
"hash": "Qme1g4e3m2SmdiSGGU3vSWmUStwUjc5oECnEriaK9Xa1HU",
"name": "go-libp2p-peerstore",
"version": "1.4.0"
"version": "1.4.4"
},
{
"author": "whyrusleeping",
......@@ -84,27 +84,27 @@
},
{
"author": "whyrusleeping",
"hash": "QmYTzt6uVtDmB5U3iYiA165DQ39xaNLjr8uuDhDtDByXYp",
"hash": "QmSwXLW21S3TsFHsNELc4a4Y7Kp8wToqnBBXWYvggBVLQY",
"name": "go-testutil",
"version": "1.1.1"
"version": "1.1.6"
},
{
"author": "whyrusleeping",
"hash": "QmZp9q8DbrGLztoxpkTC62mnRayRwHcAzGJJ8AvYRwjanR",
"hash": "QmcTnycWsBgvNYFYgWdWi8SRDCeevG8HBUQHkvg4KLXUsW",
"name": "go-libp2p-record",
"version": "2.1.0"
"version": "2.1.3"
},
{
"author": "whyrusleeping",
"hash": "QmUwZcbSVMsLZzovZssH96rCUM5FAkrjaqhHLhJnFYd5z3",
"hash": "QmTxn7JEA8DiBvd9vVzErAzadHn6TwjCKTjjUfPyRH9wjZ",
"name": "go-libp2p-kbucket",
"version": "2.1.2"
"version": "2.1.6"
},
{
"author": "whyrusleeping",
"hash": "QmZghcVHwXQC3Zvnvn24LgTmSPkEn2o3PDyKb6nrtPRzRh",
"hash": "QmUc6twRJRE9MNrUGd8eo9WjHHxebGppdZfptGCASkR7fF",
"name": "go-libp2p-routing",
"version": "2.2.7"
"version": "2.2.11"
},
{
"author": "whyrusleeping",
......@@ -120,21 +120,21 @@
},
{
"author": "whyrusleeping",
"hash": "QmTcfnDHimxBJqx6utpnWqVHdvyquXgkwAvYt4zMaJMKS2",
"hash": "QmXs1igHHEaUmMxKtbP8Z9wTjitQ75sqxaKQP4QgnLN4nn",
"name": "go-libp2p-loggables",
"version": "1.1.1"
"version": "1.1.5"
},
{
"author": "whyrusleeping",
"hash": "QmbzbRyd22gcW92U1rA2yKagB3myMYhk45XBknJ49F9XWJ",
"hash": "QmXzeAcmKDTfNZQBiyF22hQKuTK7P5z6MBBQLTk9bbiSUc",
"name": "go-libp2p-host",
"version": "1.3.7"
"version": "1.3.12"
},
{
"author": "whyrusleeping",
"hash": "QmU3g3psEDiC4tQh1Qu2NYg5aYVQqxC3m74ZavLwPfJEtu",
"hash": "QmeWJwi61vii5g8zQUB9UGegfUbmhTKHgeDFP9XuSp5jZ4",
"name": "go-libp2p",
"version": "4.3.11"
"version": "4.3.10"
}
],
"gxVersion": "0.4.0",
......
......@@ -53,8 +53,8 @@ func peerInfoToPBPeer(p pstore.PeerInfo) *Message_Peer {
}
// PBPeerToPeer turns a *Message_Peer into its pstore.PeerInfo counterpart
func PBPeerToPeerInfo(pbp *Message_Peer) pstore.PeerInfo {
return pstore.PeerInfo{
func PBPeerToPeerInfo(pbp *Message_Peer) *pstore.PeerInfo {
return &pstore.PeerInfo{
ID: peer.ID(pbp.GetId()),
Addrs: pbp.Addresses(),
}
......@@ -93,8 +93,8 @@ func PeerRoutingInfosToPBPeers(peers []PeerRoutingInfo) []*Message_Peer {
// PBPeersToPeerInfos converts given []*Message_Peer into []pstore.PeerInfo
// Invalid addresses will be silently omitted.
func PBPeersToPeerInfos(pbps []*Message_Peer) []pstore.PeerInfo {
peers := make([]pstore.PeerInfo, 0, len(pbps))
func PBPeersToPeerInfos(pbps []*Message_Peer) []*pstore.PeerInfo {
peers := make([]*pstore.PeerInfo, 0, len(pbps))
for _, pbp := range pbps {
peers = append(peers, PBPeerToPeerInfo(pbp))
}
......
......@@ -27,11 +27,13 @@ type dhtQuery struct {
}
type dhtQueryResult struct {
value []byte // GetValue
peer pstore.PeerInfo // FindPeer
providerPeers []pstore.PeerInfo // GetProviders
closerPeers []pstore.PeerInfo // *
value []byte // GetValue
peer *pstore.PeerInfo // FindPeer
providerPeers []pstore.PeerInfo // GetProviders
closerPeers []*pstore.PeerInfo // *
success bool
finalSet *pset.PeerSet
}
// constructs query
......@@ -155,7 +157,9 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes
return r.result, nil
}
return nil, err
return &dhtQueryResult{
finalSet: r.peersSeen,
}, err
}
func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
......
......@@ -165,7 +165,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]rou
}
// get closest peers in the routing table
rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
log.Debugf("peers in rt: %d %s", len(rtp), rtp)
if len(rtp) == 0 {
log.Warning("No peers from routing table!")
......@@ -218,7 +218,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]rou
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.PeerResponse,
ID: p,
Responses: pointerizePeerInfos(peers),
Responses: peers,
})
return res, nil
......@@ -316,14 +316,16 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid,
for _, p := range provs {
// NOTE: Assuming that this list of peers is unique
if ps.TryAdd(p) {
pi := dht.peerstore.PeerInfo(p)
select {
case peerOut <- dht.peerstore.PeerInfo(p):
case peerOut <- pi:
case <-ctx.Done():
return
}
}
// If we have enough peers locally, dont bother with remote RPC
// TODO: is this a DOS vector?
if ps.Size() >= count {
return
}
......@@ -351,7 +353,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid,
if ps.TryAdd(prov.ID) {
log.Debugf("using provider: %s", prov)
select {
case peerOut <- prov:
case peerOut <- *prov:
case <-ctx.Done():
log.Debug("context timed out sending more providers")
return nil, ctx.Err()
......@@ -371,12 +373,12 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid,
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.PeerResponse,
ID: p,
Responses: pointerizePeerInfos(clpeers),
Responses: clpeers,
})
return &dhtQueryResult{closerPeers: clpeers}, nil
})
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), KValue)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue)
_, err := query.Run(ctx, peers)
if err != nil {
log.Debugf("Query error: %s", err)
......@@ -406,7 +408,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo,
return pi, nil
}
peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue)
peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if len(peers) == 0 {
return pstore.PeerInfo{}, kb.ErrLookupFailure
}
......@@ -447,7 +449,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo,
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.PeerResponse,
Responses: pointerizePeerInfos(clpeerInfos),
Responses: clpeerInfos,
})
return &dhtQueryResult{closerPeers: clpeerInfos}, nil
......@@ -464,16 +466,16 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo,
return pstore.PeerInfo{}, routing.ErrNotFound
}
return result.peer, nil
return *result.peer, nil
}
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan pstore.PeerInfo, error) {
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *pstore.PeerInfo, error) {
peerchan := make(chan pstore.PeerInfo, asyncQueryBuffer)
peerchan := make(chan *pstore.PeerInfo, asyncQueryBuffer)
peersSeen := make(map[peer.ID]struct{})
peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue)
peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if len(peers) == 0 {
return nil, kb.ErrLookupFailure
}
......@@ -486,7 +488,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
return nil, err
}
var clpeers []pstore.PeerInfo
var clpeers []*pstore.PeerInfo
closer := pmes.GetCloserPeers()
for _, pbp := range closer {
pi := pb.PBPeerToPeerInfo(pbp)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment