Commit 9b719683 authored by Steven Allen's avatar Steven Allen

feat: update pubsub

parent c06d7261
This diff is collapsed.
...@@ -28,7 +28,7 @@ var log = logging.Logger("pubsub-valuestore") ...@@ -28,7 +28,7 @@ var log = logging.Logger("pubsub-valuestore")
// value store. This way, users can wrap the underlying pubsub implementation // value store. This way, users can wrap the underlying pubsub implementation
// without re-exporting/implementing the entire interface. // without re-exporting/implementing the entire interface.
type Pubsub interface { type Pubsub interface {
RegisterTopicValidator(topic string, val pubsub.Validator, opts ...pubsub.ValidatorOpt) error RegisterTopicValidator(topic string, validator interface{}, opts ...pubsub.ValidatorOpt) error
Join(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error) Join(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error)
} }
...@@ -144,28 +144,30 @@ func (p *PubsubValueStore) PutValue(ctx context.Context, key string, value []byt ...@@ -144,28 +144,30 @@ func (p *PubsubValueStore) PutValue(ctx context.Context, key string, value []byt
} }
// compare compares the input value with the current value. // compare compares the input value with the current value.
// Returns 0 if equal, greater than 0 if better, less than 0 if worse // First return value is 0 if equal, greater than 0 if better, less than 0 if worse.
func (p *PubsubValueStore) compare(key string, val []byte) int { // Second return value is true if valid.
//
func (p *PubsubValueStore) compare(key string, val []byte) (int, bool) {
if p.Validator.Validate(key, val) != nil { if p.Validator.Validate(key, val) != nil {
return -1 return -1, false
} }
old, err := p.getLocal(key) old, err := p.getLocal(key)
if err != nil { if err != nil {
// If the old one is invalid, the new one is *always* better. // If the old one is invalid, the new one is *always* better.
return 1 return 1, true
} }
// Same record is not better // Same record is not better
if old != nil && bytes.Equal(old, val) { if old != nil && bytes.Equal(old, val) {
return 0 return 0, true
} }
i, err := p.Validator.Select(key, [][]byte{val, old}) i, err := p.Validator.Select(key, [][]byte{val, old})
if err == nil && i == 0 { if err == nil && i == 0 {
return 1 return 1, true
} }
return -1 return -1, true
} }
func (p *PubsubValueStore) Subscribe(key string) error { func (p *PubsubValueStore) Subscribe(key string) error {
...@@ -185,10 +187,20 @@ func (p *PubsubValueStore) Subscribe(key string) error { ...@@ -185,10 +187,20 @@ func (p *PubsubValueStore) Subscribe(key string) error {
// //
// Also, make sure to do this *before* subscribing. // Also, make sure to do this *before* subscribing.
myID := p.host.ID() myID := p.host.ID()
_ = p.ps.RegisterTopicValidator(topic, func(ctx context.Context, src peer.ID, msg *pubsub.Message) bool { _ = p.ps.RegisterTopicValidator(topic, func(
cmp := p.compare(key, msg.GetData()) ctx context.Context,
src peer.ID,
msg *pubsub.Message,
) pubsub.ValidationResult {
cmp, valid := p.compare(key, msg.GetData())
if !valid {
return pubsub.ValidationReject
}
return cmp > 0 || cmp == 0 && src == myID if cmp > 0 || cmp == 0 && src == myID {
return pubsub.ValidationAccept
}
return pubsub.ValidationIgnore
}) })
ti, err := p.createTopicHandler(topic) ti, err := p.createTopicHandler(topic)
...@@ -289,8 +301,8 @@ func (p *PubsubValueStore) psPublishChannel(ctx context.Context, topic *pubsub.T ...@@ -289,8 +301,8 @@ func (p *PubsubValueStore) psPublishChannel(ctx context.Context, topic *pubsub.T
// Returns true if the value is better then what is currently in the datastore // Returns true if the value is better then what is currently in the datastore
// Returns any errors from putting the data in the datastore // Returns any errors from putting the data in the datastore
func (p *PubsubValueStore) putLocal(ti *topicInfo, key string, value []byte) (int, error) { func (p *PubsubValueStore) putLocal(ti *topicInfo, key string, value []byte) (int, error) {
cmp := p.compare(key, value) cmp, valid := p.compare(key, value)
if cmp > 0 { if valid && cmp > 0 {
return cmp, p.ds.Put(dshelp.NewKeyFromBinary([]byte(key)), value) return cmp, p.ds.Put(dshelp.NewKeyFromBinary([]byte(key)), value)
} }
return cmp, nil return cmp, nil
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment