Unverified Commit d5af829b authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #43 from libp2p/fix/multi-bucket

fix: when the target bucket is empty or low, pull from all other buckets
parents e8de4d1a 5ab8e823
...@@ -8,3 +8,5 @@ require ( ...@@ -8,3 +8,5 @@ require (
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16
github.com/multiformats/go-multihash v0.0.10 github.com/multiformats/go-multihash v0.0.10
) )
go 1.13
...@@ -238,32 +238,51 @@ func (rt *RoutingTable) NearestPeer(id ID) peer.ID { ...@@ -238,32 +238,51 @@ func (rt *RoutingTable) NearestPeer(id ID) peer.ID {
// NearestPeers returns a list of the 'count' closest peers to the given ID // NearestPeers returns a list of the 'count' closest peers to the given ID
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
// This is the number of bits _we_ share with the key. All peers in this
// bucket share cpl bits with us and will therefore share at least cpl+1
// bits with the given key. +1 because both the target and all peers in
// this bucket differ from us in the cpl bit.
cpl := CommonPrefixLen(id, rt.local) cpl := CommonPrefixLen(id, rt.local)
// It's assumed that this also protects the buckets. // It's assumed that this also protects the buckets.
rt.tabLock.RLock() rt.tabLock.RLock()
// Get bucket at cpl index or last bucket // Get bucket index or last bucket
var bucket *Bucket
if cpl >= len(rt.Buckets) { if cpl >= len(rt.Buckets) {
cpl = len(rt.Buckets) - 1 cpl = len(rt.Buckets) - 1
} }
bucket = rt.Buckets[cpl]
pds := peerDistanceSorter{ pds := peerDistanceSorter{
peers: make([]peerDistance, 0, 3*rt.bucketsize), peers: make([]peerDistance, 0, count+rt.bucketsize),
target: id, target: id,
} }
pds.appendPeersFromList(bucket.list)
if pds.Len() < count { // Add peers from the target bucket (cpl+1 shared bits).
// In the case of an unusual split, one bucket may be short or empty. pds.appendPeersFromList(rt.Buckets[cpl].list)
// if this happens, search both surrounding buckets for nearby peers
if cpl > 0 { // If we're short, add peers from buckets to the right until we have
pds.appendPeersFromList(rt.Buckets[cpl-1].list) // enough. All buckets to the right share exactly cpl bits (as opposed
} // to the cpl+1 bits shared by the peers in the cpl bucket).
if cpl < len(rt.Buckets)-1 { //
pds.appendPeersFromList(rt.Buckets[cpl+1].list) // Unfortunately, to be completely correct, we can't just take from
} // buckets until we have enough peers because peers because _all_ of
// these peers will be ~2**(256-cpl) from us.
//
// However, we're going to do that anyways as it's "good enough"
for i := cpl + 1; i < len(rt.Buckets) && pds.Len() < count; i++ {
pds.appendPeersFromList(rt.Buckets[i].list)
}
// If we're still short, add in buckets that share _fewer_ bits. We can
// do this bucket by bucket because each bucket will share 1 fewer bit
// than the last.
//
// * bucket cpl-1: cpl-2 shared bits.
// * bucket cpl-2: cpl-3 shared bits.
// ...
for i := cpl - 1; i >= 0 && pds.Len() < count; i-- {
pds.appendPeersFromList(rt.Buckets[i].list)
} }
rt.tabLock.RUnlock() rt.tabLock.RUnlock()
......
...@@ -12,6 +12,8 @@ import ( ...@@ -12,6 +12,8 @@ import (
// Test basic features of the bucket struct // Test basic features of the bucket struct
func TestBucket(t *testing.T) { func TestBucket(t *testing.T) {
t.Parallel()
b := newBucket() b := newBucket()
peers := make([]peer.ID, 100) peers := make([]peer.ID, 100)
...@@ -49,6 +51,8 @@ func TestBucket(t *testing.T) { ...@@ -49,6 +51,8 @@ func TestBucket(t *testing.T) {
} }
func TestGenRandPeerID(t *testing.T) { func TestGenRandPeerID(t *testing.T) {
t.Parallel()
nBuckets := 21 nBuckets := 21
local := test.RandPeerIDFatal(t) local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics() m := pstore.NewMetrics()
...@@ -95,6 +99,8 @@ func TestGenRandPeerID(t *testing.T) { ...@@ -95,6 +99,8 @@ func TestGenRandPeerID(t *testing.T) {
} }
func TestTableCallbacks(t *testing.T) { func TestTableCallbacks(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t) local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics() m := pstore.NewMetrics()
rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m)
...@@ -141,6 +147,8 @@ func TestTableCallbacks(t *testing.T) { ...@@ -141,6 +147,8 @@ func TestTableCallbacks(t *testing.T) {
// Right now, this just makes sure that it doesnt hang or crash // Right now, this just makes sure that it doesnt hang or crash
func TestTableUpdate(t *testing.T) { func TestTableUpdate(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t) local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics() m := pstore.NewMetrics()
rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m)
...@@ -165,6 +173,8 @@ func TestTableUpdate(t *testing.T) { ...@@ -165,6 +173,8 @@ func TestTableUpdate(t *testing.T) {
} }
func TestTableFind(t *testing.T) { func TestTableFind(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t) local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics() m := pstore.NewMetrics()
rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m)
...@@ -183,6 +193,8 @@ func TestTableFind(t *testing.T) { ...@@ -183,6 +193,8 @@ func TestTableFind(t *testing.T) {
} }
func TestTableEldestPreferred(t *testing.T) { func TestTableEldestPreferred(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t) local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics() m := pstore.NewMetrics()
rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m) rt := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m)
...@@ -212,6 +224,8 @@ func TestTableEldestPreferred(t *testing.T) { ...@@ -212,6 +224,8 @@ func TestTableEldestPreferred(t *testing.T) {
} }
func TestTableFindMultiple(t *testing.T) { func TestTableFindMultiple(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t) local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics() m := pstore.NewMetrics()
rt := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m) rt := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m)
...@@ -229,10 +243,136 @@ func TestTableFindMultiple(t *testing.T) { ...@@ -229,10 +243,136 @@ func TestTableFindMultiple(t *testing.T) {
} }
} }
func assertSortedPeerIdsEqual(t *testing.T, a, b []peer.ID) {
t.Helper()
if len(a) != len(b) {
t.Fatal("slices aren't the same length")
}
for i, p := range a {
if p != b[i] {
t.Fatalf("unexpected peer %d", i)
}
}
}
func TestTableFindMultipleBuckets(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)
localID := ConvertPeerID(local)
m := pstore.NewMetrics()
rt := NewRoutingTable(5, localID, time.Hour, m)
peers := make([]peer.ID, 100)
for i := 0; i < 100; i++ {
peers[i] = test.RandPeerIDFatal(t)
rt.Update(peers[i])
}
targetID := ConvertPeerID(peers[2])
closest := SortClosestPeers(rt.ListPeers(), targetID)
targetCpl := CommonPrefixLen(localID, targetID)
// Split the peers into closer, same, and further from the key (than us).
var (
closer, same, further []peer.ID
)
var i int
for i = 0; i < len(closest); i++ {
cpl := CommonPrefixLen(ConvertPeerID(closest[i]), targetID)
if targetCpl >= cpl {
break
}
}
closer = closest[:i]
var j int
for j = len(closer); j < len(closest); j++ {
cpl := CommonPrefixLen(ConvertPeerID(closest[j]), targetID)
if targetCpl > cpl {
break
}
}
same = closest[i:j]
further = closest[j:]
// should be able to find at least 30
// ~31 (logtwo(100) * 5)
found := rt.NearestPeers(targetID, 20)
if len(found) != 20 {
t.Fatalf("asked for 20 peers, got %d", len(found))
}
// We expect this list to include:
// * All peers with a common prefix length > than the CPL between our key
// and the target (peers in the target bucket).
// * Then a subset of the peers with the _same_ CPL (peers in buckets to the right).
// * Finally, other peers to the left of the target bucket.
// First, handle the peers that are _closer_ than us.
if len(found) <= len(closer) {
// All of our peers are "closer".
assertSortedPeerIdsEqual(t, found, closer[:len(found)])
return
}
assertSortedPeerIdsEqual(t, found[:len(closer)], closer)
// We've worked through closer peers, let's work through peers at the
// "same" distance.
found = found[len(closer):]
// Next, handle peers that are at the same distance as us.
if len(found) <= len(same) {
// Ok, all the remaining peers are at the same distance.
// unfortunately, that means we may be missing peers that are
// technically closer.
// Make sure all remaining peers are _somewhere_ in the "closer" set.
pset := peer.NewSet()
for _, p := range same {
pset.Add(p)
}
for _, p := range found {
if !pset.Contains(p) {
t.Fatalf("unexpected peer %s", p)
}
}
return
}
assertSortedPeerIdsEqual(t, found[:len(same)], same)
// We've worked through closer peers, let's work through the further
// peers.
found = found[len(same):]
// All _further_ peers will be correctly sorted.
assertSortedPeerIdsEqual(t, found, further[:len(found)])
// Finally, test getting _all_ peers. These should be in the correct
// order.
// Ok, now let's try finding all of them.
found = rt.NearestPeers(ConvertPeerID(peers[2]), 100)
if len(found) != rt.Size() {
t.Fatalf("asked for %d peers, got %d", rt.Size(), len(found))
}
// We should get _all_ peers in sorted order.
for i, p := range found {
if p != closest[i] {
t.Fatalf("unexpected peer %d", i)
}
}
}
// Looks for race conditions in table operations. For a more 'certain' // Looks for race conditions in table operations. For a more 'certain'
// test, increase the loop counter from 1000 to a much higher number // test, increase the loop counter from 1000 to a much higher number
// and set GOMAXPROCS above 1 // and set GOMAXPROCS above 1
func TestTableMultithreaded(t *testing.T) { func TestTableMultithreaded(t *testing.T) {
t.Parallel()
local := peer.ID("localPeer") local := peer.ID("localPeer")
m := pstore.NewMetrics() m := pstore.NewMetrics()
tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m) tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour, m)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment