Commit 0b052ffd authored by tavit ohanian

reference basis

parents e9bcb650 7cf0c1e0
# File managed by web3-bot. DO NOT EDIT.
# See https://github.com/protocol/.github/ for details.

# Automatically merge pull requests opened by web3-bot, as soon as (and only if) all tests pass.
# This reduces the friction associated with updating our workflows.
on: [ pull_request ]
name: Automerge

jobs:
  automerge-check:
    if: github.event.pull_request.user.login == 'web3-bot'
    runs-on: ubuntu-latest
    outputs:
      status: ${{ steps.should-automerge.outputs.status }}
    steps:
      - uses: actions/checkout@v2
        with:
          fetch-depth: 0
      - name: Check if we should automerge
        id: should-automerge
        run: |
          for commit in $(git rev-list --first-parent origin/${{ github.event.pull_request.base.ref }}..${{ github.event.pull_request.head.sha }}); do
            committer=$(git show --format=$'%ce' -s $commit)
            echo "Committer: $committer"
            if [[ "$committer" != "web3-bot@users.noreply.github.com" ]]; then
              echo "Commit $commit wasn't committed by web3-bot, but by $committer."
              echo "::set-output name=status::false"
              exit
            fi
          done
          echo "::set-output name=status::true"
  automerge:
    needs: automerge-check
    runs-on: ubuntu-latest
    if: ${{ needs.automerge-check.outputs.status == 'true' }}
    steps:
      - name: Wait on tests
        uses: lewagon/wait-on-check-action@bafe56a6863672c681c3cf671f5e10b20abf2eaa # v0.2
        with:
          ref: ${{ github.event.pull_request.head.sha }}
          repo-token: ${{ secrets.GITHUB_TOKEN }}
          wait-interval: 10
          running-workflow-name: 'automerge' # the name of this job
      - name: Merge PR
        uses: pascalgn/automerge-action@741c311a47881be9625932b0a0de1b0937aab1ae # v0.13.1
        env:
          GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
          MERGE_LABELS: ""
          MERGE_METHOD: "squash"
          MERGE_DELETE_BRANCH: true
# File managed by web3-bot. DO NOT EDIT.
# See https://github.com/protocol/.github/ for details.

on: [push, pull_request]
name: Go Checks

jobs:
  unit:
    runs-on: ubuntu-latest
    name: All
    steps:
      - uses: actions/checkout@v2
        with:
          submodules: recursive
      - uses: actions/setup-go@v2
        with:
          go-version: "1.16.x"
      - name: Install staticcheck
        run: go install honnef.co/go/tools/cmd/staticcheck@434f5f3816b358fe468fa83dcba62d794e7fe04b # 2021.1 (v0.2.0)
      - name: Check that go.mod is tidy
        uses: protocol/multiple-go-modules@v1.0
        with:
          run: |
            go mod tidy
            if [[ -n $(git ls-files --other --exclude-standard --directory -- go.sum) ]]; then
              echo "go.sum was added by go mod tidy"
              exit 1
            fi
            git diff --exit-code -- go.sum go.mod
      - name: gofmt
        if: ${{ success() || failure() }} # run this step even if the previous one failed
        run: |
          out=$(gofmt -s -l .)
          if [[ -n "$out" ]]; then
            echo $out | awk '{print "::error file=" $0 ",line=0,col=0::File is not gofmt-ed."}'
            exit 1
          fi
      - name: go vet
        if: ${{ success() || failure() }} # run this step even if the previous one failed
        uses: protocol/multiple-go-modules@v1.0
        with:
          run: go vet ./...
      - name: staticcheck
        if: ${{ success() || failure() }} # run this step even if the previous one failed
        uses: protocol/multiple-go-modules@v1.0
        with:
          run: |
            set -o pipefail
            staticcheck ./... | sed -e 's@\(.*\)\.go@./\1.go@g'
# File managed by web3-bot. DO NOT EDIT.
# See https://github.com/protocol/.github/ for details.

on: [push, pull_request]
name: Go Test

jobs:
  unit:
    strategy:
      fail-fast: false
      matrix:
        os: [ "ubuntu", "windows", "macos" ]
        go: [ "1.15.x", "1.16.x" ]
    runs-on: ${{ matrix.os }}-latest
    name: ${{ matrix.os }} (go ${{ matrix.go }})
    steps:
      - uses: actions/checkout@v2
        with:
          submodules: recursive
      - uses: actions/setup-go@v2
        with:
          go-version: ${{ matrix.go }}
      - name: Go information
        run: |
          go version
          go env
      - name: Run tests
        uses: protocol/multiple-go-modules@v1.0
        with:
          run: go test -v -coverprofile coverage.txt ./...
      - name: Run tests (32 bit)
        if: ${{ matrix.os != 'macos' }} # can't run 32 bit tests on OSX.
        uses: protocol/multiple-go-modules@v1.0
        env:
          GOARCH: 386
        with:
          run: go test -v ./...
      - name: Run tests with race detector
        if: ${{ matrix.os == 'ubuntu' }} # speed things up. Windows and OSX VMs are slow
        uses: protocol/multiple-go-modules@v1.0
        with:
          run: go test -v -race ./...
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@a1ed4b322b4b38cb846afb5a0ebfa17086917d27 # v1.5.0
        with:
          file: coverage.txt
          env_vars: OS=${{ matrix.os }}, GO=${{ matrix.go }}
The MIT License (MIT)
Copyright (c) 2016 Protocol Labs, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
# go-libp2p-kad-dht

dms3 p2p go-libp2p-kad-dht
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](https://libp2p.io)
[![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![GoDoc](https://godoc.org/github.com/libp2p/go-libp2p-kad-dht?status.svg)](https://godoc.org/github.com/libp2p/go-libp2p-kad-dht)
[![Build Status](https://travis-ci.org/libp2p/go-libp2p-kad-dht.svg?branch=master)](https://travis-ci.org/libp2p/go-libp2p-kad-dht)
[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
> A Kademlia DHT implementation on go-libp2p
## Table of Contents
- [Install](#install)
- [Usage](#usage)
- [Contribute](#contribute)
- [License](#license)
## Install
```sh
go get github.com/libp2p/go-libp2p-kad-dht
```
## Usage
Go to https://godoc.org/github.com/libp2p/go-libp2p-kad-dht.
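As a minimal, hedged sketch of getting started (the GoDoc above is authoritative; the DHT options used here are from this module, and `libp2p.New` is the host constructor from go-libp2p of the same era):

```go
package main

import (
	"context"
	"fmt"

	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

func main() {
	ctx := context.Background()

	// The DHT runs on top of a libp2p host.
	h, err := libp2p.New(ctx)
	if err != nil {
		panic(err)
	}

	// ModeAuto switches between client and server based on reachability;
	// the default bootstrap peers seed the routing table.
	kadDHT, err := dht.New(ctx, h,
		dht.Mode(dht.ModeAuto),
		dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
	)
	if err != nil {
		panic(err)
	}

	// Trigger an initial routing table refresh.
	if err := kadDHT.Bootstrap(ctx); err != nil {
		panic(err)
	}
	fmt.Println("DHT ready on", h.ID())
}
```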
## Contribute
Contributions welcome. Please check out [the issues](https://github.com/libp2p/go-libp2p-kad-dht/issues).
Check out our [contributing document](https://github.com/libp2p/community/blob/master/CONTRIBUTE.md) for more information on how we work, and about contributing in general. Please be aware that all interactions related to libp2p are subject to the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md).
Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
[MIT](LICENSE) © Protocol Labs Inc.
---
The last gx published version of this module was: 4.4.34: QmXuNFLZc6Nb5akB4sZsxK3doShsFKT1sZFvxLXJvZQwAW
coverage:
  range: "50...100"
comment: off
package crawler
import (
"context"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-msgio/protoio"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kbucket "github.com/libp2p/go-libp2p-kbucket"
)
var logger = logging.Logger("dht-crawler")
// Crawler connects to hosts in the DHT to track routing tables of peers.
type Crawler struct {
parallelism int
connectTimeout time.Duration
host host.Host
dhtRPC *pb.ProtocolMessenger
}
// New creates a new Crawler
func New(host host.Host, opts ...Option) (*Crawler, error) {
o := new(options)
if err := defaults(o); err != nil {
return nil, err
}
for _, opt := range opts {
if err := opt(o); err != nil {
return nil, err
}
}
pm, err := pb.NewProtocolMessenger(&messageSender{h: host, protocols: o.protocols, timeout: o.perMsgTimeout})
if err != nil {
return nil, err
}
return &Crawler{
parallelism: o.parallelism,
connectTimeout: o.connectTimeout,
host: host,
dhtRPC: pm,
}, nil
}
// MessageSender handles sending wire protocol messages to a given peer
type messageSender struct {
h host.Host
protocols []protocol.ID
timeout time.Duration
}
// SendRequest sends a peer a message and waits for its response
func (ms *messageSender) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
s, err := ms.h.NewStream(ctx, p, ms.protocols...)
if err != nil {
return nil, err
}
w := protoio.NewDelimitedWriter(s)
if err := w.WriteMsg(pmes); err != nil {
return nil, err
}
r := protoio.NewDelimitedReader(s, network.MessageSizeMax)
tctx, cancel := context.WithTimeout(ctx, ms.timeout)
defer cancel()
defer func() { _ = s.Close() }()
msg := new(pb.Message)
if err := ctxReadMsg(tctx, r, msg); err != nil {
_ = s.Reset()
return nil, err
}
return msg, nil
}
func ctxReadMsg(ctx context.Context, rc protoio.ReadCloser, mes *pb.Message) error {
errc := make(chan error, 1)
go func(r protoio.ReadCloser) {
defer close(errc)
err := r.ReadMsg(mes)
errc <- err
}(rc)
select {
case err := <-errc:
return err
case <-ctx.Done():
return ctx.Err()
}
}
// SendMessage sends a peer a message without waiting on a response
func (ms *messageSender) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
s, err := ms.h.NewStream(ctx, p, ms.protocols...)
if err != nil {
return err
}
defer func() { _ = s.Close() }()
w := protoio.NewDelimitedWriter(s)
return w.WriteMsg(pmes)
}
// HandleQueryResult is a callback on successful peer query
type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)
// HandleQueryFail is a callback on failed peer query
type HandleQueryFail func(p peer.ID, err error)
const dialAddressExtendDur time.Duration = time.Minute * 30
// Run crawls dht peers from an initial seed of `startingPeers`
func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
jobs := make(chan peer.ID, 1)
results := make(chan *queryResult, 1)
// Start worker goroutines
var wg sync.WaitGroup
wg.Add(c.parallelism)
for i := 0; i < c.parallelism; i++ {
go func() {
defer wg.Done()
for p := range jobs {
res := c.queryPeer(ctx, p)
results <- res
}
}()
}
defer wg.Wait()
defer close(jobs)
var toDial []*peer.AddrInfo
peersSeen := make(map[peer.ID]struct{})
numSkipped := 0
for _, ai := range startingPeers {
extendAddrs := c.host.Peerstore().Addrs(ai.ID)
if len(ai.Addrs) > 0 {
extendAddrs = append(extendAddrs, ai.Addrs...)
c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, dialAddressExtendDur)
}
if len(extendAddrs) == 0 {
numSkipped++
continue
}
toDial = append(toDial, ai)
peersSeen[ai.ID] = struct{}{}
}
if numSkipped > 0 {
logger.Infof("%d starting peers were skipped due to lack of addresses. Starting crawl with %d peers", numSkipped, len(toDial))
}
numQueried := 0
outstanding := 0
for len(toDial) > 0 || outstanding > 0 {
var jobCh chan peer.ID
var nextPeerID peer.ID
if len(toDial) > 0 {
jobCh = jobs
nextPeerID = toDial[0].ID
}
select {
case res := <-results:
if len(res.data) > 0 {
logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
for p, ai := range res.data {
c.host.Peerstore().AddAddrs(p, ai.Addrs, dialAddressExtendDur)
if _, ok := peersSeen[p]; !ok {
peersSeen[p] = struct{}{}
toDial = append(toDial, ai)
}
rtPeers = append(rtPeers, ai)
}
if handleSuccess != nil {
handleSuccess(res.peer, rtPeers)
}
} else if handleFail != nil {
handleFail(res.peer, res.err)
}
outstanding--
case jobCh <- nextPeerID:
outstanding++
numQueried++
toDial = toDial[1:]
logger.Debugf("starting %d out of %d", numQueried, len(peersSeen))
}
}
}
type queryResult struct {
peer peer.ID
data map[peer.ID]*peer.AddrInfo
err error
}
func (c *Crawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, c.host.Peerstore(), time.Hour, nil)
if err != nil {
logger.Errorf("error creating rt for peer %v : %v", nextPeer, err)
return &queryResult{nextPeer, nil, err}
}
connCtx, cancel := context.WithTimeout(ctx, c.connectTimeout)
defer cancel()
err = c.host.Connect(connCtx, peer.AddrInfo{ID: nextPeer})
if err != nil {
logger.Debugf("could not connect to peer %v: %v", nextPeer, err)
return &queryResult{nextPeer, nil, err}
}
localPeers := make(map[peer.ID]*peer.AddrInfo)
var retErr error
for cpl := 0; cpl <= 15; cpl++ {
generatePeer, err := tmpRT.GenRandPeerID(uint(cpl))
if err != nil {
panic(err)
}
peers, err := c.dhtRPC.GetClosestPeers(ctx, nextPeer, generatePeer)
if err != nil {
logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer, cpl, err)
retErr = err
break
}
for _, ai := range peers {
if _, ok := localPeers[ai.ID]; !ok {
localPeers[ai.ID] = ai
}
}
}
if retErr != nil {
return &queryResult{nextPeer, nil, retErr}
}
return &queryResult{nextPeer, localPeers, retErr}
}
package crawler
import (
"time"
"github.com/libp2p/go-libp2p-core/protocol"
)
// Option DHT Crawler option type.
type Option func(*options) error
type options struct {
protocols []protocol.ID
parallelism int
connectTimeout time.Duration
perMsgTimeout time.Duration
}
// defaults are the default crawler options. This option will be automatically
// prepended to any options you pass to the crawler constructor.
var defaults = func(o *options) error {
o.protocols = []protocol.ID{"/ipfs/kad/1.0.0"}
o.parallelism = 1000
o.connectTimeout = time.Second * 5
o.perMsgTimeout = time.Second * 5
return nil
}
// WithProtocols defines the ordered set of protocols the crawler will use to talk to other nodes
func WithProtocols(protocols []protocol.ID) Option {
return func(o *options) error {
o.protocols = append([]protocol.ID{}, protocols...)
return nil
}
}
// WithParallelism defines the number of queries that can be issued in parallel
func WithParallelism(parallelism int) Option {
return func(o *options) error {
o.parallelism = parallelism
return nil
}
}
// WithMsgTimeout defines the amount of time a single DHT message is allowed to take before it's deemed failed
func WithMsgTimeout(timeout time.Duration) Option {
return func(o *options) error {
o.perMsgTimeout = timeout
return nil
}
}
// WithConnectTimeout defines how long to wait when connecting to a peer before timing out
func WithConnectTimeout(timeout time.Duration) Option {
return func(o *options) error {
o.connectTimeout = timeout
return nil
}
}
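To show how these options and the crawler's `Run` callbacks fit together, here is a hedged usage sketch; the callback bodies are hypothetical, and `GetDefaultBootstrapPeerAddrInfos` comes from the parent dht package:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p-core/peer"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p-kad-dht/crawler"
)

func main() {
	ctx := context.Background()

	h, err := libp2p.New(ctx)
	if err != nil {
		panic(err)
	}

	c, err := crawler.New(h,
		crawler.WithParallelism(200),
		crawler.WithConnectTimeout(10*time.Second),
	)
	if err != nil {
		panic(err)
	}

	// Seed the crawl with the default bootstrap peers.
	var seeds []*peer.AddrInfo
	for _, ai := range dht.GetDefaultBootstrapPeerAddrInfos() {
		ai := ai // capture loop variable before taking its address
		seeds = append(seeds, &ai)
	}

	c.Run(ctx, seeds,
		func(p peer.ID, rtPeers []*peer.AddrInfo) {
			fmt.Printf("%s: %d routing table peers\n", p, len(rtPeers))
		},
		func(p peer.ID, err error) {
			fmt.Printf("%s: query failed: %v\n", p, err)
		})
}
```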
package dht
import (
"context"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
)
// DefaultBootstrapPeers is a set of public DHT bootstrap peers provided by libp2p.
var DefaultBootstrapPeers []multiaddr.Multiaddr
// Minimum number of peers in the routing table. If we drop below this and we
// see a new peer, we trigger a bootstrap round.
var minRTRefreshThreshold = 10
const (
periodicBootstrapInterval = 2 * time.Minute
maxNBoostrappers = 2
)
func init() {
for _, s := range []string{
"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
} {
ma, err := multiaddr.NewMultiaddr(s)
if err != nil {
panic(err)
}
DefaultBootstrapPeers = append(DefaultBootstrapPeers, ma)
}
}
// GetDefaultBootstrapPeerAddrInfos returns the peer.AddrInfos for the default
// bootstrap peers so we can use these for initializing the DHT by passing these to the
// BootstrapPeers(...) option.
func GetDefaultBootstrapPeerAddrInfos() []peer.AddrInfo {
ds := make([]peer.AddrInfo, 0, len(DefaultBootstrapPeers))
for i := range DefaultBootstrapPeers {
info, err := peer.AddrInfoFromP2pAddr(DefaultBootstrapPeers[i])
if err != nil {
logger.Errorw("failed to convert bootstrapper address to peer addr info",
	"address", DefaultBootstrapPeers[i].String(), "err", err)
continue
}
ds = append(ds, *info)
}
return ds
}
// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
dht.fixRTIfNeeded()
dht.rtRefreshManager.RefreshNoWait()
return nil
}
// RefreshRoutingTable tells the DHT to refresh its routing table.
//
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
func (dht *IpfsDHT) RefreshRoutingTable() <-chan error {
return dht.rtRefreshManager.Refresh(false)
}
// ForceRefresh acts like RefreshRoutingTable but forces the DHT to refresh all
// buckets in the Routing Table irrespective of when they were last refreshed.
//
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
func (dht *IpfsDHT) ForceRefresh() <-chan error {
return dht.rtRefreshManager.Refresh(true)
}
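A short sketch of how `Bootstrap` and `RefreshRoutingTable` differ in practice; it assumes an existing host `h`, a context `ctx`, and the `fmt` import in the surrounding application:

```go
kadDHT, err := dht.New(ctx, h,
	dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
)
if err != nil {
	panic(err)
}

// Bootstrap is fire-and-forget: it fixes the routing table if needed and
// kicks off a refresh without waiting for it.
_ = kadDHT.Bootstrap(ctx)

// RefreshRoutingTable lets callers block until the refresh round finishes;
// the returned channel is buffered, so it is also safe to ignore.
if err := <-kadDHT.RefreshRoutingTable(); err != nil {
	fmt.Println("refresh error:", err)
}
```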
package dht
import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/peer"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/stretchr/testify/require"
)
func TestSelfWalkOnAddressChange(t *testing.T) {
ctx := context.Background()
// create three DHT instances with auto refresh disabled
d1 := setupDHT(ctx, t, false, DisableAutoRefresh(), forceAddressUpdateProcessing(t))
d2 := setupDHT(ctx, t, false, DisableAutoRefresh())
d3 := setupDHT(ctx, t, false, DisableAutoRefresh())
var connectedTo *IpfsDHT
// connect d1 to whoever is "further"
if kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d2.self)) <=
kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d3.self)) {
connect(t, ctx, d1, d3)
connectedTo = d3
} else {
connect(t, ctx, d1, d2)
connectedTo = d2
}
// then connect d2 AND d3
connect(t, ctx, d2, d3)
// d1 should have ONLY 1 peer in its RT
waitForWellFormedTables(t, []*IpfsDHT{d1}, 1, 1, 2*time.Second)
require.Equal(t, connectedTo.self, d1.routingTable.ListPeers()[0])
// now emit the address change event
em, err := d1.host.EventBus().Emitter(&event.EvtLocalAddressesUpdated{})
require.NoError(t, err)
require.NoError(t, em.Emit(event.EvtLocalAddressesUpdated{}))
waitForWellFormedTables(t, []*IpfsDHT{d1}, 2, 2, 2*time.Second)
// it should now have both peers in the RT
ps := d1.routingTable.ListPeers()
require.Contains(t, ps, d2.self)
require.Contains(t, ps, d3.self)
}
func TestDefaultBootstrappers(t *testing.T) {
ds := GetDefaultBootstrapPeerAddrInfos()
require.NotEmpty(t, ds)
require.Len(t, ds, len(DefaultBootstrapPeers))
dfmap := make(map[peer.ID]peer.AddrInfo)
for _, p := range DefaultBootstrapPeers {
info, err := peer.AddrInfoFromP2pAddr(p)
require.NoError(t, err)
dfmap[info.ID] = *info
}
for _, p := range ds {
inf, ok := dfmap[p.ID]
require.True(t, ok)
require.ElementsMatch(t, p.Addrs, inf.Addrs)
delete(dfmap, p.ID)
}
require.Empty(t, dfmap)
}
func TestBootstrappersReplacable(t *testing.T) {
old := rtFreezeTimeout
rtFreezeTimeout = 100 * time.Millisecond
defer func() {
rtFreezeTimeout = old
}()
ctx := context.Background()
d := setupDHT(ctx, t, false, disableFixLowPeersRoutine(t), BucketSize(2))
defer d.host.Close()
defer d.Close()
var d1 *IpfsDHT
var d2 *IpfsDHT
// d1 & d2 have a cpl of 0
for {
d1 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d1.selfKey) == 0 {
break
}
}
for {
d2 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d2.selfKey) == 0 {
break
}
}
defer d1.host.Close()
defer d1.Close()
defer d2.host.Close()
defer d2.Close()
connect(t, ctx, d, d1)
connect(t, ctx, d, d2)
require.Len(t, d.routingTable.ListPeers(), 2)
// d3 & d4 with cpl=0 will go in as d1 & d2 are replaceable.
var d3 *IpfsDHT
var d4 *IpfsDHT
for {
d3 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d3.selfKey) == 0 {
break
}
}
for {
d4 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d4.selfKey) == 0 {
break
}
}
defer d3.host.Close()
defer d3.Close()
defer d4.host.Close()
defer d4.Close()
connect(t, ctx, d, d3)
connect(t, ctx, d, d4)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)
// do a couple of refreshes and wait for the Routing Table to be "frozen".
<-d.RefreshRoutingTable()
<-d.RefreshRoutingTable()
time.Sleep(1 * time.Second)
// adding d5 fails because RT is frozen
var d5 *IpfsDHT
for {
d5 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t))
if kb.CommonPrefixLen(d.selfKey, d5.selfKey) == 0 {
break
}
}
defer d5.host.Close()
defer d5.Close()
connectNoSync(t, ctx, d, d5)
time.Sleep(500 * time.Millisecond)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)
// Let's empty the routing table
for _, p := range d.routingTable.ListPeers() {
d.routingTable.RemovePeer(p)
}
require.Len(t, d.routingTable.ListPeers(), 0)
// adding d1 & d2 works now because there is space in the Routing Table
require.NoError(t, d.host.Network().ClosePeer(d1.self))
require.NoError(t, d.host.Network().ClosePeer(d2.self))
connect(t, ctx, d, d1)
connect(t, ctx, d, d2)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d1.self)
require.Contains(t, d.routingTable.ListPeers(), d2.self)
// adding d3 & d4 also works because the RT is not frozen.
require.NoError(t, d.host.Network().ClosePeer(d3.self))
require.NoError(t, d.host.Network().ClosePeer(d4.self))
connect(t, ctx, d, d3)
connect(t, ctx, d, d4)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)
// run refreshes and freeze the RT
<-d.RefreshRoutingTable()
<-d.RefreshRoutingTable()
time.Sleep(1 * time.Second)
// can't add d1 & d5 because RT is frozen.
require.NoError(t, d.host.Network().ClosePeer(d1.self))
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(ctx, d5.self, true)
d.peerFound(ctx, d1.self, true)
time.Sleep(1 * time.Second)
require.Len(t, d.routingTable.ListPeers(), 2)
require.Contains(t, d.routingTable.ListPeers(), d3.self)
require.Contains(t, d.routingTable.ListPeers(), d4.self)
}
package dht
import (
"bytes"
"net"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/google/gopacket/routing"
netroute "github.com/libp2p/go-netroute"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
)
// QueryFilterFunc is a filter applied when considering peers to dial when querying
type QueryFilterFunc = dhtcfg.QueryFilterFunc
// RouteTableFilterFunc is a filter applied when considering connections to keep in
// the local route table.
type RouteTableFilterFunc = dhtcfg.RouteTableFilterFunc
var publicCIDR6 = "2000::/3"
var public6 *net.IPNet
func init() {
_, public6, _ = net.ParseCIDR(publicCIDR6)
}
// isPublicAddr follows the logic of manet.IsPublicAddr, except it uses
// a stricter definition of "public" for ipv6: namely "is it in 2000::/3"?
func isPublicAddr(a ma.Multiaddr) bool {
ip, err := manet.ToIP(a)
if err != nil {
return false
}
if ip.To4() != nil {
return !inAddrRange(ip, manet.Private4) && !inAddrRange(ip, manet.Unroutable4)
}
return public6.Contains(ip)
}
// isPrivateAddr follows the logic of manet.IsPrivateAddr, except that
// it uses a stricter definition of "public" for ipv6
func isPrivateAddr(a ma.Multiaddr) bool {
ip, err := manet.ToIP(a)
if err != nil {
return false
}
if ip.To4() != nil {
return inAddrRange(ip, manet.Private4)
}
return !public6.Contains(ip) && !inAddrRange(ip, manet.Unroutable6)
}
// PublicQueryFilter returns true if the peer is suspected of being publicly accessible
func PublicQueryFilter(_ interface{}, ai peer.AddrInfo) bool {
if len(ai.Addrs) == 0 {
return false
}
var hasPublicAddr bool
for _, a := range ai.Addrs {
if !isRelayAddr(a) && isPublicAddr(a) {
hasPublicAddr = true
}
}
return hasPublicAddr
}
type hasHost interface {
Host() host.Host
}
var _ QueryFilterFunc = PublicQueryFilter
// PublicRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate
// that it is on a public network
func PublicRoutingTableFilter(dht interface{}, p peer.ID) bool {
d := dht.(hasHost)
conns := d.Host().Network().ConnsToPeer(p)
if len(conns) == 0 {
return false
}
// Do we have a public address for this peer?
id := conns[0].RemotePeer()
known := d.Host().Peerstore().PeerInfo(id)
for _, a := range known.Addrs {
if !isRelayAddr(a) && isPublicAddr(a) {
return true
}
}
return false
}
var _ RouteTableFilterFunc = PublicRoutingTableFilter
// PrivateQueryFilter doesn't currently restrict which peers we are willing to query from the local DHT.
func PrivateQueryFilter(_ interface{}, ai peer.AddrInfo) bool {
return len(ai.Addrs) > 0
}
var _ QueryFilterFunc = PrivateQueryFilter
// We call this very frequently but routes can technically change at runtime.
// Cache it for two minutes.
const routerCacheTime = 2 * time.Minute
var routerCache struct {
sync.RWMutex
router routing.Router
expires time.Time
}
func getCachedRouter() routing.Router {
routerCache.RLock()
router := routerCache.router
expires := routerCache.expires
routerCache.RUnlock()
if time.Now().Before(expires) {
return router
}
routerCache.Lock()
defer routerCache.Unlock()
now := time.Now()
if now.Before(routerCache.expires) {
return routerCache.router
}
routerCache.router, _ = netroute.New()
routerCache.expires = now.Add(routerCacheTime)
return routerCache.router
}
// PrivateRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate
// that it is on a private network
func PrivateRoutingTableFilter(dht interface{}, p peer.ID) bool {
d := dht.(hasHost)
conns := d.Host().Network().ConnsToPeer(p)
return privRTFilter(d, conns)
}
func privRTFilter(dht interface{}, conns []network.Conn) bool {
d := dht.(hasHost)
h := d.Host()
router := getCachedRouter()
myAdvertisedIPs := make([]net.IP, 0)
for _, a := range h.Addrs() {
if isPublicAddr(a) && !isRelayAddr(a) {
ip, err := manet.ToIP(a)
if err != nil {
continue
}
myAdvertisedIPs = append(myAdvertisedIPs, ip)
}
}
for _, c := range conns {
ra := c.RemoteMultiaddr()
if isPrivateAddr(ra) && !isRelayAddr(ra) {
return true
}
if isPublicAddr(ra) {
ip, err := manet.ToIP(ra)
if err != nil {
continue
}
// if the ip is the same as one of the local host's public advertised IPs - then consider it local
for _, i := range myAdvertisedIPs {
if i.Equal(ip) {
return true
}
if ip.To4() == nil {
if i.To4() == nil && isEUI(ip) && sameV6Net(i, ip) {
return true
}
}
}
// if there's no gateway - a direct host in the OS routing table - then consider it local
// This is relevant in particular to ipv6 networks where the addresses may all be public,
// but the nodes are aware of direct links between each other.
if router != nil {
_, gw, _, err := router.Route(ip)
if gw == nil && err == nil {
return true
}
}
}
}
return false
}
var _ RouteTableFilterFunc = PrivateRoutingTableFilter
func isEUI(ip net.IP) bool {
// per rfc 2373
return len(ip) == net.IPv6len && ip[11] == 0xff && ip[12] == 0xfe
}
func sameV6Net(a, b net.IP) bool {
//lint:ignore SA1021 We're comparing only parts of the IP address here.
return len(a) == net.IPv6len && len(b) == net.IPv6len && bytes.Equal(a[0:8], b[0:8]) //nolint
}
func isRelayAddr(a ma.Multiaddr) bool {
found := false
ma.ForEach(a, func(c ma.Component) bool {
found = c.Protocol().Code == ma.P_CIRCUIT
return !found
})
return found
}
func inAddrRange(ip net.IP, ipnets []*net.IPNet) bool {
for _, ipnet := range ipnets {
if ipnet.Contains(ip) {
return true
}
}
return false
}
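As a hedged sketch of how the exported filters above are meant to be wired in (the dual package later in this commit does exactly this for its WAN instance), assuming an existing host `h` and context `ctx`:

```go
// A WAN-style DHT: only query and retain peers that look publicly reachable.
wanDHT, err := dht.New(ctx, h,
	dht.QueryFilter(dht.PublicQueryFilter),
	dht.RoutingTableFilter(dht.PublicRoutingTableFilter),
)
if err != nil {
	panic(err)
}
_ = wanDHT
```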
package dht
import (
"context"
"net"
"testing"
ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
func TestIsRelay(t *testing.T) {
a, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5002/p2p/QmdPU7PfRyKehdrP5A3WqmjyD6bhVpU1mLGKppa2FjGDjZ/p2p-circuit/p2p/QmVT6GYwjeeAF5TR485Yc58S3xRF5EFsZ5YAF4VcP3URHt")
if !isRelayAddr(a) {
t.Fatalf("thought %s was not a relay", a)
}
a, _ = ma.NewMultiaddr("/p2p-circuit/p2p/QmVT6GYwjeeAF5TR485Yc58S3xRF5EFsZ5YAF4VcP3URHt")
if !isRelayAddr(a) {
t.Fatalf("thought %s was not a relay", a)
}
a, _ = ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5002/p2p/QmdPU7PfRyKehdrP5A3WqmjyD6bhVpU1mLGKppa2FjGDjZ")
if isRelayAddr(a) {
t.Fatalf("thought %s was a relay", a)
}
}
type mockConn struct {
local peer.AddrInfo
remote peer.AddrInfo
}
var _ network.Conn = (*mockConn)(nil)
func (m *mockConn) ID() string { return "0" }
func (m *mockConn) Close() error { return nil }
func (m *mockConn) NewStream(context.Context) (network.Stream, error) { return nil, nil }
func (m *mockConn) GetStreams() []network.Stream { return []network.Stream{} }
func (m *mockConn) Stat() network.Stat { return network.Stat{Direction: network.DirOutbound} }
func (m *mockConn) LocalMultiaddr() ma.Multiaddr { return m.local.Addrs[0] }
func (m *mockConn) RemoteMultiaddr() ma.Multiaddr { return m.remote.Addrs[0] }
func (m *mockConn) LocalPeer() peer.ID { return m.local.ID }
func (m *mockConn) LocalPrivateKey() ic.PrivKey { return nil }
func (m *mockConn) RemotePeer() peer.ID { return m.remote.ID }
func (m *mockConn) RemotePublicKey() ic.PubKey { return nil }
func TestFilterCaching(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := setupDHT(ctx, t, true)
remote, _ := manet.FromIP(net.IPv4(8, 8, 8, 8))
if privRTFilter(d, []network.Conn{&mockConn{
local: d.Host().Peerstore().PeerInfo(d.Host().ID()),
remote: peer.AddrInfo{ID: "", Addrs: []ma.Multiaddr{remote}},
}}) {
t.Fatal("filter should prevent public remote peers.")
}
r1 := getCachedRouter()
r2 := getCachedRouter()
if r1 != r2 {
t.Fatal("router should be returned multiple times.")
}
}
package dht
import (
"io"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-kad-dht/internal/net"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-msgio"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
)
var dhtStreamIdleTimeout = 1 * time.Minute
// ErrReadTimeout is an error that occurs when no message is read within the timeout period.
var ErrReadTimeout = net.ErrReadTimeout
// handleNewStream implements the network.StreamHandler
func (dht *IpfsDHT) handleNewStream(s network.Stream) {
if dht.handleNewMessage(s) {
// If we exited without error, close gracefully.
_ = s.Close()
} else {
// otherwise, send an error.
_ = s.Reset()
}
}
// Returns true on orderly completion of writes (so we can Close the stream).
func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
ctx := dht.ctx
r := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
mPeer := s.Conn().RemotePeer()
timer := time.AfterFunc(dhtStreamIdleTimeout, func() { _ = s.Reset() })
defer timer.Stop()
for {
if dht.getMode() != modeServer {
logger.Errorf("ignoring incoming dht message while not in server mode")
return false
}
var req pb.Message
msgbytes, err := r.ReadMsg()
msgLen := len(msgbytes)
if err != nil {
r.ReleaseMsg(msgbytes)
if err == io.EOF {
return true
}
// This string test is necessary because there isn't a single stream reset error
// instance in use.
if c := baseLogger.Check(zap.DebugLevel, "error reading message"); c != nil && err.Error() != "stream reset" {
c.Write(zap.String("from", mPeer.String()),
zap.Error(err))
}
if msgLen > 0 {
_ = stats.RecordWithTags(ctx,
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
metrics.ReceivedMessages.M(1),
metrics.ReceivedMessageErrors.M(1),
metrics.ReceivedBytes.M(int64(msgLen)),
)
}
return false
}
err = req.Unmarshal(msgbytes)
r.ReleaseMsg(msgbytes)
if err != nil {
if c := baseLogger.Check(zap.DebugLevel, "error unmarshaling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Error(err))
}
_ = stats.RecordWithTags(ctx,
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
metrics.ReceivedMessages.M(1),
metrics.ReceivedMessageErrors.M(1),
metrics.ReceivedBytes.M(int64(msgLen)),
)
return false
}
timer.Reset(dhtStreamIdleTimeout)
startTime := time.Now()
ctx, _ := tag.New(ctx,
tag.Upsert(metrics.KeyMessageType, req.GetType().String()),
)
stats.Record(ctx,
metrics.ReceivedMessages.M(1),
metrics.ReceivedBytes.M(int64(msgLen)),
)
handler := dht.handlerForMsgType(req.GetType())
if handler == nil {
stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
if c := baseLogger.Check(zap.DebugLevel, "can't handle received message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Int32("type", int32(req.GetType())))
}
return false
}
// a peer has queried us, let's add it to RT
dht.peerFound(dht.ctx, mPeer, true)
if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Int32("type", int32(req.GetType())),
zap.Binary("key", req.GetKey()))
}
resp, err := handler(ctx, mPeer, &req)
if err != nil {
stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
if c := baseLogger.Check(zap.DebugLevel, "error handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Int32("type", int32(req.GetType())),
zap.Binary("key", req.GetKey()),
zap.Error(err))
}
return false
}
if c := baseLogger.Check(zap.DebugLevel, "handled message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Int32("type", int32(req.GetType())),
zap.Binary("key", req.GetKey()),
zap.Duration("time", time.Since(startTime)))
}
if resp == nil {
continue
}
// send out response msg
err = net.WriteMsg(s, resp)
if err != nil {
stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
if c := baseLogger.Check(zap.DebugLevel, "error writing response"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Int32("type", int32(req.GetType())),
zap.Binary("key", req.GetKey()),
zap.Error(err))
}
return false
}
elapsedTime := time.Since(startTime)
if c := baseLogger.Check(zap.DebugLevel, "responded to message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Int32("type", int32(req.GetType())),
zap.Binary("key", req.GetKey()),
zap.Duration("time", elapsedTime))
}
latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
stats.Record(ctx, metrics.InboundRequestLatency.M(latencyMillis))
}
}
package dht
import (
"fmt"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
record "github.com/libp2p/go-libp2p-record"
ds "github.com/ipfs/go-datastore"
)
// ModeOpt describes what mode the dht should operate in
type ModeOpt = dhtcfg.ModeOpt
const (
// ModeAuto utilizes EvtLocalReachabilityChanged events sent over the event bus to dynamically switch the DHT
// between Client and Server modes based on network conditions
ModeAuto ModeOpt = iota
// ModeClient operates the DHT as a client only, it cannot respond to incoming queries
ModeClient
// ModeServer operates the DHT as a server, it can both send and respond to queries
ModeServer
// ModeAutoServer operates in the same way as ModeAuto, but acts as a server when reachability is unknown
ModeAutoServer
)
// DefaultPrefix is the application specific prefix attached to all DHT protocols by default.
const DefaultPrefix protocol.ID = "/ipfs"
type Option = dhtcfg.Option
// RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
// in the routing table's cluster.
func RoutingTableLatencyTolerance(latency time.Duration) Option {
return func(c *dhtcfg.Config) error {
c.RoutingTable.LatencyTolerance = latency
return nil
}
}
// RoutingTableRefreshQueryTimeout sets the timeout for routing table refresh
// queries.
func RoutingTableRefreshQueryTimeout(timeout time.Duration) Option {
return func(c *dhtcfg.Config) error {
c.RoutingTable.RefreshQueryTimeout = timeout
return nil
}
}
// RoutingTableRefreshPeriod sets the period for refreshing buckets in the
// routing table. The DHT will refresh buckets every period by:
//
// 1. First searching for nearby peers to figure out how many buckets we should try to fill.
// 2. Then searching for a random key in each bucket that hasn't been queried in
// the last refresh period.
func RoutingTableRefreshPeriod(period time.Duration) Option {
return func(c *dhtcfg.Config) error {
c.RoutingTable.RefreshInterval = period
return nil
}
}
// Datastore configures the DHT to use the specified datastore.
//
// Defaults to an in-memory (temporary) map.
func Datastore(ds ds.Batching) Option {
return func(c *dhtcfg.Config) error {
c.Datastore = ds
return nil
}
}
// Mode configures which mode the DHT operates in (Client, Server, Auto).
//
// Defaults to ModeAuto.
func Mode(m ModeOpt) Option {
return func(c *dhtcfg.Config) error {
c.Mode = m
return nil
}
}
// Validator configures the DHT to use the specified validator.
//
// Defaults to a namespaced validator that can validate both public key (under the "pk"
// namespace) and IPNS records (under the "ipns" namespace). Setting the validator
// implies that the user wants to control the validators and therefore the default
// public key and IPNS validators will not be added.
func Validator(v record.Validator) Option {
return func(c *dhtcfg.Config) error {
c.Validator = v
c.ValidatorChanged = true
return nil
}
}
// NamespacedValidator adds a validator namespaced under `ns`. This option fails
// if the DHT is not using a `record.NamespacedValidator` as its validator (it
// uses one by default but this can be overridden with the `Validator` option).
// Adding a namespaced validator without changing the `Validator` will result in
// adding a new validator in addition to the default public key and IPNS validators.
// The "pk" and "ipns" namespaces cannot be overridden here unless a new `Validator`
// has been set first.
//
// Example: Given a validator registered as `NamespacedValidator("ipns",
// myValidator)`, all records with keys starting with `/ipns/` will be validated
// with `myValidator`.
func NamespacedValidator(ns string, v record.Validator) Option {
return func(c *dhtcfg.Config) error {
nsval, ok := c.Validator.(record.NamespacedValidator)
if !ok {
return fmt.Errorf("can only add namespaced validators to a NamespacedValidator")
}
nsval[ns] = v
return nil
}
}
// ProtocolPrefix sets an application specific prefix to be attached to all DHT protocols. For example,
// /myapp/kad/1.0.0 instead of /ipfs/kad/1.0.0. Prefix should be of the form /myapp.
//
// Defaults to dht.DefaultPrefix
func ProtocolPrefix(prefix protocol.ID) Option {
return func(c *dhtcfg.Config) error {
c.ProtocolPrefix = prefix
return nil
}
}
// ProtocolExtension adds an application specific protocol to the DHT protocol. For example,
// /ipfs/lan/kad/1.0.0 instead of /ipfs/kad/1.0.0. extension should be of the form /lan.
func ProtocolExtension(ext protocol.ID) Option {
return func(c *dhtcfg.Config) error {
c.ProtocolPrefix += ext
return nil
}
}
// V1ProtocolOverride overrides the protocolID used for /kad/1.0.0 with another. This is an
// advanced feature, and should only be used to handle legacy networks that have not been
// using protocolIDs of the form /app/kad/1.0.0.
//
// This option will override and ignore the ProtocolPrefix and ProtocolExtension options
func V1ProtocolOverride(proto protocol.ID) Option {
return func(c *dhtcfg.Config) error {
c.V1ProtocolOverride = proto
return nil
}
}
// BucketSize configures the bucket size (k in the Kademlia paper) of the routing table.
//
// The default value is 20.
func BucketSize(bucketSize int) Option {
return func(c *dhtcfg.Config) error {
c.BucketSize = bucketSize
return nil
}
}
// Concurrency configures the number of concurrent requests (alpha in the Kademlia paper) for a given query path.
//
// The default value is 10.
func Concurrency(alpha int) Option {
return func(c *dhtcfg.Config) error {
c.Concurrency = alpha
return nil
}
}
// Resiliency configures the number of peers closest to a target that must have responded in order for a given query
// path to complete.
//
// The default value is 3.
func Resiliency(beta int) Option {
return func(c *dhtcfg.Config) error {
c.Resiliency = beta
return nil
}
}
// MaxRecordAge specifies the maximum time that any node will hold onto a record ("PutValue record")
// from the time it's received. This does not apply to any other forms of validity that
// the record may contain.
// For example, a record may contain an ipns entry with an EOL saying it's valid
// until the year 2020 (a great time in the future). For that record to stick around
// it must be rebroadcast more frequently than once every 'MaxRecordAge'.
func MaxRecordAge(maxAge time.Duration) Option {
return func(c *dhtcfg.Config) error {
c.MaxRecordAge = maxAge
return nil
}
}
// DisableAutoRefresh completely disables 'auto-refresh' on the DHT routing
// table. This means that we will neither refresh the routing table periodically
// nor when the routing table size goes below the minimum threshold.
func DisableAutoRefresh() Option {
return func(c *dhtcfg.Config) error {
c.RoutingTable.AutoRefresh = false
return nil
}
}
// DisableProviders disables storing and retrieving provider records.
//
// Defaults to enabled.
//
// WARNING: do not change this unless you're using a forked DHT (i.e., a private
// network and/or distinct DHT protocols with the `Protocols` option).
func DisableProviders() Option {
return func(c *dhtcfg.Config) error {
c.EnableProviders = false
return nil
}
}
// DisableValues disables storing and retrieving value records (including
// public keys).
//
// Defaults to enabled.
//
// WARNING: do not change this unless you're using a forked DHT (i.e., a private
// network and/or distinct DHT protocols with the `Protocols` option).
func DisableValues() Option {
return func(c *dhtcfg.Config) error {
c.EnableValues = false
return nil
}
}
// ProvidersOptions are options passed directly to the provider manager.
//
// The provider manager adds and gets provider records from the datastore, caching
// them in between. These options are passed to the provider manager allowing
// customisation of things like the GC interval and cache implementation.
func ProvidersOptions(opts []providers.Option) Option {
return func(c *dhtcfg.Config) error {
c.ProvidersOptions = opts
return nil
}
}
// QueryFilter sets a function that approves which peers may be dialed in a query
func QueryFilter(filter QueryFilterFunc) Option {
return func(c *dhtcfg.Config) error {
c.QueryPeerFilter = filter
return nil
}
}
// RoutingTableFilter sets a function that approves which peers may be added to the routing table. The host should
// already have at least one connection to the peer under consideration.
func RoutingTableFilter(filter RouteTableFilterFunc) Option {
return func(c *dhtcfg.Config) error {
c.RoutingTable.PeerFilter = filter
return nil
}
}
// BootstrapPeers configures the bootstrapping nodes that we will connect to in order to seed
// and refresh our Routing Table if it becomes empty.
func BootstrapPeers(bootstrappers ...peer.AddrInfo) Option {
return func(c *dhtcfg.Config) error {
c.BootstrapPeers = func() []peer.AddrInfo {
return bootstrappers
}
return nil
}
}
// BootstrapPeersFunc configures the function that returns the bootstrapping nodes that we will
// connect to in order to seed and refresh our Routing Table if it becomes empty.
func BootstrapPeersFunc(getBootstrapPeers func() []peer.AddrInfo) Option {
return func(c *dhtcfg.Config) error {
c.BootstrapPeers = getBootstrapPeers
return nil
}
}
// RoutingTablePeerDiversityFilter configures the implementation of the `PeerIPGroupFilter` that will be used
// to construct the diversity filter for the Routing Table.
// Please see the docs for `peerdiversity.PeerIPGroupFilter` AND `peerdiversity.Filter` for more details.
func RoutingTablePeerDiversityFilter(pg peerdiversity.PeerIPGroupFilter) Option {
return func(c *dhtcfg.Config) error {
c.RoutingTable.DiversityFilter = pg
return nil
}
}
// disableFixLowPeersRoutine disables the "fixLowPeers" routine in the DHT.
// This is ONLY for tests.
func disableFixLowPeersRoutine(t *testing.T) Option {
return func(c *dhtcfg.Config) error {
c.DisableFixLowPeers = true
return nil
}
}
// forceAddressUpdateProcessing forces the DHT to handle changes to the hosts addresses.
// This occurs even when AutoRefresh has been disabled.
// This is ONLY for tests.
func forceAddressUpdateProcessing(t *testing.T) Option {
return func(c *dhtcfg.Config) error {
c.TestAddressUpdateProcessing = true
return nil
}
}
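A hedged sketch combining several of the options above into one constructor call; `h`, `ctx`, `myValidator` (any record.Validator), and the `time` import are assumed to exist in the caller:

```go
kadDHT, err := dht.New(ctx, h,
	dht.Mode(dht.ModeServer),
	dht.ProtocolPrefix("/myapp"), // protocols become /myapp/kad/1.0.0
	dht.BucketSize(20),           // k in the Kademlia paper
	dht.Concurrency(10),          // alpha
	dht.Resiliency(3),            // beta
	dht.NamespacedValidator("v", myValidator),
	dht.MaxRecordAge(24*time.Hour),
)
if err != nil {
	panic(err)
}
_ = kadDHT
```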
// Package dht implements a distributed hash table that satisfies the ipfs routing
// interface. This DHT is modeled after kademlia with S/Kademlia modifications.
package dht
// Package dual provides an implementation of a split or "dual" dht, where two parallel instances
// are maintained for the global internet and the local LAN respectively.
package dual
import (
"context"
"fmt"
"sync"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/ipfs/go-cid"
ci "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
helper "github.com/libp2p/go-libp2p-routing-helpers"
ma "github.com/multiformats/go-multiaddr"
"github.com/hashicorp/go-multierror"
)
// DHT implements the routing interface to provide two concrete DHT implementations for use
// in IPFS that are used to support both global network users and disjoint LAN use cases.
type DHT struct {
WAN *dht.IpfsDHT
LAN *dht.IpfsDHT
}
// LanExtension is used to differentiate local protocol requests from those on the WAN DHT.
const LanExtension protocol.ID = "/lan"
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
_ routing.ContentRouting = (*DHT)(nil)
_ routing.Routing = (*DHT)(nil)
_ routing.PeerRouting = (*DHT)(nil)
_ routing.PubKeyFetcher = (*DHT)(nil)
_ routing.ValueStore = (*DHT)(nil)
)
var (
maxPrefixCountPerCpl = 2
maxPrefixCount = 3
)
type config struct {
wan, lan []dht.Option
}
func (cfg *config) apply(opts ...Option) error {
for i, o := range opts {
if err := o(cfg); err != nil {
return fmt.Errorf("dual dht option %d failed: %w", i, err)
}
}
return nil
}
// Option is an option used to configure the Dual DHT.
type Option func(*config) error
// WanDHTOption applies the given DHT options to the WAN DHT.
func WanDHTOption(opts ...dht.Option) Option {
return func(c *config) error {
c.wan = append(c.wan, opts...)
return nil
}
}
// LanDHTOption applies the given DHT options to the LAN DHT.
func LanDHTOption(opts ...dht.Option) Option {
return func(c *config) error {
c.lan = append(c.lan, opts...)
return nil
}
}
// DHTOption applies the given DHT options to both the WAN and the LAN DHTs.
func DHTOption(opts ...dht.Option) Option {
return func(c *config) error {
c.lan = append(c.lan, opts...)
c.wan = append(c.wan, opts...)
return nil
}
}
// New creates a new DualDHT instance. Options provided are forwarded on to the two concrete
// IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce
// the LAN-vs-WAN distinction.
// Note: query or routing table functional options provided as arguments to this function
// will be overridden by this constructor.
func New(ctx context.Context, h host.Host, options ...Option) (*DHT, error) {
var cfg config
err := cfg.apply(
WanDHTOption(
dht.QueryFilter(dht.PublicQueryFilter),
dht.RoutingTableFilter(dht.PublicRoutingTableFilter),
dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, maxPrefixCountPerCpl, maxPrefixCount)),
),
)
if err != nil {
return nil, err
}
err = cfg.apply(
LanDHTOption(
dht.ProtocolExtension(LanExtension),
dht.QueryFilter(dht.PrivateQueryFilter),
dht.RoutingTableFilter(dht.PrivateRoutingTableFilter),
),
)
if err != nil {
return nil, err
}
err = cfg.apply(options...)
if err != nil {
return nil, err
}
wan, err := dht.New(ctx, h, cfg.wan...)
if err != nil {
return nil, err
}
// Unless overridden by user supplied options, the LAN DHT should default
// to 'AutoServer' mode.
if wan.Mode() != dht.ModeClient {
cfg.lan = append(cfg.lan, dht.Mode(dht.ModeServer))
}
lan, err := dht.New(ctx, h, cfg.lan...)
if err != nil {
return nil, err
}
impl := DHT{wan, lan}
return &impl, nil
}
// Close closes the DHT context.
func (dht *DHT) Close() error {
return combineErrors(dht.WAN.Close(), dht.LAN.Close())
}
// WANActive returns true when the WAN DHT is active (has peers).
func (dht *DHT) WANActive() bool {
return dht.WAN.RoutingTable().Size() > 0
}
// Provide adds the given cid to the content routing system.
func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error {
if dht.WANActive() {
return dht.WAN.Provide(ctx, key, announce)
}
return dht.LAN.Provide(ctx, key, announce)
}
// GetRoutingTableDiversityStats fetches the Routing Table Diversity Stats.
func (dht *DHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
if dht.WANActive() {
return dht.WAN.GetRoutingTableDiversityStats()
}
return nil
}
// FindProvidersAsync searches for peers who are able to provide a given key
func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
reqCtx, cancel := context.WithCancel(ctx)
outCh := make(chan peer.AddrInfo)
// Register for and merge query events if we care about them.
subCtx := reqCtx
var evtCh <-chan *routing.QueryEvent
if routing.SubscribesToQueryEvents(ctx) {
subCtx, evtCh = routing.RegisterForQueryEvents(reqCtx)
}
wanCh := dht.WAN.FindProvidersAsync(subCtx, key, count)
lanCh := dht.LAN.FindProvidersAsync(subCtx, key, count)
zeroCount := (count == 0)
go func() {
defer cancel()
defer close(outCh)
found := make(map[peer.ID]struct{}, count)
var pi peer.AddrInfo
var qEv *routing.QueryEvent
for (zeroCount || count > 0) && (wanCh != nil || lanCh != nil) {
var ok bool
select {
case qEv, ok = <-evtCh:
if !ok {
evtCh = nil
} else if qEv != nil && qEv.Type != routing.QueryError {
routing.PublishQueryEvent(reqCtx, qEv)
}
continue
case pi, ok = <-wanCh:
if !ok {
wanCh = nil
continue
}
case pi, ok = <-lanCh:
if !ok {
lanCh = nil
continue
}
}
// already found
if _, ok = found[pi.ID]; ok {
continue
}
select {
case outCh <- pi:
found[pi.ID] = struct{}{}
count--
case <-ctx.Done():
return
}
}
if qEv != nil && qEv.Type == routing.QueryError && len(found) == 0 {
routing.PublishQueryEvent(reqCtx, qEv)
}
}()
return outCh
}
// FindPeer searches for a peer with the given ID.
// Note: with signed peer records, we can change this to short circuit once either DHT returns.
func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) {
var wg sync.WaitGroup
wg.Add(2)
var wanInfo, lanInfo peer.AddrInfo
var wanErr, lanErr error
go func() {
defer wg.Done()
wanInfo, wanErr = dht.WAN.FindPeer(ctx, pid)
}()
go func() {
defer wg.Done()
lanInfo, lanErr = dht.LAN.FindPeer(ctx, pid)
}()
wg.Wait()
// Combine addresses. Try to avoid doing unnecessary work while we're at
// it. Note: We're ignoring the errors for now as many of our DHT
// commands can return both a result and an error.
ai := peer.AddrInfo{ID: pid}
if len(wanInfo.Addrs) == 0 {
ai.Addrs = lanInfo.Addrs
} else if len(lanInfo.Addrs) == 0 {
ai.Addrs = wanInfo.Addrs
} else {
// combine addresses
deduped := make(map[string]ma.Multiaddr, len(wanInfo.Addrs)+len(lanInfo.Addrs))
for _, addr := range wanInfo.Addrs {
deduped[string(addr.Bytes())] = addr
}
for _, addr := range lanInfo.Addrs {
deduped[string(addr.Bytes())] = addr
}
ai.Addrs = make([]ma.Multiaddr, 0, len(deduped))
for _, addr := range deduped {
ai.Addrs = append(ai.Addrs, addr)
}
}
// If one of the commands succeeded, don't return an error.
if wanErr == nil || lanErr == nil {
return ai, nil
}
// Otherwise, return what we have _and_ return the error.
return ai, combineErrors(wanErr, lanErr)
}
func combineErrors(erra, errb error) error {
// if the errors are the same, just return one.
if erra == errb {
return erra
}
// If one of the errors is a kb lookup failure (no peers in routing
// table), return the other.
if erra == kb.ErrLookupFailure {
return errb
} else if errb == kb.ErrLookupFailure {
return erra
}
return multierror.Append(erra, errb).ErrorOrNil()
}
// Bootstrap allows callers to hint to the routing system to get into a
// bootstrapped state and remain there.
func (dht *DHT) Bootstrap(ctx context.Context) error {
erra := dht.WAN.Bootstrap(ctx)
errb := dht.LAN.Bootstrap(ctx)
return combineErrors(erra, errb)
}
// PutValue adds value corresponding to given Key.
func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
if dht.WANActive() {
return dht.WAN.PutValue(ctx, key, val, opts...)
}
return dht.LAN.PutValue(ctx, key, val, opts...)
}
// GetValue searches for the value corresponding to given Key.
func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
lanCtx, cancelLan := context.WithCancel(ctx)
defer cancelLan()
var (
lanVal []byte
lanErr error
lanWaiter sync.WaitGroup
)
lanWaiter.Add(1)
go func() {
defer lanWaiter.Done()
lanVal, lanErr = d.LAN.GetValue(lanCtx, key, opts...)
}()
wanVal, wanErr := d.WAN.GetValue(ctx, key, opts...)
if wanErr == nil {
cancelLan()
}
lanWaiter.Wait()
if wanErr == nil {
return wanVal, nil
}
if lanErr == nil {
return lanVal, nil
}
return nil, combineErrors(wanErr, lanErr)
}
// SearchValue searches for better values from this value
func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
p := helper.Parallel{Routers: []routing.Routing{dht.WAN, dht.LAN}, Validator: dht.WAN.Validator}
return p.SearchValue(ctx, key, opts...)
}
// GetPublicKey returns the public key for the given peer.
func (dht *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) {
p := helper.Parallel{Routers: []routing.Routing{dht.WAN, dht.LAN}, Validator: dht.WAN.Validator}
return p.GetPublicKey(ctx, pid)
}
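A hedged usage sketch of the dual DHT; it assumes an existing host `h`, a context `ctx`, the `fmt` import, and a hypothetical `someCid` (any cid.Cid you want providers for):

```go
d, err := dual.New(ctx, h,
	dual.DHTOption(dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...)),
)
if err != nil {
	panic(err)
}
defer d.Close()

// Writes and lookups prefer the WAN DHT while it has peers and fall back
// to the LAN DHT otherwise; FindProvidersAsync merges results from both.
for ai := range d.FindProvidersAsync(ctx, someCid, 10) {
	fmt.Println("provider:", ai.ID)
}
```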
package fullrt
import (
"fmt"
kaddht "github.com/libp2p/go-libp2p-kad-dht"
)
type config struct {
dhtOpts []kaddht.Option
}
func (cfg *config) apply(opts ...Option) error {
for i, o := range opts {
if err := o(cfg); err != nil {
return fmt.Errorf("fullrt dht option %d failed: %w", i, err)
}
}
return nil
}
type Option func(opt *config) error
func DHTOption(opts ...kaddht.Option) Option {
return func(c *config) error {
c.dhtOpts = append(c.dhtOpts, opts...)
return nil
}
}
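For context, a hedged sketch of how these options might be consumed; the `NewFullRT` constructor and its signature are not shown in this diff (its file is collapsed above) and are assumed here:

```go
// Assumption: fullrt exposes NewFullRT(host, protocolPrefix, ...Option).
frt, err := fullrt.NewFullRT(h, dht.DefaultPrefix,
	fullrt.DHTOption(
		dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
		dht.BucketSize(20),
	),
)
if err != nil {
	panic(err)
}
_ = frt
```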
package config
import "github.com/libp2p/go-libp2p-core/routing"
type QuorumOptionKey struct{}
const defaultQuorum = 0
// GetQuorum defaults to 0 if no option is found
func GetQuorum(opts *routing.Options) int {
responsesNeeded, ok := opts.Other[QuorumOptionKey{}].(int)
if !ok {
responsesNeeded = defaultQuorum
}
return responsesNeeded
}
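A hedged sketch of `GetQuorum`'s behavior, written as an in-module example since this package lives under internal/ and cannot be imported from outside the module:

```go
package config_test

import (
	"fmt"

	"github.com/libp2p/go-libp2p-core/routing"
	"github.com/libp2p/go-libp2p-kad-dht/internal/config"
)

func ExampleGetQuorum() {
	// A quorum of 5 stored under the package's option key.
	opts := routing.Options{
		Other: map[interface{}]interface{}{
			config.QuorumOptionKey{}: 5,
		},
	}
	fmt.Println(config.GetQuorum(&opts))

	// A missing or mistyped value falls back to the default of 0.
	fmt.Println(config.GetQuorum(&routing.Options{}))
	// Output:
	// 5
	// 0
}
```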
package internal
import "errors"
var ErrInvalidRecord = errors.New("received invalid record")