dms3 / go-dms3-routing · Commits

Commit 656354ee, authored Jan 16, 2015 by Juan Batiz-Benet
routing/dht: periodic bootstrapping #572
parent 2be84c9b

Showing 3 changed files with 327 additions and 101 deletions
dht/dht.go            +0   −63
dht/dht_bootstrap.go  +181 −0
dht/dht_test.go       +146 −38
dht/dht.go
```diff
@@ -370,66 +370,3 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
 		}
 	}
 }
-
-// Bootstrap builds up list of peers by requesting random peer IDs
-func (dht *IpfsDHT) Bootstrap(ctx context.Context, queries int) error {
-	var merr u.MultiErr
-
-	randomID := func() peer.ID {
-		// 16 random bytes is not a valid peer id. it may be fine becuase
-		// the dht will rehash to its own keyspace anyway.
-		id := make([]byte, 16)
-		rand.Read(id)
-		return peer.ID(id)
-	}
-
-	// bootstrap sequentially, as results will compound
-	runQuery := func(ctx context.Context, id peer.ID) {
-		p, err := dht.FindPeer(ctx, id)
-		if err == routing.ErrNotFound {
-			// this isn't an error. this is precisely what we expect.
-		} else if err != nil {
-			merr = append(merr, err)
-		} else {
-			// woah, actually found a peer with that ID? this shouldn't happen normally
-			// (as the ID we use is not a real ID). this is an odd error worth logging.
-			err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
-			log.Errorf("%s", err)
-			merr = append(merr, err)
-		}
-	}
-
-	sequential := true
-	if sequential {
-		// these should be parallel normally. but can make them sequential for debugging.
-		// note that the core/bootstrap context deadline should be extended too for that.
-		for i := 0; i < queries; i++ {
-			id := randomID()
-			log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
-			runQuery(ctx, id)
-		}
-
-	} else {
-		// note on parallelism here: the context is passed in to the queries, so they
-		// **should** exit when it exceeds, making this function exit on ctx cancel.
-		// normally, we should be selecting on ctx.Done() here too, but this gets
-		// complicated to do with WaitGroup, and doesnt wait for the children to exit.
-		var wg sync.WaitGroup
-		for i := 0; i < queries; i++ {
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-
-				id := randomID()
-				log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
-				runQuery(ctx, id)
-			}()
-		}
-		wg.Wait()
-	}
-
-	if len(merr) > 0 {
-		return merr
-	}
-	return nil
-}
```
dht/dht_bootstrap.go (new file, 0 → 100644)
```go
// Package dht implements a distributed hash table that satisfies the ipfs routing
// interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications.
package dht

import (
	"crypto/rand"
	"fmt"
	"sync"
	"time"

	peer "github.com/jbenet/go-ipfs/p2p/peer"
	routing "github.com/jbenet/go-ipfs/routing"
	u "github.com/jbenet/go-ipfs/util"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)

// DefaultBootstrapQueries specifies how many queries to run,
// if the user does not specify a different number as an option.
//
// For now, this is set to 16 queries, which is an aggressive number.
// We are currently more interested in ensuring we have a properly formed
// DHT than making sure our dht minimizes traffic. Once we are more certain
// of our implementation's robustness, we should lower this down to 8 or 4.
//
// Note there is also a tradeoff between the bootstrap period and the number
// of queries. We could support a higher period with a smaller number of
// queries.
const DefaultBootstrapQueries = 16

// DefaultBootstrapPeriod specifies how often to periodically run bootstrap,
// if the user does not specify a different number as an option.
//
// For now, this is set to 10 seconds, which is an aggressive period. We are
// currently more interested in ensuring we have a properly formed DHT than
// making sure our dht minimizes traffic. Once we are more certain of our
// implementation's robustness, we should lower this down to 30s or 1m.
//
// Note there is also a tradeoff between the bootstrap period and the number
// of queries. We could support a higher period with a smaller number of
// queries.
const DefaultBootstrapPeriod = time.Duration(10 * time.Second)
```
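The tradeoff both doc comments point at is essentially a lookup rate, queries/period: the defaults issue roughly 16 queries per 10 s ≈ 1.6 random FindPeer calls per second per node, while pairing the gentler suggestions (e.g. 8 queries every 30 s, or 4 every minute) works out to about 0.27 or 0.07 calls per second. Pairs with a similar ratio produce similar maintenance traffic, so the two knobs can be traded against each other.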
```go
// Bootstrap runs bootstrapping once, then calls BootstrapOnSignal with default
// parameters: DefaultBootstrapQueries and DefaultBootstrapPeriod. This allows
// the user to catch an error off the bat if the connections are faulty. It also
// allows BootstrapOnSignal not to run bootstrap at the beginning, which is useful
// for instrumenting it on tests, or delaying bootstrap until the network is online
// and connected to at least a few nodes.
//
// Like PeriodicBootstrap, Bootstrap returns a process, so the user can stop it.
func (dht *IpfsDHT) Bootstrap() (goprocess.Process, error) {
	if err := dht.runBootstrap(dht.Context(), DefaultBootstrapQueries); err != nil {
		return nil, err
	}
	sig := time.Tick(DefaultBootstrapPeriod)
	return dht.BootstrapOnSignal(DefaultBootstrapQueries, sig)
}
```
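In other words, the contract here is one synchronous round that can fail fast, followed by a periodic loop owned by the returned goprocess.Process. A minimal caller sketch under that contract (the wrapper function below is hypothetical, not part of this diff):

```go
// Hypothetical caller sketch (not part of this diff): start the DHT
// bootstrapper and hand the process back so the caller can stop it.
func startBootstrapper(dht *IpfsDHT) (goprocess.Process, error) {
	proc, err := dht.Bootstrap() // runs one round synchronously first
	if err != nil {
		// the initial round failed: connections are likely faulty
		return nil, err
	}
	return proc, nil // caller should eventually proc.Close()
}
```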
```go
// BootstrapOnSignal ensures the dht routing table remains healthy as peers come and go.
// It builds up a list of peers by requesting random peer IDs. The bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// BootstrapOnSignal returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (goprocess.Process, error) {
	if queries <= 0 {
		return nil, fmt.Errorf("invalid number of queries: %d", queries)
	}

	if signal == nil {
		return nil, fmt.Errorf("invalid signal: %v", signal)
	}

	proc := goprocess.Go(func(worker goprocess.Process) {
		for {
			select {
			case <-worker.Closing():
				log.Debug("dht bootstrapper shutting down")
				return

			case <-signal:
				// it would be useful to be able to send out signals of when we bootstrap, too...
				// maybe this is a good case for whole-module event pub/sub?

				ctx := dht.Context()
				if err := dht.runBootstrap(ctx, queries); err != nil {
					log.Error(err)
					// A bootstrapping error is important to notice but not fatal.
					// maybe the client should be able to consume these errors,
					// though I don't have a clear use case in mind-- what **could**
					// the client do if one of the bootstrap calls fails?
					//
					// This is also related to the core's bootstrap failures.
					// superviseConnections should perhaps allow clients to detect
					// bootstrapping problems.
					//
					// Anyway, passing errors could be done with a bootstrapper object.
					// this would imply the client should be able to consume a lot of
					// other non-fatal dht errors too. providing this functionality
					// should be done correctly DHT-wide.
					// NB: whatever the design, clients must ensure they drain errors!
					// This pattern is common to many things, perhaps long-running services
					// should have something like an ErrStream that allows clients to consume
					// periodic errors and take action. It should allow the user to also
					// ignore all errors with something like an ErrStreamDiscard. We should
					// study what other systems do for ideas.
				}
			}
		}
	})

	return proc, nil
}
```
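Since the signal is a plain <-chan time.Time, callers are not tied to time.Tick: any event source can drive rounds, which is exactly how TestPeriodicBootstrap below instruments bootstrapping. An illustrative sketch (the helper is hypothetical, not part of this diff):

```go
// Illustrative sketch (not part of this diff): drive bootstrap rounds
// from a caller-owned signal instead of a timer.
func manualBootstrap(dht *IpfsDHT) error {
	sig := make(chan time.Time)
	proc, err := dht.BootstrapOnSignal(5, sig) // 5 queries per round
	if err != nil {
		return err
	}
	sig <- time.Now()   // fire one round now
	return proc.Close() // then shut the bootstrapper down
}
```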
```go
// runBootstrap builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error {
	var merr u.MultiErr

	randomID := func() peer.ID {
		// 16 random bytes is not a valid peer id. it may be fine because
		// the dht will rehash to its own keyspace anyway.
		id := make([]byte, 16)
		rand.Read(id)
		return peer.ID(id)
	}

	// bootstrap sequentially, as results will compound
	runQuery := func(ctx context.Context, id peer.ID) {
		p, err := dht.FindPeer(ctx, id)
		if err == routing.ErrNotFound {
			// this isn't an error. this is precisely what we expect.
		} else if err != nil {
			merr = append(merr, err)
		} else {
			// woah, actually found a peer with that ID? this shouldn't happen normally
			// (as the ID we use is not a real ID). this is an odd error worth logging.
			err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
			log.Errorf("%s", err)
			merr = append(merr, err)
		}
	}

	sequential := true
	if sequential {
		// these should be parallel normally. but can make them sequential for debugging.
		// note that the core/bootstrap context deadline should be extended too for that.
		for i := 0; i < queries; i++ {
			id := randomID()
			log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
			runQuery(ctx, id)
		}

	} else {
		// note on parallelism here: the context is passed in to the queries, so they
		// **should** exit when it exceeds, making this function exit on ctx cancel.
		// normally, we should be selecting on ctx.Done() here too, but this gets
		// complicated to do with WaitGroup, and doesn't wait for the children to exit.
		var wg sync.WaitGroup
		for i := 0; i < queries; i++ {
			wg.Add(1)
			go func(i int) { // pass i in so the log line sees the right value
				defer wg.Done()

				id := randomID()
				log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
				runQuery(ctx, id)
			}(i)
		}
		wg.Wait()
	}

	if len(merr) > 0 {
		return merr
	}
	return nil
}
```
dht/dht_test.go
```diff
@@ -75,25 +75,20 @@ func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
 func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
 
 	ctx, cancel := context.WithCancel(ctx)
+	log.Error("hmm")
+	defer log.Error("hmm end")
 	log.Debugf("bootstrapping dhts...")
-	rounds := 1
 
-	for i := 0; i < rounds; i++ {
-		log.Debugf("bootstrapping round %d/%d\n", i, rounds)
+	// tried async. sequential fares much better. compare:
+	// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
+	// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
+	// probably because results compound
 
-		// tried async. sequential fares much better. compare:
-		// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
-		// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
-		// probably because results compound
-		start := rand.Intn(len(dhts)) // randomize to decrease bias.
-		for i := range dhts {
-			dht := dhts[(start+i)%len(dhts)]
-			log.Debugf("bootstrapping round %d/%d -- %s\n", i, rounds, dht.self)
-			dht.Bootstrap(ctx, 3)
-		}
-	}
+	start := rand.Intn(len(dhts)) // randomize to decrease bias.
+	for i := range dhts {
+		dht := dhts[(start+i)%len(dhts)]
+		dht.runBootstrap(ctx, 3)
+	}
 	cancel()
 }
```
```diff
@@ -235,6 +230,53 @@ func TestProvides(t *testing.T) {
 	}
 }
 
+// if minPeers or avgPeers is 0, don't test for it.
+func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
+	// test "well-formed-ness" (>= minPeers peers in every routing table)
+	checkTables := func() bool {
+		totalPeers := 0
+		for _, dht := range dhts {
+			rtlen := dht.routingTable.Size()
+			totalPeers += rtlen
+			if minPeers > 0 && rtlen < minPeers {
+				t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
+				return false
+			}
+		}
+		actualAvgPeers := totalPeers / len(dhts)
+		t.Logf("avg rt size: %d", actualAvgPeers)
+		if avgPeers > 0 && actualAvgPeers < avgPeers {
+			t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers)
+			return false
+		}
+		return true
+	}
+
+	timeoutA := time.After(timeout)
+	for {
+		select {
+		case <-timeoutA:
+			log.Errorf("did not reach well-formed routing tables by %s", timeout)
+			return false // failed
+		case <-time.After(5 * time.Millisecond):
+			if checkTables() {
+				return true // succeeded
+			}
+		}
+	}
+}
+
+func printRoutingTables(dhts []*IpfsDHT) {
+	// the routing tables should be full now. let's inspect them.
+	fmt.Printf("checking routing table of %d\n", len(dhts))
+	for _, dht := range dhts {
+		fmt.Printf("checking routing table of %s\n", dht.self)
+		dht.routingTable.Print()
+		fmt.Println("")
+	}
+}
 
 func TestBootstrap(t *testing.T) {
 	// t.Skip("skipping test to debug another")
 
 	if testing.Short() {
```
```diff
@@ -258,38 +300,105 @@ func TestBootstrap(t *testing.T) {
 	}
 
 	<-time.After(100 * time.Millisecond)
-	t.Logf("bootstrapping them so they find each other", nDHTs)
-	ctxT, _ := context.WithTimeout(ctx, 5*time.Second)
-	bootstrap(t, ctxT, dhts)
+
+	// bootstrap a few times until we get good tables.
+	stop := make(chan struct{})
+	go func() {
+		for {
+			t.Logf("bootstrapping them so they find each other", nDHTs)
+			ctxT, _ := context.WithTimeout(ctx, 5*time.Second)
+			bootstrap(t, ctxT, dhts)
+
+			select {
+			case <-time.After(50 * time.Millisecond):
+				continue // being explicit
+			case <-stop:
+				return
+			}
+		}
+	}()
+
+	waitForWellFormedTables(t, dhts, 7, 10, 5*time.Second)
+	close(stop)
 
 	if u.Debug {
 		// the routing tables should be full now. let's inspect them.
-		<-time.After(5 * time.Second)
-		t.Logf("checking routing table of %d", nDHTs)
-		for _, dht := range dhts {
-			fmt.Printf("checking routing table of %s\n", dht.self)
-			dht.routingTable.Print()
-			fmt.Println("")
-		}
+		printRoutingTables(dhts)
 	}
 }
 
+func TestPeriodicBootstrap(t *testing.T) {
+	// t.Skip("skipping test to debug another")
+	if testing.Short() {
+		t.SkipNow()
+	}
+
+	ctx := context.Background()
+
+	nDHTs := 30
+	_, _, dhts := setupDHTS(ctx, nDHTs, t)
+	defer func() {
+		for i := 0; i < nDHTs; i++ {
+			dhts[i].Close()
+			defer dhts[i].host.Close()
+		}
+	}()
+
+	// signal amplifier
+	amplify := func(signal chan time.Time, other []chan time.Time) {
+		for t := range signal {
+			for _, s := range other {
+				s <- t
+			}
+		}
+		for _, s := range other {
+			close(s)
+		}
+	}
+
-	// test "well-formed-ness" (>= 3 peers in every routing table)
-	avgsize := 0
+	signal := make(chan time.Time)
+	allSignals := []chan time.Time{}
+
+	// kick off periodic bootstrappers with instrumented signals.
+	for _, dht := range dhts {
+		s := make(chan time.Time)
+		allSignals = append(allSignals, s)
+		dht.BootstrapOnSignal(5, s)
+	}
+	go amplify(signal, allSignals)
+
+	t.Logf("dhts are not connected.", nDHTs)
+	for _, dht := range dhts {
+		rtlen := dht.routingTable.Size()
+		if rtlen > 0 {
+			t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen)
+		}
+	}
+
+	for i := 0; i < nDHTs; i++ {
+		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
+	}
+
+	t.Logf("dhts are now connected to 1-2 others.", nDHTs)
 	for _, dht := range dhts {
 		rtlen := dht.routingTable.Size()
-		avgsize += rtlen
-		t.Logf("routing table for %s has %d peers", dht.self, rtlen)
-		if rtlen < 4 {
-			// currently, we dont have good bootstrapping guarantees.
-			// t.Errorf("routing table for %s only has %d peers", dht.self, rtlen)
+		if rtlen > 2 {
+			t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen)
 		}
 	}
-	avgsize = avgsize / len(dhts)
-	avgsizeExpected := 6
-	t.Logf("avg rt size: %d", avgsize)
-	if avgsize < avgsizeExpected {
-		t.Errorf("avg rt size: %d < %d", avgsize, avgsizeExpected)
-	}
+
+	if u.Debug {
+		printRoutingTables(dhts)
+	}
+
+	t.Logf("bootstrapping them so they find each other", nDHTs)
+	signal <- time.Now()
+
+	// this is async, and we dont know when it's finished with one cycle, so keep checking
+	// until the routing tables look better, or some long timeout for the failure case.
+	waitForWellFormedTables(t, dhts, 7, 10, 5*time.Second)
+
+	if u.Debug {
+		printRoutingTables(dhts)
+	}
+}
```
```diff
@@ -319,7 +428,6 @@ func TestProvidesMany(t *testing.T) {
 	if u.Debug {
 		// the routing tables should be full now. let's inspect them.
 		<-time.After(5 * time.Second)
 		t.Logf("checking routing table of %d", nDHTs)
 		for _, dht := range dhts {
 			fmt.Printf("checking routing table of %s\n", dht.self)
```