Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-p2p-kad-dht
Commits
201eea5e
Unverified
Commit
201eea5e
authored
Jan 24, 2019
by
Steven Allen
Committed by
GitHub
Jan 24, 2019
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Revert "Tidy up bootstrapping"
parent
66cc80c8
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
121 additions
and
76 deletions
+121
-76
dht.go
dht.go
+0
-10
dht_bootstrap.go
dht_bootstrap.go
+104
-62
dht_test.go
dht_test.go
+16
-2
routing.go
routing.go
+1
-2
No files found.
dht.go
View file @
201eea5e
...
...
@@ -64,16 +64,6 @@ type IpfsDHT struct {
protocols
[]
protocol
.
ID
// DHT protocols
}
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var
(
_
routing
.
ContentRouting
=
(
*
IpfsDHT
)(
nil
)
_
routing
.
IpfsRouting
=
(
*
IpfsDHT
)(
nil
)
_
routing
.
PeerRouting
=
(
*
IpfsDHT
)(
nil
)
_
routing
.
PubKeyFetcher
=
(
*
IpfsDHT
)(
nil
)
_
routing
.
ValueStore
=
(
*
IpfsDHT
)(
nil
)
)
// New creates a new DHT with the specified host and options.
func
New
(
ctx
context
.
Context
,
h
host
.
Host
,
options
...
opts
.
Option
)
(
*
IpfsDHT
,
error
)
{
var
cfg
opts
.
Options
...
...
dht_bootstrap.go
View file @
201eea5e
...
...
@@ -7,8 +7,9 @@ import (
"time"
u
"github.com/ipfs/go-ipfs-util"
goprocess
"github.com/jbenet/goprocess"
periodicproc
"github.com/jbenet/goprocess/periodic"
peer
"github.com/libp2p/go-libp2p-peer"
pstore
"github.com/libp2p/go-libp2p-peerstore"
routing
"github.com/libp2p/go-libp2p-routing"
)
...
...
@@ -38,73 +39,87 @@ var DefaultBootstrapConfig = BootstrapConfig{
Timeout
:
time
.
Duration
(
10
*
time
.
Second
),
}
// A method in the IpfsRouting interface. It calls BootstrapWithConfig with
// the default bootstrap config.
// Bootstrap ensures the dht routing table remains healthy as peers come and go.
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// As opposed to BootstrapWithConfig, Bootstrap satisfies the routing interface
func
(
dht
*
IpfsDHT
)
Bootstrap
(
ctx
context
.
Context
)
error
{
return
dht
.
BootstrapWithConfig
(
ctx
,
DefaultBootstrapConfig
)
proc
,
err
:=
dht
.
BootstrapWithConfig
(
DefaultBootstrapConfig
)
if
err
!=
nil
{
return
err
}
// wait till ctx or dht.Context exits.
// we have to do it this way to satisfy the Routing interface (contexts)
go
func
()
{
defer
proc
.
Close
()
select
{
case
<-
ctx
.
Done
()
:
case
<-
dht
.
Context
()
.
Done
()
:
}
}()
return
nil
}
// Runs cfg.Queries bootstrap queries every cfg.Period.
func
(
dht
*
IpfsDHT
)
BootstrapWithConfig
(
ctx
context
.
Context
,
cfg
BootstrapConfig
)
error
{
// Because this method is not synchronous, we have to duplicate sanity
// checks on the config so that callers aren't oblivious.
// BootstrapWithConfig ensures the dht routing table remains healthy as peers come and go.
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// BootstrapWithConfig returns a process, so the user can stop it.
func
(
dht
*
IpfsDHT
)
BootstrapWithConfig
(
cfg
BootstrapConfig
)
(
goprocess
.
Process
,
error
)
{
if
cfg
.
Queries
<=
0
{
return
fmt
.
Errorf
(
"invalid number of queries: %d"
,
cfg
.
Queries
)
return
nil
,
fmt
.
Errorf
(
"invalid number of queries: %d"
,
cfg
.
Queries
)
}
go
func
()
{
proc
:=
dht
.
Process
()
.
Go
(
func
(
p
goprocess
.
Process
)
{
<-
p
.
Go
(
dht
.
bootstrapWorker
(
cfg
))
.
Closed
()
for
{
err
:=
dht
.
runBootstrap
(
ctx
,
cfg
)
if
err
!=
nil
{
log
.
Warningf
(
"error bootstrapping: %s"
,
err
)
}
select
{
case
<-
time
.
After
(
cfg
.
Period
)
:
case
<-
ctx
.
Done
()
:
<-
p
.
Go
(
dht
.
bootstrapWorker
(
cfg
))
.
Closed
()
case
<-
p
.
Closing
()
:
return
}
}
}()
return
nil
})
return
proc
,
nil
}
// This is a synchronous bootstrap. cfg.Queries queries will run each with a
// timeout of cfg.Timeout. cfg.Period is not used.
func
(
dht
*
IpfsDHT
)
BootstrapOnce
(
ctx
context
.
Context
,
cfg
BootstrapConfig
)
error
{
// SignalBootstrap ensures the dht routing table remains healthy as peers come and go.
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// SignalBootstrap returns a process, so the user can stop it.
func
(
dht
*
IpfsDHT
)
BootstrapOnSignal
(
cfg
BootstrapConfig
,
signal
<-
chan
time
.
Time
)
(
goprocess
.
Process
,
error
)
{
if
cfg
.
Queries
<=
0
{
return
fmt
.
Errorf
(
"invalid number of queries: %d"
,
cfg
.
Queries
)
return
nil
,
fmt
.
Errorf
(
"invalid number of queries: %d"
,
cfg
.
Queries
)
}
return
dht
.
runBootstrap
(
ctx
,
cfg
)
}
func
newRandomPeerId
()
peer
.
ID
{
id
:=
make
([]
byte
,
32
)
// SHA256 is the default. TODO: Use a more canonical way to generate random IDs.
rand
.
Read
(
id
)
id
=
u
.
Hash
(
id
)
// TODO: Feed this directly into the multihash instead of hashing it.
return
peer
.
ID
(
id
)
}
if
signal
==
nil
{
return
nil
,
fmt
.
Errorf
(
"invalid signal: %v"
,
signal
)
}
// Traverse the DHT toward the given ID.
func
(
dht
*
IpfsDHT
)
walk
(
ctx
context
.
Context
,
target
peer
.
ID
)
(
pstore
.
PeerInfo
,
error
)
{
// TODO: Extract the query action (traversal logic?) inside FindPeer,
// don't actually call through the FindPeer machinery, which can return
// things out of the peer store etc.
return
dht
.
FindPeer
(
ctx
,
target
)
proc
:=
periodicproc
.
Ticker
(
signal
,
dht
.
bootstrapWorker
(
cfg
))
return
proc
,
nil
}
// Traverse the DHT toward a random ID.
func
(
dht
*
IpfsDHT
)
randomWalk
(
ctx
context
.
Context
)
error
{
id
:=
newRandomPeerId
()
p
,
err
:=
dht
.
walk
(
ctx
,
id
)
switch
err
{
case
routing
.
ErrNotFound
:
return
nil
case
nil
:
// We found a peer from a randomly generated ID. This should be very
// unlikely.
log
.
Warningf
(
"random walk toward %s actually found peer: %s"
,
id
,
p
)
return
nil
default
:
return
err
func
(
dht
*
IpfsDHT
)
bootstrapWorker
(
cfg
BootstrapConfig
)
func
(
worker
goprocess
.
Process
)
{
return
func
(
worker
goprocess
.
Process
)
{
// it would be useful to be able to send out signals of when we bootstrap, too...
// maybe this is a good case for whole module event pub/sub?
ctx
:=
dht
.
Context
()
if
err
:=
dht
.
runBootstrap
(
ctx
,
cfg
);
err
!=
nil
{
log
.
Warning
(
err
)
// A bootstrapping error is important to notice but not fatal.
}
}
}
...
...
@@ -117,24 +132,51 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error
defer
bslog
(
"end"
)
defer
log
.
EventBegin
(
ctx
,
"dhtRunBootstrap"
)
.
Done
()
doQuery
:=
func
(
n
int
,
target
string
,
f
func
(
context
.
Context
)
error
)
error
{
log
.
Debugf
(
"Bootstrapping query (%d/%d) to %s"
,
n
,
cfg
.
Queries
,
target
)
var
merr
u
.
MultiErr
randomID
:=
func
()
peer
.
ID
{
// 16 random bytes is not a valid peer id. it may be fine becuase
// the dht will rehash to its own keyspace anyway.
id
:=
make
([]
byte
,
16
)
rand
.
Read
(
id
)
id
=
u
.
Hash
(
id
)
return
peer
.
ID
(
id
)
}
// bootstrap sequentially, as results will compound
runQuery
:=
func
(
ctx
context
.
Context
,
id
peer
.
ID
)
{
ctx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
cfg
.
Timeout
)
defer
cancel
()
return
f
(
ctx
)
}
// Do all but one of the bootstrap queries as random walks.
for
i
:=
1
;
i
<
cfg
.
Queries
;
i
++
{
err
:=
doQuery
(
i
,
"random ID"
,
dht
.
randomWalk
)
if
err
!=
nil
{
return
err
p
,
err
:=
dht
.
FindPeer
(
ctx
,
id
)
if
err
==
routing
.
ErrNotFound
{
// this isn't an error. this is precisely what we expect.
}
else
if
err
!=
nil
{
merr
=
append
(
merr
,
err
)
}
else
{
// woah, actually found a peer with that ID? this shouldn't happen normally
// (as the ID we use is not a real ID). this is an odd error worth logging.
err
:=
fmt
.
Errorf
(
"Bootstrap peer error: Actually FOUND peer. (%s, %s)"
,
id
,
p
)
log
.
Warningf
(
"%s"
,
err
)
merr
=
append
(
merr
,
err
)
}
}
// these should be parallel normally. but can make them sequential for debugging.
// note that the core/bootstrap context deadline should be extended too for that.
for
i
:=
0
;
i
<
cfg
.
Queries
;
i
++
{
id
:=
randomID
()
log
.
Debugf
(
"Bootstrapping query (%d/%d) to random ID: %s"
,
i
+
1
,
cfg
.
Queries
,
id
)
runQuery
(
ctx
,
id
)
}
// Find self to distribute peer info to our neighbors.
return
doQuery
(
cfg
.
Queries
,
fmt
.
Sprintf
(
"self: %s"
,
dht
.
self
),
func
(
ctx
context
.
Context
)
error
{
_
,
err
:=
dht
.
walk
(
ctx
,
dht
.
self
)
return
err
})
// Do this after bootstrapping.
log
.
Debugf
(
"Bootstrapping query to self: %s"
,
dht
.
self
)
runQuery
(
ctx
,
dht
.
self
)
if
len
(
merr
)
>
0
{
return
merr
}
return
nil
}
dht_test.go
View file @
201eea5e
...
...
@@ -679,10 +679,23 @@ func TestPeriodicBootstrap(t *testing.T) {
}
}()
signals := []chan time.Time{}
var cfg BootstrapConfig
cfg = DefaultBootstrapConfig
cfg.Queries = 5
// kick off periodic bootstrappers with instrumented signals.
for _, dht := range dhts {
s := make(chan time.Time)
signals = append(signals, s)
proc, err := dht.BootstrapOnSignal(cfg, s)
if err != nil {
t.Fatal(err)
}
defer proc.Close()
}
t.Logf("dhts are not connected. %d", nDHTs)
for _, dht := range dhts {
rtlen := dht.routingTable.Size()
...
...
@@ -708,8 +721,9 @@ func TestPeriodicBootstrap(t *testing.T) {
}
t.Logf("bootstrapping them so they find each other. %d", nDHTs)
for _, dht := range dhts {
go dht.BootstrapOnce(ctx, cfg)
now := time.Now()
for _, signal := range signals {
go func(s chan time.Time) { s <- now }(signal)
}
// this is async, and we dont know when it's finished with one cycle, so keep checking
...
...
routing.go
View file @
201eea5e
...
...
@@ -21,7 +21,6 @@ import (
routing
"github.com/libp2p/go-libp2p-routing"
notif
"github.com/libp2p/go-libp2p-routing/notifications"
ropts
"github.com/libp2p/go-libp2p-routing/options"
"github.com/pkg/errors"
)
// asyncQueryBuffer is the size of buffered channels in async queries. This
...
...
@@ -584,7 +583,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo
peers
:=
dht
.
routingTable
.
NearestPeers
(
kb
.
ConvertPeerID
(
id
),
AlphaValue
)
if
len
(
peers
)
==
0
{
return
pstore
.
PeerInfo
{},
errors
.
WithStack
(
kb
.
ErrLookupFailure
)
return
pstore
.
PeerInfo
{},
kb
.
ErrLookupFailure
}
// Sanity...
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment