dms3 / go-unixfs · Commits

Commit d6ce837d, authored Jan 20, 2015 by Juan Batiz-Benet
core/bootstrap: cleaned up bootstrapping
Moved it to its own package to isolate scope.
Parent: c43f97d6

Showing 3 changed files, with 179 additions and 112 deletions.
core/bootstrap.go             +148  -55
core/core.go                    +2  -20
routing/dht/dht_bootstrap.go   +29  -37
core/bootstrap.go
@@ -2,6 +2,7 @@ package core
 import (
 	"errors"
+	"fmt"
 	"math/rand"
 	"sync"
 	"time"
@@ -16,109 +17,187 @@ import (

 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+	goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
+	periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
 )
+// ErrNotEnoughBootstrapPeers signals that we do not have enough bootstrap
+// peers to bootstrap correctly.
+var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap")
+
 const (
-	period                          = 30 * time.Second // how often to check connection status
-	connectiontimeout time.Duration = period / 3       // duration to wait when attempting to connect
-	recoveryThreshold               = 4                // attempt to bootstrap if connection count falls below this value
-	numDHTBootstrapQueries          = 15               // number of DHT queries to execute
+	// BootstrapPeriod governs the periodic interval at which the node will
+	// attempt to bootstrap. The bootstrap process is not very expensive, so
+	// this threshold can afford to be small (<=30s).
+	BootstrapPeriod = 30 * time.Second
+
+	// BootstrapPeerThreshold governs the node Bootstrap process. If the node
+	// has fewer open connections than this number, it will open connections
+	// to the bootstrap nodes. From there, the routing system should be able
+	// to use the connections to the bootstrap nodes to connect to even more
+	// peers. Routing systems like the IpfsDHT do so in their own Bootstrap
+	// process, which issues random queries to find more peers.
+	BootstrapPeerThreshold = 4
+
+	// BootstrapConnectionTimeout determines how long to wait for a bootstrap
+	// connection attempt before cancelling it.
+	BootstrapConnectionTimeout time.Duration = BootstrapPeriod / 3
 )
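For orientation, the three exported constants encode a simple top-up policy. A minimal standalone sketch of the arithmetic, with values copied from the diff; the connectionsToOpen helper is hypothetical, written only to illustrate the policy bootstrapRound implements further down:

package main

import (
	"fmt"
	"time"
)

// Values copied from the diff above.
const (
	BootstrapPeriod                          = 30 * time.Second
	BootstrapPeerThreshold                   = 4
	BootstrapConnectionTimeout time.Duration = BootstrapPeriod / 3 // 10s
)

// connectionsToOpen is a hypothetical helper mirroring bootstrapRound's
// policy: at or above the threshold do nothing, otherwise top up to it.
func connectionsToOpen(connected int) int {
	if connected >= BootstrapPeerThreshold {
		return 0
	}
	return BootstrapPeerThreshold - connected
}

func main() {
	fmt.Println(BootstrapConnectionTimeout) // 10s
	fmt.Println(connectionsToOpen(1))       // 3
}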
-func superviseConnections(parent context.Context,
-	h host.Host,
-	route *dht.IpfsDHT, // TODO depend on abstract interface for testing purposes
-	store peer.Peerstore,
-	peers []peer.PeerInfo) error {
-
-	var dhtAlreadyBootstrapping bool
-
-	for {
-		ctx, _ := context.WithTimeout(parent, connectiontimeout)
-		// TODO get config from disk so |peers| always reflects the latest
-		// information
-		if err := bootstrap(ctx, h, route, store, peers); err != nil {
-			log.Error(err)
-		}
-
-		if !dhtAlreadyBootstrapping {
-			dhtAlreadyBootstrapping = true // only call dht.Bootstrap once.
-			if _, err := route.Bootstrap(); err != nil {
-				log.Error(err)
-			}
-		}
-
-		select {
-		case <-parent.Done():
-			return parent.Err()
-		case <-time.Tick(period):
-		}
-	}
-}
+// nodeBootstrapper is a small object used to bootstrap an IpfsNode.
+type nodeBootstrapper struct {
+	node *IpfsNode
+}
+
+// TryToBootstrap starts IpfsNode bootstrapping. This function will run an
+// initial bootstrapping phase before exiting: connect to several bootstrap
+// nodes. This allows callers to call this function synchronously to:
+// - check if an error occurs (bootstrapping unsuccessful)
+// - wait before starting services which require the node to be bootstrapped
+//
+// If bootstrapping initially fails, Bootstrap() will try again for a total of
+// three times, before giving up completely. Note that in environments where a
+// node may be initialized offline, as normal operation, BootstrapForever()
+// should be used instead.
+//
+// Note: this function could be much cleaner if we were to relax the constraint
+// that we want to exit **after** we have performed initial bootstrapping (and
+// are thus connected to nodes). The constraint may not be that useful in
+// practice. Consider cases when we initialize the node while disconnected from
+// the internet. We don't want this launch to fail... we want to continue
+// launching the node, hoping that bootstrapping will work in the future if we
+// get connected.
+func (nb *nodeBootstrapper) TryToBootstrap(ctx context.Context, peers []peer.PeerInfo) error {
+	n := nb.node
+
+	// TODO what bootstrapping should happen if there is no DHT? i.e. we could
+	// continue connecting to our bootstrap peers, but for what purpose? for now
+	// simply exit without connecting to any of them. When we introduce another
+	// routing system that uses bootstrap peers we can change this.
+	dht, ok := n.Routing.(*dht.IpfsDHT)
+	if !ok {
+		return nil
+	}
+
+	for i := 0; i < 3; i++ {
+		if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil {
+			return err
+		}
+	}
+
+	// at this point we have done at least one round of initial bootstrap.
+	// we're ready to kick off dht bootstrapping.
+	dbproc, err := dht.Bootstrap(ctx)
+	if err != nil {
+		return err
+	}
+
+	// kick off the node's periodic bootstrapping
+	proc := periodicproc.Tick(BootstrapPeriod, func(worker goprocess.Process) {
+		if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil {
+			log.Error(err)
+		}
+	})
+
+	// add dht bootstrap proc as a child, so it is closed automatically when we are.
+	proc.AddChild(dbproc)
+
+	// we were given a context. instead of returning proc for the caller
+	// to manage, for now we just close the proc when the context is done.
+	go func() {
+		<-ctx.Done()
+		proc.Close()
+	}()
+	return nil
+}
+
+// BootstrapForever starts IpfsNode bootstrapping. Unlike TryToBootstrap(),
+// BootstrapForever() will run indefinitely (until its context is cancelled).
+// This is particularly useful for the daemon and other services, which may
+// be started offline and will come online at a future date.
+//
+// TODO: check offline --to--> online case works well and doesn't hurt perf.
+// We may still be dialing. We should check network config.
+func (nb *nodeBootstrapper) BootstrapForever(ctx context.Context, peers []peer.PeerInfo) error {
+	for {
+		if err := nb.TryToBootstrap(ctx, peers); err == nil {
+			return nil
+		}
+	}
+	return nil
+}
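The tail of TryToBootstrap wires periodic bootstrapping into goprocess lifecycles: the DHT's bootstrap process becomes a child of the node's ticker process, so closing the parent closes the child. A minimal standalone sketch of those primitives, using the upstream github.com/jbenet/goprocess module that the vendored Godeps paths point at (the printed messages are stand-ins for the real bootstrap rounds):

package main

import (
	"fmt"
	"time"

	goprocess "github.com/jbenet/goprocess"
	periodicproc "github.com/jbenet/goprocess/periodic"
)

func main() {
	// Tick runs the callback once per period until the process is closed;
	// this is what drives the node's periodic bootstrap rounds above.
	proc := periodicproc.Tick(100*time.Millisecond, func(worker goprocess.Process) {
		fmt.Println("bootstrap round")
	})

	// A child is closed automatically when its parent closes, which is how
	// the dht bootstrap proc (dbproc) is tied to the node's ticker above.
	child := goprocess.Go(func(worker goprocess.Process) {
		<-worker.Closing()
		fmt.Println("child closing")
	})
	proc.AddChild(child)

	time.Sleep(350 * time.Millisecond)
	proc.Close() // closes the ticker and, transitively, the child
}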
-func bootstrap(ctx context.Context,
-	h host.Host,
-	r *dht.IpfsDHT,
-	ps peer.Peerstore,
-	bootstrapPeers []peer.PeerInfo) error {
-
-	connectedPeers := h.Network().Peers()
-	if len(connectedPeers) >= recoveryThreshold {
-		log.Event(ctx, "bootstrapSkip", h.ID())
-		log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes",
-			h.ID(), len(connectedPeers), recoveryThreshold)
-		return nil
-	}
-
-	numCxnsToCreate := recoveryThreshold - len(connectedPeers)
-	log.Event(ctx, "bootstrapStart", h.ID())
-	log.Debugf("%s core bootstrapping to %d more nodes", h.ID(), numCxnsToCreate)
-
-	var notConnected []peer.PeerInfo
-	for _, p := range bootstrapPeers {
-		if h.Network().Connectedness(p.ID) != inet.Connected {
-			notConnected = append(notConnected, p)
-		}
-	}
-
-	// if not connected to all bootstrap peer candidates
-	if len(notConnected) > 0 {
-		var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate)
-		log.Debugf("%s bootstrapping to %d nodes: %s", h.ID(), numCxnsToCreate, randomSubset)
-		if err := connect(ctx, ps, r, randomSubset); err != nil {
-			log.Event(ctx, "bootstrapError", h.ID(), lgbl.Error(err))
-			log.Errorf("%s bootstrap error: %s", h.ID(), err)
-			return err
-		}
-	}
-	return nil
-}
+func bootstrapRound(ctx context.Context,
+	host host.Host,
+	route *dht.IpfsDHT,
+	peerstore peer.Peerstore,
+	bootstrapPeers []peer.PeerInfo) error {
+
+	ctx, _ = context.WithTimeout(ctx, BootstrapConnectionTimeout)
+
+	// determine how many bootstrap connections to open
+	connectedPeers := host.Network().Peers()
+	if len(connectedPeers) >= BootstrapPeerThreshold {
+		log.Event(ctx, "bootstrapSkip", host.ID())
+		log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes",
+			host.ID(), len(connectedPeers), BootstrapPeerThreshold)
+		return nil
+	}
+	numCxnsToCreate := BootstrapPeerThreshold - len(connectedPeers)
+
+	// filter out bootstrap nodes we are already connected to
+	var notConnected []peer.PeerInfo
+	for _, p := range bootstrapPeers {
+		if host.Network().Connectedness(p.ID) != inet.Connected {
+			notConnected = append(notConnected, p)
+		}
+	}
+
+	// if connected to all bootstrap peer candidates, exit
+	if len(notConnected) < 1 {
+		log.Debugf("%s no more bootstrap peers to create %d connections", host.ID(), numCxnsToCreate)
+		return ErrNotEnoughBootstrapPeers
+	}
+
+	// connect to a random subset of bootstrap candidates
+	var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate)
+	log.Event(ctx, "bootstrapStart", host.ID())
+	log.Debugf("%s bootstrapping to %d nodes: %s", host.ID(), numCxnsToCreate, randomSubset)
+	if err := bootstrapConnect(ctx, peerstore, route, randomSubset); err != nil {
+		log.Event(ctx, "bootstrapError", host.ID(), lgbl.Error(err))
+		log.Errorf("%s bootstrap error: %s", host.ID(), err)
+		return err
+	}
+	return nil
+}
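randomSubsetOfPeers is referenced here but its body is outside this diff. As a hedged illustration only, a standalone sketch of what such a helper typically does (pick up to maxN elements uniformly at random; strings stand in for peer.PeerInfo, and this is not the file's actual implementation):

package main

import (
	"fmt"
	"math/rand"
)

// randomSubset is a hypothetical stand-in for randomSubsetOfPeers:
// it returns up to maxN elements chosen uniformly at random.
func randomSubset(peers []string, maxN int) []string {
	if maxN > len(peers) {
		maxN = len(peers)
	}
	out := make([]string, 0, maxN)
	for _, i := range rand.Perm(len(peers))[:maxN] {
		out = append(out, peers[i])
	}
	return out
}

func main() {
	candidates := []string{"peerA", "peerB", "peerC", "peerD", "peerE"}
	fmt.Println(randomSubset(candidates, 3)) // e.g. [peerD peerA peerE]
}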
-func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []peer.PeerInfo) error {
+func bootstrapConnect(ctx context.Context, ps peer.Peerstore, route *dht.IpfsDHT, peers []peer.PeerInfo) error {
 	if len(peers) < 1 {
-		return errors.New("bootstrap set empty")
+		return ErrNotEnoughBootstrapPeers
 	}
+
+	errs := make(chan error, len(peers))
 	var wg sync.WaitGroup
 	for _, p := range peers {

 		// performed asynchronously because when performed synchronously, if
 		// one `Connect` call hangs, subsequent calls are more likely to
 		// fail/abort due to an expiring context.
 		// Also, performed asynchronously for dial speed.

 		wg.Add(1)
 		go func(p peer.PeerInfo) {
 			defer wg.Done()
-			log.Event(ctx, "bootstrapDial", r.LocalPeer(), p.ID)
-			log.Debugf("%s bootstrapping to %s", r.LocalPeer(), p.ID)
+			log.Event(ctx, "bootstrapDial", route.LocalPeer(), p.ID)
+			log.Debugf("%s bootstrapping to %s", route.LocalPeer(), p.ID)

 			ps.AddAddresses(p.ID, p.Addrs)
-			err := r.Connect(ctx, p.ID)
+			err := route.Connect(ctx, p.ID)
 			if err != nil {
 				log.Event(ctx, "bootstrapFailed", p.ID)
-				log.Criticalf("failed to bootstrap with %v: %s", p.ID, err)
+				log.Errorf("failed to bootstrap with %v: %s", p.ID, err)
+				errs <- err
 				return
 			}
 			log.Event(ctx, "bootstrapSuccess", p.ID)

@@ -126,6 +205,20 @@ func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []pee

 		}(p)
 	}
 	wg.Wait()
+
+	// our failure condition is when no connection attempt succeeded.
+	// So drain the errs channel, counting the results.
+	close(errs)
+	count := 0
+	var err error
+	for err = range errs {
+		if err != nil {
+			count++
+		}
+	}
+	if count == len(peers) {
+		return fmt.Errorf("failed to bootstrap. %s", err)
+	}
 	return nil
 }
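The fan-out pattern added here dials every peer concurrently, collects failures in a buffered channel, and fails only when no attempt succeeded. A standalone sketch of the same pattern with a plain dial function standing in for route.Connect (names here are illustrative, not from the codebase; the per-error nil check from the diff is dropped since only non-nil errors are sent):

package main

import (
	"errors"
	"fmt"
	"sync"
)

func connectAll(peers []string, dial func(string) error) error {
	if len(peers) == 0 {
		return errors.New("bootstrap set empty")
	}
	errs := make(chan error, len(peers)) // buffered so goroutines never block
	var wg sync.WaitGroup
	for _, p := range peers {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			if err := dial(p); err != nil {
				errs <- err
			}
		}(p)
	}
	wg.Wait()
	close(errs)

	// fail only if every attempt failed, reporting the last error seen
	count := 0
	var err error
	for err = range errs {
		count++
	}
	if count == len(peers) {
		return fmt.Errorf("failed to bootstrap. %s", err)
	}
	return nil
}

func main() {
	dial := func(p string) error {
		if p == "good" {
			return nil
		}
		return fmt.Errorf("%s unreachable", p)
	}
	fmt.Println(connectAll([]string{"bad1", "good", "bad2"}, dial)) // <nil>
}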
core/core.go
@@ -297,30 +297,12 @@ func (n *IpfsNode) Resolve(path string) (*merkledag.Node, error) {
 func (n *IpfsNode) Bootstrap(ctx context.Context, peers []peer.PeerInfo) error {

 	// TODO what should return value be when in offlineMode?
 	if n.Routing == nil {
 		return nil
 	}

-	// TODO what bootstrapping should happen if there is no DHT? i.e. we could
-	// continue connecting to our bootstrap peers, but for what purpose?
-	dhtRouting, ok := n.Routing.(*dht.IpfsDHT)
-	if !ok {
-		return nil
-	}
-
-	// TODO consider moving connection supervision into the Network. We've
-	// discussed improvements to this Node constructor. One improvement
-	// would be to make the node configurable, allowing clients to inject
-	// an Exchange, Network, or Routing component and have the constructor
-	// manage the wiring. In that scenario, this dangling function is a bit
-	// awkward.
-
-	// spin off the node's connection supervisor.
-	// TODO, clean up how this thing works. Make the superviseConnections thing
-	// work like the DHT.Bootstrap.
-	go superviseConnections(ctx, n.PeerHost, dhtRouting, n.Peerstore, peers)
-	return nil
+	nb := nodeBootstrapper{n}
+	return nb.TryToBootstrap(ctx, peers)
 }
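With this hunk, n.Bootstrap becomes synchronous: it returns only after the initial rounds succeed or fail, instead of spinning off a supervisor goroutine. A standalone sketch of the two retry policies callers can now choose between, with a plain function standing in for the node's bootstrap round (the explicit ctx guard in the forever loop is an addition for this sketch; the committed loop relies on the round itself failing once the context is cancelled):

package main

import (
	"context"
	"errors"
	"fmt"
)

// tryToBootstrap mirrors TryToBootstrap's policy: run a few rounds up
// front and fail fast, so callers can block until the node is connected.
func tryToBootstrap(ctx context.Context, round func(context.Context) error) error {
	for i := 0; i < 3; i++ {
		if err := round(ctx); err != nil {
			return err
		}
	}
	return nil
}

// bootstrapForever mirrors BootstrapForever: retry whole attempts until
// one succeeds, for nodes that may start offline.
func bootstrapForever(ctx context.Context, round func(context.Context) error) error {
	for {
		if err := tryToBootstrap(ctx, round); err == nil {
			return nil
		}
		if ctx.Err() != nil { // sketch-only guard against spinning after cancel
			return ctx.Err()
		}
	}
}

func main() {
	attempts := 0
	round := func(context.Context) error {
		attempts++
		if attempts < 5 {
			return errors.New("offline") // fails for the first few rounds
		}
		return nil
	}
	fmt.Println(bootstrapForever(context.Background(), round)) // <nil>
}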
 func (n *IpfsNode) loadID() error {
routing/dht/dht_bootstrap.go
@@ -14,6 +14,7 @@ import (

 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
+	periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
 )

 // DefaultBootstrapQueries specifies how many queries to run,
@@ -54,9 +55,9 @@ const DefaultBootstrapTimeout = time.Duration(10 * time.Second)

 // and connected to at least a few nodes.
 //
 // Like PeriodicBootstrap, Bootstrap returns a process, so the user can stop it.
-func (dht *IpfsDHT) Bootstrap() (goprocess.Process, error) {
-	if err := dht.runBootstrap(dht.Context(), DefaultBootstrapQueries); err != nil {
+func (dht *IpfsDHT) Bootstrap(ctx context.Context) (goprocess.Process, error) {
+	if err := dht.runBootstrap(ctx, DefaultBootstrapQueries); err != nil {
 		return nil, err
 	}
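The signature change means the initial round of DHT queries is now bounded by the caller's context rather than the DHT's internal one. A minimal standalone sketch of the difference this makes, where runBootstrap is a stand-in, not the DHT's actual implementation:

package main

import (
	"context"
	"fmt"
	"time"
)

// runBootstrap stands in for dht.runBootstrap: it does slow work unless
// the context expires first.
func runBootstrap(ctx context.Context, queries int) error {
	select {
	case <-time.After(1 * time.Second): // pretend the queries take a while
		return nil
	case <-ctx.Done():
		return fmt.Errorf("bootstrap queries aborted: %v", ctx.Err())
	}
}

func main() {
	// With Bootstrap(ctx), the caller can bound the initial queries:
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	fmt.Println(runBootstrap(ctx, 15)) // aborted by the caller's deadline
}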
@@ -79,41 +80,32 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop

 		return nil, fmt.Errorf("invalid signal: %v", signal)
 	}

-	proc := goprocess.Go(func(worker goprocess.Process) {
-		defer log.Debug("dht bootstrapper shutting down")
-		for {
-			select {
-			case <-worker.Closing():
-				return
-
-			case <-signal:
-				// it would be useful to be able to send out signals of when we bootstrap, too...
-				// maybe this is a good case for whole module event pub/sub?
-				ctx := dht.Context()
-				if err := dht.runBootstrap(ctx, queries); err != nil {
-					log.Error(err)
-					// A bootstrapping error is important to notice but not fatal.
-					// maybe the client should be able to consume these errors,
-					// though I don't have a clear use case in mind -- what **could**
-					// the client do if one of the bootstrap calls fails?
-					//
-					// This is also related to the core's bootstrap failures.
-					// superviseConnections should perhaps allow clients to detect
-					// bootstrapping problems.
-					//
-					// Anyway, passing errors could be done with a bootstrapper object.
-					// this would imply the client should be able to consume a lot of
-					// other non-fatal dht errors too. providing this functionality
-					// should be done correctly DHT-wide.
-					// NB: whatever the design, clients must ensure they drain errors!
-					// This pattern is common to many things, perhaps long-running services
-					// should have something like an ErrStream that allows clients to consume
-					// periodic errors and take action. It should allow the user to also
-					// ignore all errors with something like an ErrStreamDiscard. We should
-					// study what other systems do for ideas.
-				}
-			}
-		}
-	})
+	proc := periodicproc.Ticker(signal, func(worker goprocess.Process) {
+		// it would be useful to be able to send out signals of when we bootstrap, too...
+		// maybe this is a good case for whole module event pub/sub?
+		ctx := dht.Context()
+		if err := dht.runBootstrap(ctx, queries); err != nil {
+			log.Error(err)
+			// A bootstrapping error is important to notice but not fatal.
+			// maybe the client should be able to consume these errors,
+			// though I don't have a clear use case in mind -- what **could**
+			// the client do if one of the bootstrap calls fails?
+			//
+			// This is also related to the core's bootstrap failures.
+			// superviseConnections should perhaps allow clients to detect
+			// bootstrapping problems.
+			//
+			// Anyway, passing errors could be done with a bootstrapper object.
+			// this would imply the client should be able to consume a lot of
+			// other non-fatal dht errors too. providing this functionality
+			// should be done correctly DHT-wide.
+			// NB: whatever the design, clients must ensure they drain errors!
+			// This pattern is common to many things, perhaps long-running services
+			// should have something like an ErrStream that allows clients to consume
+			// periodic errors and take action. It should allow the user to also
+			// ignore all errors with something like an ErrStreamDiscard. We should
+			// study what other systems do for ideas.
+		}
+	})
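The closing comment floats an "ErrStream" for surfacing non-fatal errors from long-running services. Nothing like it exists in this codebase; purely as a thought experiment, one hypothetical shape in Go (all names here are invented):

package main

import (
	"errors"
	"fmt"
)

// ErrStream sketches the idea from the comment above: a long-running
// service exposes a channel of non-fatal errors that clients must drain.
type ErrStream <-chan error

// Discard drains the stream so a client that ignores errors can't block
// the producer (the "ErrStreamDiscard" idea).
func Discard(es ErrStream) {
	go func() {
		for range es {
		}
	}()
}

func main() {
	errs := make(chan error, 1)
	go func() {
		errs <- errors.New("bootstrap round failed (non-fatal)")
		close(errs)
	}()
	for err := range ErrStream(errs) {
		fmt.Println("observed:", err)
	}
}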
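Mechanically, the hunk replaces a hand-rolled for/select loop with periodicproc.Ticker. A minimal standalone sketch of that primitive, assuming (as the call site above suggests for this vendored version) that Ticker accepts a plain <-chan time.Time:

package main

import (
	"fmt"
	"time"

	goprocess "github.com/jbenet/goprocess"
	periodicproc "github.com/jbenet/goprocess/periodic"
)

func main() {
	// One callback run per receive on the channel, until the process closes.
	signal := time.Tick(100 * time.Millisecond)
	proc := periodicproc.Ticker(signal, func(worker goprocess.Process) {
		fmt.Println("bootstrap queries would run here")
	})
	time.Sleep(350 * time.Millisecond)
	proc.Close()
}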