Unverified Commit 14b1bff6 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #76 from libp2p/feat/update-pubsub

feat: update pubsub
parents c06d7261 9b719683
This diff is collapsed.
......@@ -28,7 +28,7 @@ var log = logging.Logger("pubsub-valuestore")
// value store. This way, users can wrap the underlying pubsub implementation
// without re-exporting/implementing the entire interface.
type Pubsub interface {
RegisterTopicValidator(topic string, val pubsub.Validator, opts ...pubsub.ValidatorOpt) error
RegisterTopicValidator(topic string, validator interface{}, opts ...pubsub.ValidatorOpt) error
Join(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error)
}
......@@ -144,28 +144,30 @@ func (p *PubsubValueStore) PutValue(ctx context.Context, key string, value []byt
}
// compare compares the input value with the current value.
// Returns 0 if equal, greater than 0 if better, less than 0 if worse
func (p *PubsubValueStore) compare(key string, val []byte) int {
// First return value is 0 if equal, greater than 0 if better, less than 0 if worse.
// Second return value is true if valid.
//
func (p *PubsubValueStore) compare(key string, val []byte) (int, bool) {
if p.Validator.Validate(key, val) != nil {
return -1
return -1, false
}
old, err := p.getLocal(key)
if err != nil {
// If the old one is invalid, the new one is *always* better.
return 1
return 1, true
}
// Same record is not better
if old != nil && bytes.Equal(old, val) {
return 0
return 0, true
}
i, err := p.Validator.Select(key, [][]byte{val, old})
if err == nil && i == 0 {
return 1
return 1, true
}
return -1
return -1, true
}
func (p *PubsubValueStore) Subscribe(key string) error {
......@@ -185,10 +187,20 @@ func (p *PubsubValueStore) Subscribe(key string) error {
//
// Also, make sure to do this *before* subscribing.
myID := p.host.ID()
_ = p.ps.RegisterTopicValidator(topic, func(ctx context.Context, src peer.ID, msg *pubsub.Message) bool {
cmp := p.compare(key, msg.GetData())
_ = p.ps.RegisterTopicValidator(topic, func(
ctx context.Context,
src peer.ID,
msg *pubsub.Message,
) pubsub.ValidationResult {
cmp, valid := p.compare(key, msg.GetData())
if !valid {
return pubsub.ValidationReject
}
return cmp > 0 || cmp == 0 && src == myID
if cmp > 0 || cmp == 0 && src == myID {
return pubsub.ValidationAccept
}
return pubsub.ValidationIgnore
})
ti, err := p.createTopicHandler(topic)
......@@ -289,8 +301,8 @@ func (p *PubsubValueStore) psPublishChannel(ctx context.Context, topic *pubsub.T
// Returns true if the value is better then what is currently in the datastore
// Returns any errors from putting the data in the datastore
func (p *PubsubValueStore) putLocal(ti *topicInfo, key string, value []byte) (int, error) {
cmp := p.compare(key, value)
if cmp > 0 {
cmp, valid := p.compare(key, value)
if valid && cmp > 0 {
return cmp, p.ds.Put(dshelp.NewKeyFromBinary([]byte(key)), value)
}
return cmp, nil
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment