Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-p2p-kbucket
Commits
a58d5968
Commit
a58d5968
authored
Nov 18, 2019
by
Aarsh Shah
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Refresh Cpl's, not buckets
parent
d5af829b
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
90 additions
and
108 deletions
+90
-108
bucket.go
bucket.go
+0
-19
table.go
table.go
+55
-56
table_test.go
table_test.go
+35
-33
No files found.
bucket.go
View file @
a58d5968
...
...
@@ -5,7 +5,6 @@ package kbucket
import
(
"container/list"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
)
...
...
@@ -14,32 +13,14 @@ import (
type
Bucket
struct
{
lk
sync
.
RWMutex
list
*
list
.
List
lastRefreshedAtLk
sync
.
RWMutex
lastRefreshedAt
time
.
Time
// the last time we looked up a key in the bucket
}
func
newBucket
()
*
Bucket
{
b
:=
new
(
Bucket
)
b
.
list
=
list
.
New
()
b
.
lastRefreshedAt
=
time
.
Now
()
return
b
}
func
(
b
*
Bucket
)
RefreshedAt
()
time
.
Time
{
b
.
lastRefreshedAtLk
.
RLock
()
defer
b
.
lastRefreshedAtLk
.
RUnlock
()
return
b
.
lastRefreshedAt
}
func
(
b
*
Bucket
)
ResetRefreshedAt
(
newTime
time
.
Time
)
{
b
.
lastRefreshedAtLk
.
Lock
()
defer
b
.
lastRefreshedAtLk
.
Unlock
()
b
.
lastRefreshedAt
=
newTime
}
func
(
b
*
Bucket
)
Peers
()
[]
peer
.
ID
{
b
.
lk
.
RLock
()
defer
b
.
lk
.
RUnlock
()
...
...
table.go
View file @
a58d5968
...
...
@@ -21,6 +21,15 @@ var log = logging.Logger("table")
var
ErrPeerRejectedHighLatency
=
errors
.
New
(
"peer rejected; latency too high"
)
var
ErrPeerRejectedNoCapacity
=
errors
.
New
(
"peer rejected; insufficient capacity"
)
// MaxCplForRefresh is the maximum cpl we support for refresh.
// This limit exists because we can only generate 'MaxCplForRefresh' bit prefixes for now.
var
MaxCplForRefresh
uint
=
15
type
CplRefresh
struct
{
Cpl
uint
LastRefreshAt
time
.
Time
}
// RoutingTable defines the routing table.
type
RoutingTable
struct
{
// ID of the local peer
...
...
@@ -39,6 +48,9 @@ type RoutingTable struct {
Buckets
[]
*
Bucket
bucketsize
int
cplRefreshLk
sync
.
RWMutex
cplRefreshedAt
map
[
uint
]
time
.
Time
// notification functions
PeerRemoved
func
(
peer
.
ID
)
PeerAdded
func
(
peer
.
ID
)
...
...
@@ -47,84 +59,71 @@ type RoutingTable struct {
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
func
NewRoutingTable
(
bucketsize
int
,
localID
ID
,
latency
time
.
Duration
,
m
peerstore
.
Metrics
)
*
RoutingTable
{
rt
:=
&
RoutingTable
{
Buckets
:
[]
*
Bucket
{
newBucket
()},
bucketsize
:
bucketsize
,
local
:
localID
,
maxLatency
:
latency
,
metrics
:
m
,
PeerRemoved
:
func
(
peer
.
ID
)
{},
PeerAdded
:
func
(
peer
.
ID
)
{},
Buckets
:
[]
*
Bucket
{
newBucket
()},
bucketsize
:
bucketsize
,
local
:
localID
,
maxLatency
:
latency
,
metrics
:
m
,
cplRefreshedAt
:
make
(
map
[
uint
]
time
.
Time
),
PeerRemoved
:
func
(
peer
.
ID
)
{},
PeerAdded
:
func
(
peer
.
ID
)
{},
}
return
rt
}
// GetAllBuckets is safe to call as rt.Buckets is append-only
// caller SHOULD NOT modify the returned slice
func
(
rt
*
RoutingTable
)
GetAllBuckets
()
[]
*
Bucket
{
rt
.
tabLock
.
RLock
()
defer
rt
.
tabLock
.
RUnlock
()
return
rt
.
Buckets
}
// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh.
// Caller is free to modify the returned slice as it is a defensive copy.
func
(
rt
*
RoutingTable
)
GetTrackedCplsForRefresh
()
[]
*
CplRefresh
{
rt
.
cplRefreshLk
.
RLock
()
defer
rt
.
cplRefreshLk
.
RUnlock
()
// GenRandPeerID generates a random peerID in bucket=bucketID
func
(
rt
*
RoutingTable
)
GenRandPeerID
(
bucketID
int
)
peer
.
ID
{
if
bucketID
<
0
{
panic
(
fmt
.
Sprintf
(
"bucketID %d is not non-negative"
,
bucketID
))
}
rt
.
tabLock
.
RLock
()
bucketLen
:=
len
(
rt
.
Buckets
)
rt
.
tabLock
.
RUnlock
()
var
cpls
[]
*
CplRefresh
var
targetCpl
uint
if
bucketID
>
(
bucketLen
-
1
)
{
targetCpl
=
uint
(
bucketLen
)
-
1
}
else
{
targetCpl
=
uint
(
bucketID
)
for
c
,
t
:=
range
rt
.
cplRefreshedAt
{
cpls
=
append
(
cpls
,
&
CplRefresh
{
c
,
t
})
}
// We can only handle upto 16 bit prefixes
if
targetCpl
>
16
{
targetCpl
=
16
return
cpls
}
// GenRandPeerID generates a random peerID for a given Cpl
func
(
rt
*
RoutingTable
)
GenRandPeerID
(
targetCpl
uint
)
(
peer
.
ID
,
error
)
{
if
targetCpl
>
MaxCplForRefresh
{
return
""
,
fmt
.
Errorf
(
"cannot generate peer ID for Cpl greater than %d"
,
MaxCplForRefresh
)
}
var
targetPrefix
uint16
localPrefix
:=
binary
.
BigEndian
.
Uint16
(
rt
.
local
)
if
targetCpl
<
16
{
// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
// to our randomly generated prefix.
toggledLocalPrefix
:=
localPrefix
^
(
uint16
(
0x8000
)
>>
targetCpl
)
randPrefix
:=
uint16
(
rand
.
Uint32
())
// Combine the toggled local prefix and the random bits at the correct offset
// such that ONLY the first `targetCpl` bits match the local ID.
mask
:=
(
^
uint16
(
0
))
<<
(
16
-
(
targetCpl
+
1
))
targetPrefix
=
(
toggledLocalPrefix
&
mask
)
|
(
randPrefix
&
^
mask
)
}
else
{
targetPrefix
=
localPrefix
}
// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
// to our randomly generated prefix.
toggledLocalPrefix
:=
localPrefix
^
(
uint16
(
0x8000
)
>>
targetCpl
)
randPrefix
:=
uint16
(
rand
.
Uint32
())
// Combine the toggled local prefix and the random bits at the correct offset
// such that ONLY the first `targetCpl` bits match the local ID.
mask
:=
(
^
uint16
(
0
))
<<
(
16
-
(
targetCpl
+
1
))
targetPrefix
:=
(
toggledLocalPrefix
&
mask
)
|
(
randPrefix
&
^
mask
)
// Convert to a known peer ID.
key
:=
keyPrefixMap
[
targetPrefix
]
id
:=
[
34
]
byte
{
mh
.
SHA2_256
,
32
}
binary
.
BigEndian
.
PutUint32
(
id
[
2
:
],
key
)
return
peer
.
ID
(
id
[
:
])
return
peer
.
ID
(
id
[
:
])
,
nil
}
// Returns the bucket for a given ID
// should NOT modify the peer list on the returned bucket
func
(
rt
*
RoutingTable
)
BucketForID
(
id
ID
)
*
Bucket
{
// ResetCplRefreshedAtForID resets the refresh time for the Cpl of the given ID.
func
(
rt
*
RoutingTable
)
ResetCplRefreshedAtForID
(
id
ID
,
newTime
time
.
Time
)
{
cpl
:=
CommonPrefixLen
(
id
,
rt
.
local
)
rt
.
tabLock
.
RLock
()
defer
rt
.
tabLock
.
RUnlock
()
bucketID
:=
cpl
if
bucketID
>=
len
(
rt
.
Buckets
)
{
bucketID
=
len
(
rt
.
Buckets
)
-
1
if
uint
(
cpl
)
>
MaxCplForRefresh
{
return
}
return
rt
.
Buckets
[
bucketID
]
rt
.
cplRefreshLk
.
Lock
()
defer
rt
.
cplRefreshLk
.
Unlock
()
rt
.
cplRefreshedAt
[
uint
(
cpl
)]
=
newTime
}
// Update adds or moves the given peer to the front of its respective bucket
...
...
table_test.go
View file @
a58d5968
...
...
@@ -8,6 +8,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"
pstore
"github.com/libp2p/go-libp2p-peerstore"
"github.com/stretchr/testify/require"
)
// Test basic features of the bucket struct
...
...
@@ -53,48 +54,49 @@ func TestBucket(t *testing.T) {
func
TestGenRandPeerID
(
t
*
testing
.
T
)
{
t
.
Parallel
()
nBuckets
:=
21
local
:=
test
.
RandPeerIDFatal
(
t
)
m
:=
pstore
.
NewMetrics
()
rt
:=
NewRoutingTable
(
1
,
ConvertPeerID
(
local
),
time
.
Hour
,
m
)
// create nBuckets
for
i
:=
0
;
i
<
nBuckets
;
i
++
{
for
{
if
p
:=
test
.
RandPeerIDFatal
(
t
);
CommonPrefixLen
(
ConvertPeerID
(
local
),
ConvertPeerID
(
p
))
==
i
{
rt
.
Update
(
p
)
break
}
}
// generate above MaxCplForRefresh fails
p
,
err
:=
rt
.
GenRandPeerID
(
MaxCplForRefresh
+
1
)
require
.
Error
(
t
,
err
)
require
.
Empty
(
t
,
p
)
// test generate rand peer ID
for
cpl
:=
uint
(
0
);
cpl
<=
MaxCplForRefresh
;
cpl
++
{
peerID
,
err
:=
rt
.
GenRandPeerID
(
cpl
)
require
.
NoError
(
t
,
err
)
require
.
True
(
t
,
uint
(
CommonPrefixLen
(
ConvertPeerID
(
peerID
),
rt
.
local
))
==
cpl
,
"failed for cpl=%d"
,
cpl
)
}
}
// test bucket for peer
peers
:=
rt
.
ListPeers
()
for
_
,
p
:=
range
peers
{
b
:=
rt
.
BucketForID
(
ConvertPeerID
(
p
))
if
!
b
.
Has
(
p
)
{
t
.
Fatalf
(
"bucket should have peers %s"
,
p
.
String
())
}
func
TestRefreshAndGetTrackedCpls
(
t
*
testing
.
T
)
{
t
.
Parallel
()
local
:=
test
.
RandPeerIDFatal
(
t
)
m
:=
pstore
.
NewMetrics
()
rt
:=
NewRoutingTable
(
1
,
ConvertPeerID
(
local
),
time
.
Hour
,
m
)
// add cpl's for tracking
for
cpl
:=
uint
(
0
);
cpl
<
MaxCplForRefresh
;
cpl
++
{
peerID
,
err
:=
rt
.
GenRandPeerID
(
cpl
)
require
.
NoError
(
t
,
err
)
rt
.
ResetCplRefreshedAtForID
(
ConvertPeerID
(
peerID
),
time
.
Now
())
}
// test generate rand peer ID
for
bucketID
:=
0
;
bucketID
<
nBuckets
;
bucketID
++
{
peerID
:=
rt
.
GenRandPeerID
(
bucketID
)
// for bucketID upto maxPrefixLen of 16, CPL should be Exactly bucketID
if
bucketID
<
16
{
if
CommonPrefixLen
(
ConvertPeerID
(
peerID
),
rt
.
local
)
!=
bucketID
{
t
.
Fatalf
(
"cpl should be %d for bucket %d but got %d, generated peerID is %s"
,
bucketID
,
bucketID
,
CommonPrefixLen
(
ConvertPeerID
(
peerID
),
rt
.
local
),
peerID
)
}
}
else
{
// from bucketID 16 onwards, CPL should be ATLEAST 16
if
CommonPrefixLen
(
ConvertPeerID
(
peerID
),
rt
.
local
)
<
16
{
t
.
Fatalf
(
"cpl should be ATLEAST 16 for bucket %d but got %d, generated peerID is %s"
,
bucketID
,
CommonPrefixLen
(
ConvertPeerID
(
peerID
),
rt
.
local
),
peerID
)
}
}
// fetch cpl's
trackedCpls
:=
rt
.
GetTrackedCplsForRefresh
()
require
.
Len
(
t
,
trackedCpls
,
int
(
MaxCplForRefresh
))
actualCpls
:=
make
(
map
[
uint
]
struct
{})
for
i
:=
0
;
i
<
len
(
trackedCpls
);
i
++
{
actualCpls
[
trackedCpls
[
i
]
.
Cpl
]
=
struct
{}{}
}
for
i
:=
uint
(
0
);
i
<
MaxCplForRefresh
;
i
++
{
_
,
ok
:=
actualCpls
[
i
]
require
.
True
(
t
,
ok
,
"tracked cpl's should have cpl %d"
,
i
)
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment