Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-p2p-pubsub
Commits
05c505ef
Commit
05c505ef
authored
Mar 10, 2021
by
vyzo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
export rejection named string constants
parent
6c1addf4
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
57 additions
and
57 deletions
+57
-57
gossip_tracer.go
gossip_tracer.go
+2
-2
peer_gater.go
peer_gater.go
+3
-3
peer_gater_test.go
peer_gater_test.go
+4
-4
pubsub.go
pubsub.go
+6
-6
score.go
score.go
+10
-10
score_test.go
score_test.go
+10
-10
tag_tracer.go
tag_tracer.go
+3
-3
tracer.go
tracer.go
+11
-11
validation.go
validation.go
+8
-8
No files found.
gossip_tracer.go
View file @
05c505ef
...
@@ -133,9 +133,9 @@ func (gt *gossipTracer) RejectMessage(msg *Message, reason string) {
...
@@ -133,9 +133,9 @@ func (gt *gossipTracer) RejectMessage(msg *Message, reason string) {
// We do take exception and apply promise penalty regardless in the following cases, where
// We do take exception and apply promise penalty regardless in the following cases, where
// the peer delivered an obviously invalid message.
// the peer delivered an obviously invalid message.
switch
reason {
switch
reason {
case
r
ejectMissingSignature:
case
R
ejectMissingSignature:
return
return
case
r
ejectInvalidSignature:
case
R
ejectInvalidSignature:
return
return
}
}
...
...
peer_gater.go
View file @
05c505ef
...
@@ -411,13 +411,13 @@ func (pg *peerGater) RejectMessage(msg *Message, reason string) {
...
@@ -411,13 +411,13 @@ func (pg *peerGater) RejectMessage(msg *Message, reason string) {
defer
pg.Unlock()
defer
pg.Unlock()
switch
reason {
switch
reason {
case
r
ejectValidationQueueFull:
case
R
ejectValidationQueueFull:
fallthrough
fallthrough
case
r
ejectValidationThrottled:
case
R
ejectValidationThrottled:
pg.lastThrottle = time.Now()
pg.lastThrottle = time.Now()
pg.throttle++
pg.throttle++
case
r
ejectValidationIgnored:
case
R
ejectValidationIgnored:
st := pg.getPeerStats(msg.ReceivedFrom)
st := pg.getPeerStats(msg.ReceivedFrom)
st.ignore++
st.ignore++
...
...
peer_gater_test.go
View file @
05c505ef
...
@@ -46,21 +46,21 @@ func TestPeerGater(t *testing.T) {
...
@@ -46,21 +46,21 @@ func TestPeerGater(t *testing.T) {
t.Fatal(
"expected AcceptAll"
)
t.Fatal(
"expected AcceptAll"
)
}
}
pg.RejectMessage(msg,
r
ejectValidationQueueFull)
pg.RejectMessage(msg,
R
ejectValidationQueueFull)
status = pg.AcceptFrom(peerA)
status = pg.AcceptFrom(peerA)
if
status != AcceptAll {
if
status != AcceptAll {
t.Fatal(
"expected AcceptAll"
)
t.Fatal(
"expected AcceptAll"
)
}
}
pg.RejectMessage(msg,
r
ejectValidationThrottled)
pg.RejectMessage(msg,
R
ejectValidationThrottled)
status = pg.AcceptFrom(peerA)
status = pg.AcceptFrom(peerA)
if
status != AcceptAll {
if
status != AcceptAll {
t.Fatal(
"expected AcceptAll"
)
t.Fatal(
"expected AcceptAll"
)
}
}
for
i := 0; i < 100; i++ {
for
i := 0; i < 100; i++ {
pg.RejectMessage(msg,
r
ejectValidationIgnored)
pg.RejectMessage(msg,
R
ejectValidationIgnored)
pg.RejectMessage(msg,
r
ejectValidationFailed)
pg.RejectMessage(msg,
R
ejectValidationFailed)
}
}
accepted := false
accepted := false
...
...
pubsub.go
View file @
05c505ef
...
@@ -987,14 +987,14 @@ func (p *PubSub) pushMsg(msg *Message) {
...
@@ -987,14 +987,14 @@ func (p *PubSub) pushMsg(msg *Message) {
// reject messages from blacklisted peers
// reject messages from blacklisted peers
if
p.blacklist.Contains(src) {
if
p.blacklist.Contains(src) {
log.Debugf(
"dropping message from blacklisted peer %s"
, src)
log.Debugf(
"dropping message from blacklisted peer %s"
, src)
p.tracer.RejectMessage(msg,
r
ejectBlacklstedPeer)
p.tracer.RejectMessage(msg,
R
ejectBlacklstedPeer)
return
return
}
}
// even if they are forwarded by good peers
// even if they are forwarded by good peers
if
p.blacklist.Contains(msg.GetFrom()) {
if
p.blacklist.Contains(msg.GetFrom()) {
log.Debugf(
"dropping message from blacklisted source %s"
, src)
log.Debugf(
"dropping message from blacklisted source %s"
, src)
p.tracer.RejectMessage(msg,
r
ejectBlacklistedSource)
p.tracer.RejectMessage(msg,
R
ejectBlacklistedSource)
return
return
}
}
...
@@ -1003,7 +1003,7 @@ func (p *PubSub) pushMsg(msg *Message) {
...
@@ -1003,7 +1003,7 @@ func (p *PubSub) pushMsg(msg *Message) {
if
p.signPolicy.mustSign() {
if
p.signPolicy.mustSign() {
if
msg.Signature == nil {
if
msg.Signature == nil {
log.Debugf(
"dropping unsigned message from %s"
, src)
log.Debugf(
"dropping unsigned message from %s"
, src)
p.tracer.RejectMessage(msg,
r
ejectMissingSignature)
p.tracer.RejectMessage(msg,
R
ejectMissingSignature)
return
return
}
}
// Actual signature verification happens in the validation pipeline,
// Actual signature verification happens in the validation pipeline,
...
@@ -1012,7 +1012,7 @@ func (p *PubSub) pushMsg(msg *Message) {
...
@@ -1012,7 +1012,7 @@ func (p *PubSub) pushMsg(msg *Message) {
}
else
{
}
else
{
if
msg.Signature != nil {
if
msg.Signature != nil {
log.Debugf(
"dropping message with unexpected signature from %s"
, src)
log.Debugf(
"dropping message with unexpected signature from %s"
, src)
p.tracer.RejectMessage(msg,
r
ejectUnexpectedSignature)
p.tracer.RejectMessage(msg,
R
ejectUnexpectedSignature)
return
return
}
}
// If we are expecting signed messages, and not authoring messages,
// If we are expecting signed messages, and not authoring messages,
...
@@ -1022,7 +1022,7 @@ func (p *PubSub) pushMsg(msg *Message) {
...
@@ -1022,7 +1022,7 @@ func (p *PubSub) pushMsg(msg *Message) {
if
p.signID ==
""
{
if
p.signID ==
""
{
if
msg.Seqno != nil || msg.From != nil || msg.Key != nil {
if
msg.Seqno != nil || msg.From != nil || msg.Key != nil {
log.Debugf(
"dropping message with unexpected auth info from %s"
, src)
log.Debugf(
"dropping message with unexpected auth info from %s"
, src)
p.tracer.RejectMessage(msg,
r
ejectUnexpectedAuthInfo)
p.tracer.RejectMessage(msg,
R
ejectUnexpectedAuthInfo)
return
return
}
}
}
}
...
@@ -1033,7 +1033,7 @@ func (p *PubSub) pushMsg(msg *Message) {
...
@@ -1033,7 +1033,7 @@ func (p *PubSub) pushMsg(msg *Message) {
self := p.host.ID()
self := p.host.ID()
if
peer.ID(msg.GetFrom()) == self && src != self {
if
peer.ID(msg.GetFrom()) == self && src != self {
log.Debugf(
"dropping message claiming to be from self but forwarded from %s"
, src)
log.Debugf(
"dropping message claiming to be from self but forwarded from %s"
, src)
p.tracer.RejectMessage(msg,
r
ejectSelfOrigin)
p.tracer.RejectMessage(msg,
R
ejectSelfOrigin)
return
return
}
}
...
...
score.go
View file @
05c505ef
...
@@ -722,25 +722,25 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) {
...
@@ -722,25 +722,25 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) {
switch
reason {
switch
reason {
// we don't track those messages, but we penalize the peer as they are clearly invalid
// we don't track those messages, but we penalize the peer as they are clearly invalid
case
r
ejectMissingSignature:
case
R
ejectMissingSignature:
fallthrough
fallthrough
case
r
ejectInvalidSignature:
case
R
ejectInvalidSignature:
fallthrough
fallthrough
case
r
ejectUnexpectedSignature:
case
R
ejectUnexpectedSignature:
fallthrough
fallthrough
case
r
ejectUnexpectedAuthInfo:
case
R
ejectUnexpectedAuthInfo:
fallthrough
fallthrough
case
r
ejectSelfOrigin:
case
R
ejectSelfOrigin:
ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
return
return
// we ignore those messages, so do nothing.
// we ignore those messages, so do nothing.
case
r
ejectBlacklstedPeer:
case
R
ejectBlacklstedPeer:
fallthrough
fallthrough
case
r
ejectBlacklistedSource:
case
R
ejectBlacklistedSource:
return
return
case
r
ejectValidationQueueFull:
case
R
ejectValidationQueueFull:
// the message was rejected before it entered the validation pipeline;
// the message was rejected before it entered the validation pipeline;
// we don't know if this message has a valid signature, and thus we also don't know if
// we don't know if this message has a valid signature, and thus we also don't know if
// it has a valid message ID; all we can do is ignore it.
// it has a valid message ID; all we can do is ignore it.
...
@@ -756,14 +756,14 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) {
...
@@ -756,14 +756,14 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) {
}
}
switch
reason {
switch
reason {
case
r
ejectValidationThrottled:
case
R
ejectValidationThrottled:
// if we reject with "validation throttled" we don't penalize the peer(s) that forward it
// if we reject with "validation throttled" we don't penalize the peer(s) that forward it
// because we don't know if it was valid.
// because we don't know if it was valid.
drec.status = deliveryThrottled
drec.status = deliveryThrottled
// release the delivery time tracking map to free some memory early
// release the delivery time tracking map to free some memory early
drec.peers = nil
drec.peers = nil
return
return
case
r
ejectValidationIgnored:
case
R
ejectValidationIgnored:
// we were explicitly instructed by the validator to ignore the message but not penalize
// we were explicitly instructed by the validator to ignore the message but not penalize
// the peer
// the peer
drec.status = deliveryIgnored
drec.status = deliveryIgnored
...
...
score_test.go
View file @
05c505ef
...
@@ -475,7 +475,7 @@ func TestScoreInvalidMessageDeliveries(t *testing.T) {
...
@@ -475,7 +475,7 @@ func TestScoreInvalidMessageDeliveries(t *testing.T) {
pbMsg := makeTestMessage(i)
pbMsg := makeTestMessage(i)
pbMsg.Topic = &mytopic
pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg}
msg := Message{ReceivedFrom: peerA, Message: pbMsg}
ps.RejectMessage(&msg,
r
ejectInvalidSignature)
ps.RejectMessage(&msg,
R
ejectInvalidSignature)
}
}
ps.refreshScores()
ps.refreshScores()
...
@@ -512,7 +512,7 @@ func TestScoreInvalidMessageDeliveriesDecay(t *testing.T) {
...
@@ -512,7 +512,7 @@ func TestScoreInvalidMessageDeliveriesDecay(t *testing.T) {
pbMsg := makeTestMessage(i)
pbMsg := makeTestMessage(i)
pbMsg.Topic = &mytopic
pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg}
msg := Message{ReceivedFrom: peerA, Message: pbMsg}
ps.RejectMessage(&msg,
r
ejectInvalidSignature)
ps.RejectMessage(&msg,
R
ejectInvalidSignature)
}
}
ps.refreshScores()
ps.refreshScores()
...
@@ -561,9 +561,9 @@ func TestScoreRejectMessageDeliveries(t *testing.T) {
...
@@ -561,9 +561,9 @@ func TestScoreRejectMessageDeliveries(t *testing.T) {
msg2 := Message{ReceivedFrom: peerB, Message: pbMsg}
msg2 := Message{ReceivedFrom: peerB, Message: pbMsg}
// these should have no effect in the score
// these should have no effect in the score
ps.RejectMessage(&msg,
r
ejectBlacklstedPeer)
ps.RejectMessage(&msg,
R
ejectBlacklstedPeer)
ps.RejectMessage(&msg,
r
ejectBlacklistedSource)
ps.RejectMessage(&msg,
R
ejectBlacklistedSource)
ps.RejectMessage(&msg,
r
ejectValidationQueueFull)
ps.RejectMessage(&msg,
R
ejectValidationQueueFull)
aScore := ps.Score(peerA)
aScore := ps.Score(peerA)
expected := 0.0
expected := 0.0
...
@@ -576,7 +576,7 @@ func TestScoreRejectMessageDeliveries(t *testing.T) {
...
@@ -576,7 +576,7 @@ func TestScoreRejectMessageDeliveries(t *testing.T) {
// this should have no effect in the score, and subsequent duplicate messages should have no
// this should have no effect in the score, and subsequent duplicate messages should have no
// effect either
// effect either
ps.RejectMessage(&msg,
r
ejectValidationThrottled)
ps.RejectMessage(&msg,
R
ejectValidationThrottled)
ps.DuplicateMessage(&msg2)
ps.DuplicateMessage(&msg2)
aScore = ps.Score(peerA)
aScore = ps.Score(peerA)
...
@@ -601,7 +601,7 @@ func TestScoreRejectMessageDeliveries(t *testing.T) {
...
@@ -601,7 +601,7 @@ func TestScoreRejectMessageDeliveries(t *testing.T) {
// this should have no effect in the score, and subsequent duplicate messages should have no
// this should have no effect in the score, and subsequent duplicate messages should have no
// effect either
// effect either
ps.RejectMessage(&msg,
r
ejectValidationIgnored)
ps.RejectMessage(&msg,
R
ejectValidationIgnored)
ps.DuplicateMessage(&msg2)
ps.DuplicateMessage(&msg2)
aScore = ps.Score(peerA)
aScore = ps.Score(peerA)
...
@@ -625,7 +625,7 @@ func TestScoreRejectMessageDeliveries(t *testing.T) {
...
@@ -625,7 +625,7 @@ func TestScoreRejectMessageDeliveries(t *testing.T) {
ps.ValidateMessage(&msg)
ps.ValidateMessage(&msg)
// and reject the message to make sure duplicates are also penalized
// and reject the message to make sure duplicates are also penalized
ps.RejectMessage(&msg,
r
ejectValidationFailed)
ps.RejectMessage(&msg,
R
ejectValidationFailed)
ps.DuplicateMessage(&msg2)
ps.DuplicateMessage(&msg2)
aScore = ps.Score(peerA)
aScore = ps.Score(peerA)
...
@@ -650,7 +650,7 @@ func TestScoreRejectMessageDeliveries(t *testing.T) {
...
@@ -650,7 +650,7 @@ func TestScoreRejectMessageDeliveries(t *testing.T) {
// and reject the message after a duplciate has arrived
// and reject the message after a duplciate has arrived
ps.DuplicateMessage(&msg2)
ps.DuplicateMessage(&msg2)
ps.RejectMessage(&msg,
r
ejectValidationFailed)
ps.RejectMessage(&msg,
R
ejectValidationFailed)
aScore = ps.Score(peerA)
aScore = ps.Score(peerA)
expected = -4.0
expected = -4.0
...
@@ -1032,7 +1032,7 @@ func TestScoreResetTopicParams(t *testing.T) {
...
@@ -1032,7 +1032,7 @@ func TestScoreResetTopicParams(t *testing.T) {
pbMsg.Topic = &mytopic
pbMsg.Topic = &mytopic
msg := Message{ReceivedFrom: peerA, Message: pbMsg}
msg := Message{ReceivedFrom: peerA, Message: pbMsg}
ps.ValidateMessage(&msg)
ps.ValidateMessage(&msg)
ps.RejectMessage(&msg,
r
ejectValidationFailed)
ps.RejectMessage(&msg,
R
ejectValidationFailed)
}
}
// check the topic score
// check the topic score
...
...
tag_tracer.go
View file @
05c505ef
...
@@ -242,11 +242,11 @@ func (t *tagTracer) RejectMessage(msg *Message, reason string) {
...
@@ -242,11 +242,11 @@ func (t *tagTracer) RejectMessage(msg *Message, reason string) {
// the validation pipeline. Other rejection reasons (missing signature, etc) skip the validation
// the validation pipeline. Other rejection reasons (missing signature, etc) skip the validation
// queue, so we don't want to remove the state in case the message is still validating.
// queue, so we don't want to remove the state in case the message is still validating.
switch
reason {
switch
reason {
case
r
ejectValidationThrottled:
case
R
ejectValidationThrottled:
fallthrough
fallthrough
case
r
ejectValidationIgnored:
case
R
ejectValidationIgnored:
fallthrough
fallthrough
case
r
ejectValidationFailed:
case
R
ejectValidationFailed:
delete
(t.nearFirst, t.msgID(msg.Message))
delete
(t.nearFirst, t.msgID(msg.Message))
}
}
}
}
...
...
tracer.go
View file @
05c505ef
...
@@ -25,17 +25,17 @@ var MinTraceBatchSize = 16
...
@@ -25,17 +25,17 @@ var MinTraceBatchSize = 16
// rejection reasons
// rejection reasons
const
(
const
(
r
ejectBlacklstedPeer =
"blacklisted peer"
R
ejectBlacklstedPeer =
"blacklisted peer"
r
ejectBlacklistedSource =
"blacklisted source"
R
ejectBlacklistedSource =
"blacklisted source"
r
ejectMissingSignature =
"missing signature"
R
ejectMissingSignature =
"missing signature"
r
ejectUnexpectedSignature =
"unexpected signature"
R
ejectUnexpectedSignature =
"unexpected signature"
r
ejectUnexpectedAuthInfo =
"unexpected auth info"
R
ejectUnexpectedAuthInfo =
"unexpected auth info"
r
ejectInvalidSignature =
"invalid signature"
R
ejectInvalidSignature =
"invalid signature"
r
ejectValidationQueueFull =
"validation queue full"
R
ejectValidationQueueFull =
"validation queue full"
r
ejectValidationThrottled =
"validation throttled"
R
ejectValidationThrottled =
"validation throttled"
r
ejectValidationFailed =
"validation failed"
R
ejectValidationFailed =
"validation failed"
r
ejectValidationIgnored =
"validation ignored"
R
ejectValidationIgnored =
"validation ignored"
r
ejectSelfOrigin =
"self originated message"
R
ejectSelfOrigin =
"self originated message"
)
)
type
basicTracer
struct
{
type
basicTracer
struct
{
...
...
validation.go
View file @
05c505ef
...
@@ -201,7 +201,7 @@ func (v *validation) Push(src peer.ID, msg *Message) bool {
...
@@ -201,7 +201,7 @@ func (v *validation) Push(src peer.ID, msg *Message) bool {
case
v.validateQ <- &validateReq{vals, src, msg}:
case
v.validateQ <- &validateReq{vals, src, msg}:
default
:
default
:
log.Debugf(
"message validation throttled: queue full; dropping message from %s"
, src)
log.Debugf(
"message validation throttled: queue full; dropping message from %s"
, src)
v.tracer.RejectMessage(msg,
r
ejectValidationQueueFull)
v.tracer.RejectMessage(msg,
R
ejectValidationQueueFull)
}
}
return
false
return
false
}
}
...
@@ -242,7 +242,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
...
@@ -242,7 +242,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
if
msg.Signature != nil {
if
msg.Signature != nil {
if
!v.validateSignature(msg) {
if
!v.validateSignature(msg) {
log.Debugf(
"message signature validation failed; dropping message from %s"
, src)
log.Debugf(
"message signature validation failed; dropping message from %s"
, src)
v.tracer.RejectMessage(msg,
r
ejectInvalidSignature)
v.tracer.RejectMessage(msg,
R
ejectInvalidSignature)
return
return
}
}
}
}
...
@@ -282,7 +282,7 @@ loop:
...
@@ -282,7 +282,7 @@ loop:
if
result == ValidationReject {
if
result == ValidationReject {
log.Debugf(
"message validation failed; dropping message from %s"
, src)
log.Debugf(
"message validation failed; dropping message from %s"
, src)
v.tracer.RejectMessage(msg,
r
ejectValidationFailed)
v.tracer.RejectMessage(msg,
R
ejectValidationFailed)
return
return
}
}
...
@@ -296,13 +296,13 @@ loop:
...
@@ -296,13 +296,13 @@ loop:
}()
}()
default
:
default
:
log.Debugf(
"message validation throttled; dropping message from %s"
, src)
log.Debugf(
"message validation throttled; dropping message from %s"
, src)
v.tracer.RejectMessage(msg,
r
ejectValidationThrottled)
v.tracer.RejectMessage(msg,
R
ejectValidationThrottled)
}
}
return
return
}
}
if
result == ValidationIgnore {
if
result == ValidationIgnore {
v.tracer.RejectMessage(msg,
r
ejectValidationIgnored)
v.tracer.RejectMessage(msg,
R
ejectValidationIgnored)
return
return
}
}
...
@@ -332,15 +332,15 @@ func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message
...
@@ -332,15 +332,15 @@ func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message
v.p.sendMsg <- msg
v.p.sendMsg <- msg
case
ValidationReject:
case
ValidationReject:
log.Debugf(
"message validation failed; dropping message from %s"
, src)
log.Debugf(
"message validation failed; dropping message from %s"
, src)
v.tracer.RejectMessage(msg,
r
ejectValidationFailed)
v.tracer.RejectMessage(msg,
R
ejectValidationFailed)
return
return
case
ValidationIgnore:
case
ValidationIgnore:
log.Debugf(
"message validation punted; ignoring message from %s"
, src)
log.Debugf(
"message validation punted; ignoring message from %s"
, src)
v.tracer.RejectMessage(msg,
r
ejectValidationIgnored)
v.tracer.RejectMessage(msg,
R
ejectValidationIgnored)
return
return
case
validationThrottled:
case
validationThrottled:
log.Debugf(
"message validation throttled; ignoring message from %s"
, src)
log.Debugf(
"message validation throttled; ignoring message from %s"
, src)
v.tracer.RejectMessage(msg,
r
ejectValidationThrottled)
v.tracer.RejectMessage(msg,
R
ejectValidationThrottled)
default
:
default
:
// BUG: this would be an internal programming error, so a panic seems appropiate.
// BUG: this would be an internal programming error, so a panic seems appropiate.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment