Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-p2p-kad-dht
Commits
f423e38e
Commit
f423e38e
authored
Jan 23, 2019
by
Matt Joiner
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Simplify the bootstrap logic
parent
f91dc289
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
64 additions
and
87 deletions
+64
-87
dht_bootstrap.go
dht_bootstrap.go
+61
-85
dht_test.go
dht_test.go
+1
-1
routing.go
routing.go
+2
-1
No files found.
dht_bootstrap.go
View file @
f423e38e
...
...
@@ -7,8 +7,8 @@ import (
"time"
u
"github.com/ipfs/go-ipfs-util"
goprocess
"github.com/jbenet/goprocess"
peer
"github.com/libp2p/go-libp2p-peer"
peerstore
"github.com/libp2p/go-libp2p-peerstore"
routing
"github.com/libp2p/go-libp2p-routing"
)
...
...
@@ -38,72 +38,75 @@ var DefaultBootstrapConfig = BootstrapConfig{
Timeout
:
time
.
Duration
(
10
*
time
.
Second
),
}
// Bootstrap ensures the dht routing table remains healthy as peers come and go.
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// As opposed to BootstrapWithConfig, Bootstrap satisfies the routing interface
// A method in the IpfsRouting interface. It calls BootstrapWithConfig with
// the default bootstrap config.
func
(
dht
*
IpfsDHT
)
Bootstrap
(
ctx
context
.
Context
)
error
{
proc
,
err
:=
dht
.
BootstrapWithConfig
(
DefaultBootstrapConfig
)
if
err
!=
nil
{
return
err
}
return
dht
.
BootstrapWithConfig
(
ctx
,
DefaultBootstrapConfig
)
}
// wait till ctx or dht.Context exits.
// we have to do it this way to satisfy the Routing interface (contexts)
// Runs cfg.Queries bootstrap queries every cfg.Period.
func
(
dht
*
IpfsDHT
)
BootstrapWithConfig
(
ctx
context
.
Context
,
cfg
BootstrapConfig
)
error
{
if
cfg
.
Queries
<=
0
{
return
fmt
.
Errorf
(
"invalid number of queries: %d"
,
cfg
.
Queries
)
}
ctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
go
func
()
{
defer
proc
.
Close
()
defer
cancel
()
select
{
case
<-
ctx
.
Done
()
:
case
<-
dht
.
Context
()
.
Done
()
:
case
<-
ctx
.
Done
()
:
}
}()
return
nil
}
// BootstrapWithConfig ensures the dht routing table remains healthy as peers come and go.
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// BootstrapWithConfig returns a process, so the user can stop it.
func
(
dht
*
IpfsDHT
)
BootstrapWithConfig
(
cfg
BootstrapConfig
)
(
goprocess
.
Process
,
error
)
{
if
cfg
.
Queries
<=
0
{
return
nil
,
fmt
.
Errorf
(
"invalid number of queries: %d"
,
cfg
.
Queries
)
}
proc
:=
dht
.
Process
()
.
Go
(
func
(
p
goprocess
.
Process
)
{
<-
p
.
Go
(
dht
.
bootstrapWorker
(
cfg
))
.
Closed
()
go
func
()
{
for
{
err
:=
dht
.
runBootstrap
(
ctx
,
cfg
)
if
err
!=
nil
{
log
.
Warningf
(
"error bootstrapping: %s"
,
err
)
}
select
{
case
<-
time
.
After
(
cfg
.
Period
)
:
<-
p
.
Go
(
dht
.
bootstrapWorker
(
cfg
))
.
Closed
()
case
<-
p
.
Closing
()
:
case
<-
ctx
.
Done
()
:
return
}
}
})
}()
return
nil
}
return
proc
,
nil
func
newRandomPeerId
()
peer
.
ID
{
id
:=
make
([]
byte
,
32
)
// SHA256 is the default. TODO: Use a more canonical way to generate random IDs.
rand
.
Read
(
id
)
id
=
u
.
Hash
(
id
)
// TODO: Feed this directly into the multihash instead of hashing it.
return
peer
.
ID
(
id
)
}
func
(
dht
*
IpfsDHT
)
bootstrapWorker
(
cfg
BootstrapConfig
)
func
(
worker
goprocess
.
Process
)
{
return
func
(
worker
goprocess
.
Process
)
{
// it would be useful to be able to send out signals of when we bootstrap, too...
// maybe this is a good case for whole module event pub/sub?
// Traverse the DHT toward the given ID.
func
(
dht
*
IpfsDHT
)
walk
(
ctx
context
.
Context
,
target
peer
.
ID
)
(
peerstore
.
PeerInfo
,
error
)
{
// TODO: Extract the query action (traversal logic?) inside FindPeer,
// don't actually call through the FindPeer machinery, which can return
// things out of the peer store etc.
return
dht
.
FindPeer
(
ctx
,
target
)
}
ctx
:=
dht
.
Context
()
if
err
:=
dht
.
runBootstrap
(
ctx
,
cfg
);
err
!=
nil
{
log
.
Warning
(
err
)
// A bootstrapping error is important to notice but not fatal.
}
// Traverse the DHT toward a random ID.
func
(
dht
*
IpfsDHT
)
randomWalk
(
ctx
context
.
Context
)
error
{
id
:=
newRandomPeerId
()
p
,
err
:=
dht
.
walk
(
ctx
,
id
)
switch
err
{
case
routing
.
ErrNotFound
:
return
nil
case
nil
:
// We found a peer from a randomly generated ID. This should be very unlikely.
log
.
Warningf
(
"Bootstrap peer error: Actually FOUND peer. (%s, %s)"
,
id
,
p
)
return
nil
default
:
return
err
}
}
// runBootstrap builds up list of peers by requesting random peer IDs
func
(
dht
*
IpfsDHT
)
runBootstrap
(
ctx
context
.
Context
,
cfg
BootstrapConfig
)
error
{
bslog
:=
func
(
msg
string
)
{
log
.
Debugf
(
"DHT %s dhtRunBootstrap %s -- routing table size: %d"
,
dht
.
self
,
msg
,
dht
.
routingTable
.
Size
())
}
...
...
@@ -111,51 +114,24 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error
defer
bslog
(
"end"
)
defer
log
.
EventBegin
(
ctx
,
"dhtRunBootstrap"
)
.
Done
()
var
merr
u
.
MultiErr
randomID
:=
func
()
peer
.
ID
{
// 16 random bytes is not a valid peer id. it may be fine becuase
// the dht will rehash to its own keyspace anyway.
id
:=
make
([]
byte
,
16
)
rand
.
Read
(
id
)
id
=
u
.
Hash
(
id
)
return
peer
.
ID
(
id
)
}
// bootstrap sequentially, as results will compound
runQuery
:=
func
(
ctx
context
.
Context
,
id
peer
.
ID
)
{
doQuery
:=
func
(
n
int
,
target
string
,
f
func
(
context
.
Context
)
error
)
error
{
log
.
Debugf
(
"Bootstrapping query (%d/%d) to %s"
,
n
,
cfg
.
Queries
,
target
)
ctx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
cfg
.
Timeout
)
defer
cancel
()
p
,
err
:=
dht
.
FindPeer
(
ctx
,
id
)
if
err
==
routing
.
ErrNotFound
{
// this isn't an error. this is precisely what we expect.
}
else
if
err
!=
nil
{
merr
=
append
(
merr
,
err
)
}
else
{
// woah, actually found a peer with that ID? this shouldn't happen normally
// (as the ID we use is not a real ID). this is an odd error worth logging.
err
:=
fmt
.
Errorf
(
"Bootstrap peer error: Actually FOUND peer. (%s, %s)"
,
id
,
p
)
log
.
Warningf
(
"%s"
,
err
)
merr
=
append
(
merr
,
err
)
}
return
f
(
ctx
)
}
//
these should be parallel normally. but can make them sequential for debugging
.
// note that the core/bootstrap context deadline should be extended too for that.
for
i
:=
0
;
i
<
cfg
.
Queries
;
i
++
{
i
d
:=
randomID
()
log
.
Debugf
(
"Bootstrapping query (%d/%d) to random ID: %s"
,
i
+
1
,
cfg
.
Queries
,
id
)
runQuery
(
ctx
,
id
)
//
Do all but one of the bootstrap queries as random walks
.
for
i
:=
1
;
i
<
cfg
.
Queries
;
i
++
{
err
:=
doQuery
(
i
,
"random ID"
,
dht
.
randomWalk
)
i
f
err
!=
nil
{
return
err
}
}
// Find self to distribute peer info to our neighbors.
// Do this after bootstrapping.
log
.
Debugf
(
"Bootstrapping query to self: %s"
,
dht
.
self
)
runQuery
(
ctx
,
dht
.
self
)
if
len
(
merr
)
>
0
{
return
merr
}
return
nil
return
doQuery
(
cfg
.
Queries
,
fmt
.
Sprintf
(
"self: %s"
,
dht
.
self
),
func
(
ctx
context
.
Context
)
error
{
_
,
err
:=
dht
.
walk
(
ctx
,
dht
.
self
)
return
err
})
}
dht_test.go
View file @
f423e38e
...
...
@@ -709,7 +709,7 @@ func TestPeriodicBootstrap(t *testing.T) {
t
.
Logf
(
"bootstrapping them so they find each other. %d"
,
nDHTs
)
for
_
,
dht
:=
range
dhts
{
_,
err := dht.BootstrapWithConfig(cfg)
err
:=
dht
.
BootstrapWithConfig
(
ctx
,
cfg
)
if
err
!=
nil
{
t
.
Fatalf
(
"error bootstrapping a dht: %s"
,
err
)
}
...
...
routing.go
View file @
f423e38e
...
...
@@ -21,6 +21,7 @@ import (
routing
"github.com/libp2p/go-libp2p-routing"
notif
"github.com/libp2p/go-libp2p-routing/notifications"
ropts
"github.com/libp2p/go-libp2p-routing/options"
"github.com/pkg/errors"
)
// asyncQueryBuffer is the size of buffered channels in async queries. This
...
...
@@ -583,7 +584,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo
peers
:=
dht
.
routingTable
.
NearestPeers
(
kb
.
ConvertPeerID
(
id
),
AlphaValue
)
if
len
(
peers
)
==
0
{
return
pstore
.
PeerInfo
{},
kb
.
ErrLookupFailure
return
pstore
.
PeerInfo
{},
errors
.
WithStack
(
kb
.
ErrLookupFailure
)
}
// Sanity...
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment