p2p / go-p2p-kad-dht · Commits
Commit 301a6e4a (Unverified)
Authored May 27, 2021 by Adin Schmahmann; committed by GitHub on May 27, 2021
Merge pull request #719 from libp2p/fix/fullrt-log-progress
fix: fullrt dht bug fixes
Parents: eac1b5e1, daab800d
Showing 2 changed files with 211 additions and 59 deletions
fullrt/dht.go       +126 -59
fullrt/dht_test.go   +85 -0
fullrt/dht.go
...
...
@@ -464,7 +464,7 @@ func (dht *FullRT) PutValue(ctx context.Context, key string, value []byte, opts
})
err := dht.protoMessenger.PutValue(ctx, p, rec)
return err
-	}, peers)
+	}, peers, true)
if successes == 0 {
return fmt.Errorf("failed to complete put")
...
...
@@ -751,7 +751,7 @@ func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan str
return nil
}
-		dht.execOnMany(ctx, queryFn, peers)
+		dht.execOnMany(ctx, queryFn, peers, false)
lookupResCh <- &lookupWithFollowupResult{peers: peers}
}()
return valCh, lookupResCh
...
...
@@ -817,7 +817,7 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
err := dht.protoMessenger.PutProvider(ctx, p, keyMH, dht.h)
return err
},
peers
)
}, peers
, true
)
if exceededDeadline {
return context.DeadlineExceeded
...
...
@@ -830,41 +830,71 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
return ctx.Err()
}
-func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID) int {
-	putctx, cancel := context.WithCancel(ctx)
-	defer cancel()
+// execOnMany executes the given function on each of the peers, although it may only wait for a certain chunk of peers
+// to respond before considering the results "good enough" and returning.
+//
+// If sloppyExit is true then this function will return without waiting for all of its internal goroutines to close.
+// If sloppyExit is true then the passed in function MUST be able to safely complete an arbitrary amount of time after
+// execOnMany has returned (e.g. do not write to resources that might get closed or set to nil and therefore result in
+// a panic instead of just returning an error).
+func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID, sloppyExit bool) int {
+	if len(peers) == 0 {
+		return 0
+	}

-	waitAllCh := make(chan struct{}, len(peers))
+	// having a buffer that can take all of the elements is basically a hack to allow for sloppy exits that clean up
+	// the goroutines after the function is done rather than before
+	errCh := make(chan error, len(peers))
	numSuccessfulToWaitFor := int(float64(len(peers)) * dht.waitFrac)
-	waitSuccessCh := make(chan struct{}, numSuccessfulToWaitFor)
+
+	putctx, cancel := context.WithTimeout(ctx, dht.timeoutPerOp)
+	defer cancel()

	for _, p := range peers {
		go func(p peer.ID) {
-			fnCtx, fnCancel := context.WithTimeout(putctx, dht.timeoutPerOp)
-			defer fnCancel()
-			err := fn(fnCtx, p)
-			if err != nil {
-				logger.Debug(err)
-			} else {
-				waitSuccessCh <- struct{}{}
-			}
-			waitAllCh <- struct{}{}
+			errCh <- fn(putctx, p)
		}(p)
	}

-	numSuccess, numDone := 0, 0
-	t := time.NewTimer(time.Hour)
-	for numDone != len(peers) {
+	var numDone, numSuccess, successSinceLastTick int
+	var ticker *time.Ticker
+	var tickChan <-chan time.Time
+
+	for numDone < len(peers) {
		select {
-		case <-waitAllCh:
+		case err := <-errCh:
			numDone++
-		case <-waitSuccessCh:
-			if numSuccess >= numSuccessfulToWaitFor {
-				t.Reset(time.Millisecond * 500)
-			}
-			numSuccess++
-			numDone++
-		case <-t.C:
-			cancel()
+			if err == nil {
+				numSuccess++
+				if numSuccess >= numSuccessfulToWaitFor && ticker == nil {
+					// Once there are enough successes, wait a little longer
+					ticker = time.NewTicker(time.Millisecond * 500)
+					defer ticker.Stop()
+					tickChan = ticker.C
+					successSinceLastTick = numSuccess
+				}
+				// This is equivalent to numSuccess * 2 + numFailures >= len(peers) and is a heuristic that seems to be
+				// performing reasonably.
+				// TODO: Make this metric more configurable
+				// TODO: Have better heuristics in this function whether determined from observing static network
+				// properties or dynamically calculating them
+				if numSuccess+numDone >= len(peers) {
+					cancel()
+					if sloppyExit {
+						return numSuccess
+					}
+				}
+			}
+		case <-tickChan:
+			if numSuccess > successSinceLastTick {
+				// If there were additional successes, then wait another tick
+				successSinceLastTick = numSuccess
+			} else {
+				cancel()
+				if sloppyExit {
+					return numSuccess
+				}
+			}
		}
	}
	return numSuccess
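As an aside, the rewritten function above funnels every result through one buffered error channel and, once a fraction of the calls has succeeded, gives stragglers a short grace tick. Here is a simplified, self-contained Go sketch of that pattern only; it is not the library's code, and execOnManySketch, waitFrac and timeoutPerOp are placeholder names:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// execOnManySketch runs fn against every peer, waits until roughly
// waitFrac of the calls have succeeded, and then allows one short
// grace period before returning the number of successes.
func execOnManySketch(ctx context.Context, fn func(context.Context, int) error, peers []int, waitFrac float64, timeoutPerOp time.Duration) int {
	if len(peers) == 0 {
		return 0
	}
	// Buffer sized for all peers so late finishers never block after we return.
	errCh := make(chan error, len(peers))
	opCtx, cancel := context.WithTimeout(ctx, timeoutPerOp)
	defer cancel()

	for _, p := range peers {
		go func(p int) { errCh <- fn(opCtx, p) }(p)
	}

	numSuccessfulToWaitFor := int(float64(len(peers)) * waitFrac)
	var numDone, numSuccess, successSinceLastTick int
	var tickChan <-chan time.Time
	for numDone < len(peers) {
		select {
		case err := <-errCh:
			numDone++
			if err == nil {
				numSuccess++
				if numSuccess >= numSuccessfulToWaitFor && tickChan == nil {
					// Enough successes: give the remaining calls a short grace period.
					t := time.NewTicker(500 * time.Millisecond)
					defer t.Stop()
					tickChan = t.C
					successSinceLastTick = numSuccess
				}
			}
		case <-tickChan:
			if numSuccess == successSinceLastTick {
				// No progress during the last tick; stop waiting for stragglers.
				return numSuccess
			}
			successSinceLastTick = numSuccess
		}
	}
	return numSuccess
}

func main() {
	peers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
	fn := func(ctx context.Context, p int) error {
		select {
		case <-time.After(time.Duration(rand.Intn(300)) * time.Millisecond):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	fmt.Println("successes:", execOnManySketch(context.Background(), fn, peers, 0.3, time.Second))
}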
...
...
@@ -898,7 +928,7 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
pmes.ProviderPeers = pbPeers
return dht.messageSender.SendMessage(ctx, p, pmes)
},
peers
)
}, peers
, true
)
if successes == 0 {
return fmt.Errorf("no successful provides")
}
...
...
@@ -941,7 +971,7 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte)
successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
keyStr := string(k)
return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr]))
},
peers
)
}, peers
, true
)
if successes == 0 {
return fmt.Errorf("no successful puts")
}
...
...
@@ -962,39 +992,43 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(
var numSendsSuccessful uint64 = 0
wg := sync.WaitGroup{}
-	wg.Add(dht.bulkSendParallelism)
-	chunkSize := len(sortedKeys) / dht.bulkSendParallelism
	onePctKeys := uint64(len(sortedKeys)) / 100
-	for i := 0; i < dht.bulkSendParallelism; i++ {
-		var chunk []peer.ID
-		end := (i + 1) * chunkSize
-		if end > len(sortedKeys) {
-			chunk = sortedKeys[i*chunkSize:]
-		} else {
-			chunk = sortedKeys[i*chunkSize : end]
-		}
-
-		go func() {
-			defer wg.Done()
-			for _, key := range chunk {
-				sendsSoFar := atomic.AddUint64(&numSends, 1)
-				if sendsSoFar%onePctKeys == 0 {
-					logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys))
-				}
-				if err := fn(ctx, key); err != nil {
-					var l interface{}
-					if isProvRec {
-						l = internal.LoggableProviderRecordBytes(key)
-					} else {
-						l = internal.LoggableRecordKeyString(key)
-					}
-					logger.Infof("failed to complete bulk sending of key :%v. %v", l, err)
-				} else {
-					atomic.AddUint64(&numSendsSuccessful, 1)
-				}
-			}
-		}()
-	}
+	bulkSendFn := func(chunk []peer.ID) {
+		defer wg.Done()
+		for _, key := range chunk {
+			if ctx.Err() != nil {
+				break
+			}
+
+			sendsSoFar := atomic.AddUint64(&numSends, 1)
+			if onePctKeys > 0 && sendsSoFar%onePctKeys == 0 {
+				logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys))
+			}
+			if err := fn(ctx, key); err != nil {
+				var l interface{}
+				if isProvRec {
+					l = internal.LoggableProviderRecordBytes(key)
+				} else {
+					l = internal.LoggableRecordKeyString(key)
+				}
+				logger.Infof("failed to complete bulk sending of key :%v. %v", l, err)
+			} else {
+				atomic.AddUint64(&numSendsSuccessful, 1)
+			}
+		}
+	}
+
+	// divide the keys into groups so that we can talk to more peers at a time, because the keys are sorted in
+	// XOR/Kadmelia space consecutive puts will be too the same, or nearly the same, set of peers. Working in parallel
+	// means less waiting on individual dials to complete and also continuing to make progress even if one segment of
+	// the network is being slow, or we are maxing out the connection, stream, etc. to those peers.
+	keyGroups := divideIntoGroups(sortedKeys, dht.bulkSendParallelism)
+	wg.Add(len(keyGroups))
+	for _, chunk := range keyGroups {
+		go bulkSendFn(chunk)
+	}
wg.Wait()
if numSendsSuccessful == 0 {
...
...
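The new bulkMessageSend hands each group of XOR-sorted keys to its own worker and guards the one-percent progress log so the modulo is never taken against zero when there are fewer than 100 keys. The following standalone sketch shows that fan-out shape; bulkSendSketch and its parameters are invented for illustration, and the grouping mirrors but does not reuse divideIntoGroups:

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// bulkSendSketch splits keys into at most `groups` contiguous chunks, runs one
// worker per chunk, and tracks progress with atomic counters.
func bulkSendSketch(ctx context.Context, keys []string, groups int, send func(context.Context, string) error) uint64 {
	var numSends, numSuccessful uint64
	onePct := uint64(len(keys)) / 100

	worker := func(chunk []string, wg *sync.WaitGroup) {
		defer wg.Done()
		for _, k := range chunk {
			if ctx.Err() != nil {
				break
			}
			soFar := atomic.AddUint64(&numSends, 1)
			// Guard against onePct == 0 when there are fewer than 100 keys.
			if onePct > 0 && soFar%onePct == 0 {
				fmt.Printf("%.1f%% done\n", 100*float64(soFar)/float64(len(keys)))
			}
			if err := send(ctx, k); err == nil {
				atomic.AddUint64(&numSuccessful, 1)
			}
		}
	}

	// Contiguous chunks keep each worker in a nearby region of the sorted keyspace,
	// distributing any remainder as one extra key per early chunk.
	var chunks [][]string
	if len(keys) <= groups {
		for i := range keys {
			chunks = append(chunks, keys[i:i+1])
		}
	} else {
		size, rem := len(keys)/groups, len(keys)%groups
		for start := 0; start < len(keys); {
			end := start + size
			if rem > 0 {
				end++
				rem--
			}
			chunks = append(chunks, keys[start:end])
			start = end
		}
	}

	var wg sync.WaitGroup
	wg.Add(len(chunks))
	for _, c := range chunks {
		go worker(c, &wg)
	}
	wg.Wait()
	return numSuccessful
}

func main() {
	keys := []string{"a", "b", "c", "d", "e", "f", "g"}
	ok := bulkSendSketch(context.Background(), keys, 3, func(ctx context.Context, k string) error { return nil })
	fmt.Println("successful sends:", ok)
}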
@@ -1006,6 +1040,39 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(
return nil
}
// divideIntoGroups divides the set of keys into (at most) the number of groups
func divideIntoGroups(keys []peer.ID, groups int) [][]peer.ID {
var keyGroups [][]peer.ID
if len(keys) < groups {
for i := 0; i < len(keys); i++ {
keyGroups = append(keyGroups, keys[i:i+1])
}
return keyGroups
}
chunkSize := len(keys) / groups
remainder := len(keys) % groups
start := 0
end := chunkSize
for i := 0; i < groups; i++ {
var chunk []peer.ID
// distribute the remainder as one extra entry per parallel thread
if remainder > 0 {
chunk = keys[start : end+1]
remainder--
start = end + 1
end = end + 1 + chunkSize
} else {
chunk = keys[start:end]
start = end
end = end + chunkSize
}
keyGroups = append(keyGroups, chunk)
}
return keyGroups
}
// FindProviders searches until the context expires.
func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
if !dht.enableProviders {
...
...
@@ -1129,7 +1196,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
return nil
}
-		dht.execOnMany(queryctx, fn, peers)
+		dht.execOnMany(queryctx, fn, peers, false)
}
// FindPeer searches for a peer with given ID.
...
...
@@ -1214,7 +1281,7 @@ func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, e
return nil
}
-	dht.execOnMany(queryctx, fn, peers)
+	dht.execOnMany(queryctx, fn, peers, false)
close(addrsCh)
wg.Wait()
...
...
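The read-side call sites (getValues, findProvidersAsyncRoutine, FindPeer) pass sloppyExit=false and then close their result channels, which is only safe because the non-sloppy exit waits for every callback to return. A tiny, generic Go illustration of that contract follows; the names are toy placeholders, not library code:

package main

import (
	"fmt"
	"sync"
)

func main() {
	results := make(chan int)
	var wg sync.WaitGroup

	// Each producer sends on results; sending after close would panic.
	producer := func(v int) {
		defer wg.Done()
		results <- v
	}

	wg.Add(3)
	for i := 0; i < 3; i++ {
		go producer(i)
	}

	// Consumer drains until the channel is closed.
	done := make(chan struct{})
	go func() {
		for v := range results {
			fmt.Println("got", v)
		}
		close(done)
	}()

	wg.Wait()      // analogous to sloppyExit=false: wait for every producer
	close(results) // only now is it safe to close the results channel
	<-done
}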
fullrt/dht_test.go (new file, 0 → 100644)
package fullrt

import (
	"strconv"
	"testing"

	"github.com/libp2p/go-libp2p-core/peer"
)

func TestDivideIntoGroups(t *testing.T) {
	var keys []peer.ID
	for i := 0; i < 10; i++ {
		keys = append(keys, peer.ID(strconv.Itoa(i)))
	}

	convertToStrings := func(peers []peer.ID) []string {
		var out []string
		for _, p := range peers {
			out = append(out, string(p))
		}
		return out
	}

	pidsEquals := func(a, b []string) bool {
		if len(a) != len(b) {
			return false
		}
		for i, v := range a {
			if v != b[i] {
				return false
			}
		}
		return true
	}

	t.Run("Divides", func(t *testing.T) {
		gr := divideIntoGroups(keys, 2)
		if len(gr) != 2 {
			t.Fatal("incorrect number of groups")
		}
		if g1, expected := convertToStrings(gr[0]), []string{"0", "1", "2", "3", "4"}; !pidsEquals(g1, expected) {
			t.Fatalf("expected %v, got %v", expected, g1)
		}
		if g2, expected := convertToStrings(gr[1]), []string{"5", "6", "7", "8", "9"}; !pidsEquals(g2, expected) {
			t.Fatalf("expected %v, got %v", expected, g2)
		}
	})
	t.Run("Remainder", func(t *testing.T) {
		gr := divideIntoGroups(keys, 3)
		if len(gr) != 3 {
			t.Fatal("incorrect number of groups")
		}
		if g, expected := convertToStrings(gr[0]), []string{"0", "1", "2", "3"}; !pidsEquals(g, expected) {
			t.Fatalf("expected %v, got %v", expected, g)
		}
		if g, expected := convertToStrings(gr[1]), []string{"4", "5", "6"}; !pidsEquals(g, expected) {
			t.Fatalf("expected %v, got %v", expected, g)
		}
		if g, expected := convertToStrings(gr[2]), []string{"7", "8", "9"}; !pidsEquals(g, expected) {
			t.Fatalf("expected %v, got %v", expected, g)
		}
	})
	t.Run("OneEach", func(t *testing.T) {
		gr := divideIntoGroups(keys, 10)
		if len(gr) != 10 {
			t.Fatal("incorrect number of groups")
		}
		for i := 0; i < 10; i++ {
			if g, expected := convertToStrings(gr[i]), []string{strconv.Itoa(i)}; !pidsEquals(g, expected) {
				t.Fatalf("expected %v, got %v", expected, g)
			}
		}
	})
	t.Run("TooManyGroups", func(t *testing.T) {
		gr := divideIntoGroups(keys, 11)
		if len(gr) != 10 {
			t.Fatal("incorrect number of groups")
		}
		for i := 0; i < 10; i++ {
			if g, expected := convertToStrings(gr[i]), []string{strconv.Itoa(i)}; !pidsEquals(g, expected) {
				t.Fatalf("expected %v, got %v", expected, g)
			}
		}
	})
}