From f3f2cb2c8246c7b3aee636a9a3e42534b75ebb71 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 10 Sep 2016 20:47:12 -0700 Subject: [PATCH] use timecache --- .travis.yml | 21 +++++ LICENSE | 21 +++++ Makefile | 7 ++ floodsub.go | 228 +++++++++++++++++++++++++++++---------------------- package.json | 6 ++ pb/rpc.pb.go | 228 +++++++++++++++++++++++++++++++++++++++++++++------ pb/rpc.proto | 45 ++++++++-- 7 files changed, 424 insertions(+), 132 deletions(-) create mode 100644 .travis.yml create mode 100644 LICENSE create mode 100644 Makefile diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..26a9b91 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,21 @@ +os: + - linux + - osx + +language: go + +go: + - 1.7 + +install: true + +script: + - make deps + - go test ./... + +cache: + directories: + - $GOPATH/src/gx + +notifications: + email: false diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..2610033 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 Jeromy Johnson + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3c64b37 --- /dev/null +++ b/Makefile @@ -0,0 +1,7 @@ +gx: + go get -u github.com/whyrusleeping/gx + go get -u github.com/whyrusleeping/gx-go + +deps: gx + gx --verbose install --global + gx-go rewrite diff --git a/floodsub.go b/floodsub.go index bc0c132..43d8339 100644 --- a/floodsub.go +++ b/floodsub.go @@ -3,6 +3,7 @@ package floodsub import ( "bufio" "context" + "encoding/binary" "fmt" "io" "sync" @@ -17,6 +18,7 @@ import ( host "github.com/libp2p/go-libp2p/p2p/host" inet "github.com/libp2p/go-libp2p/p2p/net" protocol "github.com/libp2p/go-libp2p/p2p/protocol" + timecache "github.com/whyrusleeping/timecache" ) const ID = protocol.ID("/floodsub/1.0.0") @@ -33,16 +35,16 @@ type PubSub struct { host host.Host incoming chan *RPC - outgoing chan *RPC + publish chan *Message newPeers chan inet.Stream peerDead chan peer.ID myTopics map[string]chan *Message pubsubLk sync.Mutex - topics map[string]map[peer.ID]struct{} - peers map[peer.ID]chan *RPC - lastMsg map[peer.ID]uint64 + topics map[string]map[peer.ID]struct{} + peers map[peer.ID]chan *RPC + seenMessages *timecache.TimeCache addSub chan *addSub @@ -66,17 +68,17 @@ type RPC struct { func NewFloodSub(ctx context.Context, h host.Host) *PubSub { ps := &PubSub{ - host: h, - ctx: ctx, - incoming: make(chan *RPC, 32), - outgoing: make(chan *RPC), - newPeers: make(chan inet.Stream), - peerDead: make(chan peer.ID), - addSub: make(chan *addSub), - myTopics: make(map[string]chan *Message), - topics: make(map[string]map[peer.ID]struct{}), - peers: make(map[peer.ID]chan *RPC), - lastMsg: make(map[peer.ID]uint64), + host: h, + ctx: ctx, + incoming: make(chan *RPC, 32), + publish: make(chan *Message), + newPeers: make(chan inet.Stream), + peerDead: make(chan peer.ID), + addSub: make(chan *addSub), + myTopics: make(map[string]chan *Message), + topics: make(map[string]map[peer.ID]struct{}), + peers: make(map[peer.ID]chan *RPC), + seenMessages: timecache.NewTimeCache(time.Second * 30), } h.SetStreamHandler(ID, ps.handleNewStream) @@ -90,9 +92,12 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub { func (p *PubSub) getHelloPacket() *RPC { var rpc RPC for t, _ := range p.myTopics { - rpc.Topics = append(rpc.Topics, t) + as := &pb.RPC_SubOpts{ + Topicid: proto.String(t), + Subscribe: proto.Bool(true), + } + rpc.Subscriptions = append(rpc.Subscriptions, as) } - rpc.Type = &AddSubMessageType return &rpc } @@ -188,23 +193,15 @@ func (p *PubSub) processLoop(ctx context.Context) { if err != nil { log.Error("handling RPC: ", err) } - case rpc := <-p.outgoing: - switch rpc.GetType() { - case AddSubMessageType, UnsubMessageType: - for _, mch := range p.peers { - mch <- rpc - } - case PubMessageType: - //fmt.Println("publishing outgoing message") - err := p.recvMessage(rpc) - if err != nil { - log.Error("error receiving message: ", err) - } - - err = p.publishMessage(rpc) - if err != nil { - log.Error("publishing message: ", err) - } + case msg := <-p.publish: + err := p.recvMessage(msg.Message) + if err != nil { + log.Error("error receiving message: ", err) + } + + err = p.publishMessage(p.host.ID(), msg.Message) + if err != nil { + log.Error("publishing message: ", err) } case <-ctx.Done(): log.Info("pubsub processloop shutting down") @@ -213,22 +210,14 @@ func (p *PubSub) processLoop(ctx context.Context) { } } func (p *PubSub) handleSubscriptionChange(sub *addSub) { - ch, ok := p.myTopics[sub.topic] - out := &RPC{ - RPC: pb.RPC{ - Topics: []string{sub.topic}, - }, - } - if sub.cancel { - if !ok { - return - } + subopt := pb.RPC_SubOpts{ + Topicid: &sub.topic, + Subscribe: &sub.sub, + } - close(ch) - delete(p.myTopics, sub.topic) - out.Type = &UnsubMessageType - } else { + ch, ok := p.myTopics[sub.topic] + if sub.sub { if ok { // we don't allow multiple subs per topic at this point sub.resp <- nil @@ -238,27 +227,58 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) { resp := make(chan *Message, 16) p.myTopics[sub.topic] = resp sub.resp <- resp - out.Type = &AddSubMessageType + } else { + if !ok { + return + } + + close(ch) + delete(p.myTopics, sub.topic) + } + + out := &RPC{ + RPC: pb.RPC{ + Subscriptions: []*pb.RPC_SubOpts{ + &subopt, + }, + }, } - go func() { - p.outgoing <- out - }() + for _, peer := range p.peers { + peer <- out + } } -func (p *PubSub) recvMessage(rpc *RPC) error { - subch, ok := p.myTopics[rpc.Msg.GetTopic()] +func (p *PubSub) recvMessage(msg *pb.Message) error { + if len(msg.GetTopicIDs()) > 1 { + return fmt.Errorf("Dont yet handle multiple topics per message") + } + if len(msg.GetTopicIDs()) == 0 { + return fmt.Errorf("no topic on received message") + } + + topic := msg.GetTopicIDs()[0] + subch, ok := p.myTopics[topic] if ok { - //fmt.Println("writing out to subscriber!") - subch <- &Message{rpc.Msg} + subch <- &Message{msg} + } else { + log.Error("received message we we'rent subscribed to") } return nil } +func (p *PubSub) seenMessage(id string) bool { + return p.seenMessages.Has(id) +} + +func (p *PubSub) markSeen(id string) { + p.seenMessages.Add(id) +} + func (p *PubSub) handleIncomingRPC(rpc *RPC) error { - switch rpc.GetType() { - case AddSubMessageType: - for _, t := range rpc.Topics { + for _, subopt := range rpc.GetSubscriptions() { + t := subopt.GetTopicid() + if subopt.GetSubscribe() { tmap, ok := p.topics[t] if !ok { tmap = make(map[peer.ID]struct{}) @@ -266,28 +286,22 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { } tmap[rpc.from] = struct{}{} - } - case UnsubMessageType: - for _, t := range rpc.Topics { + } else { tmap, ok := p.topics[t] if !ok { - return nil + continue } delete(tmap, rpc.from) } - case PubMessageType: - if rpc.Msg == nil { - return fmt.Errorf("nil pub message") - } + } - msg := &Message{rpc.Msg} + for _, pmsg := range rpc.GetPublish() { + msg := &Message{pmsg} - // Note: Obviously this is an incredibly insecure way of - // filtering out "messages we've already seen". But it works for a - // cool demo, so i'm not gonna waste time thinking about it any more - if p.lastMsg[msg.GetFrom()] >= msg.GetSeqno() { - //log.Error("skipping 'old' message") - return nil + id := msg.Message.GetFrom() + string(msg.GetSeqno()) + + if p.seenMessage(id) { + continue } if msg.GetFrom() == p.host.ID() { @@ -295,13 +309,13 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { return nil } - p.lastMsg[msg.GetFrom()] = msg.GetSeqno() + p.markSeen(id) - if err := p.recvMessage(rpc); err != nil { + if err := p.recvMessage(pmsg); err != nil { log.Error("error receiving message: ", err) } - err := p.publishMessage(rpc) + err := p.publishMessage(rpc.from, pmsg) if err != nil { log.Error("publish message: ", err) } @@ -309,14 +323,20 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { return nil } -func (p *PubSub) publishMessage(rpc *RPC) error { - tmap, ok := p.topics[rpc.Msg.GetTopic()] +func (p *PubSub) publishMessage(from peer.ID, msg *pb.Message) error { + if len(msg.GetTopicIDs()) != 1 { + return fmt.Errorf("don't support publishing to multiple topics in a single message") + } + + tmap, ok := p.topics[msg.GetTopicIDs()[0]] if !ok { return nil } + out := &RPC{RPC: pb.RPC{Publish: []*pb.Message{msg}}} + for pid, _ := range tmap { - if pid == rpc.from || pid == peer.ID(rpc.Msg.GetFrom()) { + if pid == from || pid == peer.ID(msg.GetFrom()) { continue } @@ -325,23 +345,38 @@ func (p *PubSub) publishMessage(rpc *RPC) error { continue } - go func() { mch <- rpc }() + go func() { mch <- out }() } return nil } type addSub struct { - topic string - cancel bool - resp chan chan *Message + topic string + sub bool + resp chan chan *Message } func (p *PubSub) Subscribe(topic string) (<-chan *Message, error) { + return p.SubscribeComplicated(&pb.TopicDescriptor{ + Name: proto.String(topic), + }) +} + +func (p *PubSub) SubscribeComplicated(td *pb.TopicDescriptor) (<-chan *Message, error) { + if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { + return nil, fmt.Errorf("Auth method not yet supported") + } + + if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE { + return nil, fmt.Errorf("Encryption method not yet supported") + } + resp := make(chan chan *Message) p.addSub <- &addSub{ - topic: topic, + topic: td.GetName(), resp: resp, + sub: true, } outch := <-resp @@ -354,22 +389,21 @@ func (p *PubSub) Subscribe(topic string) (<-chan *Message, error) { func (p *PubSub) Unsub(topic string) { p.addSub <- &addSub{ - topic: topic, - cancel: true, + topic: topic, + sub: false, } } func (p *PubSub) Publish(topic string, data []byte) error { - seqno := uint64(time.Now().UnixNano()) - p.outgoing <- &RPC{ - RPC: pb.RPC{ - Msg: &pb.Message{ - Data: data, - Topic: &topic, - From: proto.String(string(p.host.ID())), - Seqno: &seqno, - }, - Type: &PubMessageType, + seqno := make([]byte, 8) + binary.BigEndian.PutUint64(seqno, uint64(time.Now().UnixNano())) + + p.publish <- &Message{ + &pb.Message{ + Data: data, + TopicIDs: []string{topic}, + From: proto.String(string(p.host.ID())), + Seqno: seqno, }, } return nil diff --git a/package.json b/package.json index d838a90..0ead74f 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,12 @@ "hash": "QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV", "name": "gogo-protobuf", "version": "0.0.0" + }, + { + "author": "whyrusleeping", + "hash": "QmYftoT56eEfUBTD3erR6heXuPSUhGRezSmhSU8LeczP8b", + "name": "timecache", + "version": "1.0.0" } ], "gxVersion": "0.9.0", diff --git a/pb/rpc.pb.go b/pb/rpc.pb.go index 56dd89a..2418c4c 100644 --- a/pb/rpc.pb.go +++ b/pb/rpc.pb.go @@ -11,6 +11,7 @@ It is generated from these files: It has these top-level messages: RPC Message + TopicDescriptor */ package floodsub_pb @@ -23,44 +24,132 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf +type TopicDescriptor_AuthOpts_AuthMode int32 + +const ( + TopicDescriptor_AuthOpts_NONE TopicDescriptor_AuthOpts_AuthMode = 0 + TopicDescriptor_AuthOpts_KEY TopicDescriptor_AuthOpts_AuthMode = 1 + TopicDescriptor_AuthOpts_WOT TopicDescriptor_AuthOpts_AuthMode = 2 +) + +var TopicDescriptor_AuthOpts_AuthMode_name = map[int32]string{ + 0: "NONE", + 1: "KEY", + 2: "WOT", +} +var TopicDescriptor_AuthOpts_AuthMode_value = map[string]int32{ + "NONE": 0, + "KEY": 1, + "WOT": 2, +} + +func (x TopicDescriptor_AuthOpts_AuthMode) Enum() *TopicDescriptor_AuthOpts_AuthMode { + p := new(TopicDescriptor_AuthOpts_AuthMode) + *p = x + return p +} +func (x TopicDescriptor_AuthOpts_AuthMode) String() string { + return proto.EnumName(TopicDescriptor_AuthOpts_AuthMode_name, int32(x)) +} +func (x *TopicDescriptor_AuthOpts_AuthMode) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(TopicDescriptor_AuthOpts_AuthMode_value, data, "TopicDescriptor_AuthOpts_AuthMode") + if err != nil { + return err + } + *x = TopicDescriptor_AuthOpts_AuthMode(value) + return nil +} + +type TopicDescriptor_EncOpts_EncMode int32 + +const ( + TopicDescriptor_EncOpts_NONE TopicDescriptor_EncOpts_EncMode = 0 + TopicDescriptor_EncOpts_SHAREDKEY TopicDescriptor_EncOpts_EncMode = 1 + TopicDescriptor_EncOpts_WOT TopicDescriptor_EncOpts_EncMode = 2 +) + +var TopicDescriptor_EncOpts_EncMode_name = map[int32]string{ + 0: "NONE", + 1: "SHAREDKEY", + 2: "WOT", +} +var TopicDescriptor_EncOpts_EncMode_value = map[string]int32{ + "NONE": 0, + "SHAREDKEY": 1, + "WOT": 2, +} + +func (x TopicDescriptor_EncOpts_EncMode) Enum() *TopicDescriptor_EncOpts_EncMode { + p := new(TopicDescriptor_EncOpts_EncMode) + *p = x + return p +} +func (x TopicDescriptor_EncOpts_EncMode) String() string { + return proto.EnumName(TopicDescriptor_EncOpts_EncMode_name, int32(x)) +} +func (x *TopicDescriptor_EncOpts_EncMode) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(TopicDescriptor_EncOpts_EncMode_value, data, "TopicDescriptor_EncOpts_EncMode") + if err != nil { + return err + } + *x = TopicDescriptor_EncOpts_EncMode(value) + return nil +} + type RPC struct { - Type *string `protobuf:"bytes,1,opt,name=type" json:"type,omitempty"` - Topics []string `protobuf:"bytes,2,rep,name=topics" json:"topics,omitempty"` - Msg *Message `protobuf:"bytes,3,opt,name=msg" json:"msg,omitempty"` - XXX_unrecognized []byte `json:"-"` + Subscriptions []*RPC_SubOpts `protobuf:"bytes,1,rep,name=subscriptions" json:"subscriptions,omitempty"` + Publish []*Message `protobuf:"bytes,2,rep,name=publish" json:"publish,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *RPC) Reset() { *m = RPC{} } func (m *RPC) String() string { return proto.CompactTextString(m) } func (*RPC) ProtoMessage() {} -func (m *RPC) GetType() string { - if m != nil && m.Type != nil { - return *m.Type +func (m *RPC) GetSubscriptions() []*RPC_SubOpts { + if m != nil { + return m.Subscriptions } - return "" + return nil } -func (m *RPC) GetTopics() []string { +func (m *RPC) GetPublish() []*Message { if m != nil { - return m.Topics + return m.Publish } return nil } -func (m *RPC) GetMsg() *Message { - if m != nil { - return m.Msg +type RPC_SubOpts struct { + Subscribe *bool `protobuf:"varint,1,opt,name=subscribe" json:"subscribe,omitempty"` + Topicid *string `protobuf:"bytes,2,opt,name=topicid" json:"topicid,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *RPC_SubOpts) Reset() { *m = RPC_SubOpts{} } +func (m *RPC_SubOpts) String() string { return proto.CompactTextString(m) } +func (*RPC_SubOpts) ProtoMessage() {} + +func (m *RPC_SubOpts) GetSubscribe() bool { + if m != nil && m.Subscribe != nil { + return *m.Subscribe } - return nil + return false +} + +func (m *RPC_SubOpts) GetTopicid() string { + if m != nil && m.Topicid != nil { + return *m.Topicid + } + return "" } type Message struct { - From *string `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"` - Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"` - Seqno *uint64 `protobuf:"varint,3,opt,name=seqno" json:"seqno,omitempty"` - Topic *string `protobuf:"bytes,4,opt,name=topic" json:"topic,omitempty"` - XXX_unrecognized []byte `json:"-"` + From *string `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"` + Seqno []byte `protobuf:"bytes,3,opt,name=seqno" json:"seqno,omitempty"` + TopicIDs []string `protobuf:"bytes,4,rep,name=topicIDs" json:"topicIDs,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *Message) Reset() { *m = Message{} } @@ -81,21 +170,108 @@ func (m *Message) GetData() []byte { return nil } -func (m *Message) GetSeqno() uint64 { - if m != nil && m.Seqno != nil { - return *m.Seqno +func (m *Message) GetSeqno() []byte { + if m != nil { + return m.Seqno + } + return nil +} + +func (m *Message) GetTopicIDs() []string { + if m != nil { + return m.TopicIDs } - return 0 + return nil } -func (m *Message) GetTopic() string { - if m != nil && m.Topic != nil { - return *m.Topic +// topicID = hash(topicDescriptor); (not the topic.name) +type TopicDescriptor struct { + Name *string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` + Auth *TopicDescriptor_AuthOpts `protobuf:"bytes,2,opt,name=auth" json:"auth,omitempty"` + Enc *TopicDescriptor_EncOpts `protobuf:"bytes,3,opt,name=enc" json:"enc,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *TopicDescriptor) Reset() { *m = TopicDescriptor{} } +func (m *TopicDescriptor) String() string { return proto.CompactTextString(m) } +func (*TopicDescriptor) ProtoMessage() {} + +func (m *TopicDescriptor) GetName() string { + if m != nil && m.Name != nil { + return *m.Name } return "" } +func (m *TopicDescriptor) GetAuth() *TopicDescriptor_AuthOpts { + if m != nil { + return m.Auth + } + return nil +} + +func (m *TopicDescriptor) GetEnc() *TopicDescriptor_EncOpts { + if m != nil { + return m.Enc + } + return nil +} + +type TopicDescriptor_AuthOpts struct { + Mode *TopicDescriptor_AuthOpts_AuthMode `protobuf:"varint,1,opt,name=mode,enum=floodsub.pb.TopicDescriptor_AuthOpts_AuthMode" json:"mode,omitempty"` + Keys [][]byte `protobuf:"bytes,2,rep,name=keys" json:"keys,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *TopicDescriptor_AuthOpts) Reset() { *m = TopicDescriptor_AuthOpts{} } +func (m *TopicDescriptor_AuthOpts) String() string { return proto.CompactTextString(m) } +func (*TopicDescriptor_AuthOpts) ProtoMessage() {} + +func (m *TopicDescriptor_AuthOpts) GetMode() TopicDescriptor_AuthOpts_AuthMode { + if m != nil && m.Mode != nil { + return *m.Mode + } + return TopicDescriptor_AuthOpts_NONE +} + +func (m *TopicDescriptor_AuthOpts) GetKeys() [][]byte { + if m != nil { + return m.Keys + } + return nil +} + +type TopicDescriptor_EncOpts struct { + Mode *TopicDescriptor_EncOpts_EncMode `protobuf:"varint,1,opt,name=mode,enum=floodsub.pb.TopicDescriptor_EncOpts_EncMode" json:"mode,omitempty"` + KeyHashes [][]byte `protobuf:"bytes,2,rep,name=keyHashes" json:"keyHashes,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *TopicDescriptor_EncOpts) Reset() { *m = TopicDescriptor_EncOpts{} } +func (m *TopicDescriptor_EncOpts) String() string { return proto.CompactTextString(m) } +func (*TopicDescriptor_EncOpts) ProtoMessage() {} + +func (m *TopicDescriptor_EncOpts) GetMode() TopicDescriptor_EncOpts_EncMode { + if m != nil && m.Mode != nil { + return *m.Mode + } + return TopicDescriptor_EncOpts_NONE +} + +func (m *TopicDescriptor_EncOpts) GetKeyHashes() [][]byte { + if m != nil { + return m.KeyHashes + } + return nil +} + func init() { proto.RegisterType((*RPC)(nil), "floodsub.pb.RPC") + proto.RegisterType((*RPC_SubOpts)(nil), "floodsub.pb.RPC.SubOpts") proto.RegisterType((*Message)(nil), "floodsub.pb.Message") + proto.RegisterType((*TopicDescriptor)(nil), "floodsub.pb.TopicDescriptor") + proto.RegisterType((*TopicDescriptor_AuthOpts)(nil), "floodsub.pb.TopicDescriptor.AuthOpts") + proto.RegisterType((*TopicDescriptor_EncOpts)(nil), "floodsub.pb.TopicDescriptor.EncOpts") + proto.RegisterEnum("floodsub.pb.TopicDescriptor_AuthOpts_AuthMode", TopicDescriptor_AuthOpts_AuthMode_name, TopicDescriptor_AuthOpts_AuthMode_value) + proto.RegisterEnum("floodsub.pb.TopicDescriptor_EncOpts_EncMode", TopicDescriptor_EncOpts_EncMode_name, TopicDescriptor_EncOpts_EncMode_value) } diff --git a/pb/rpc.proto b/pb/rpc.proto index 2b49ec0..22f4ca1 100644 --- a/pb/rpc.proto +++ b/pb/rpc.proto @@ -1,20 +1,47 @@ package floodsub.pb; message RPC { - optional string type = 1; + repeated SubOpts subscriptions = 1; + repeated Message publish = 2; - repeated string topics = 2; - - optional Message msg = 3; + message SubOpts { + optional bool subscribe = 1; // subscribe or unsubcribe + optional string topicid = 2; + } } message Message { optional string from = 1; - optional bytes data = 2; - - optional uint64 seqno = 3; - - optional string topic = 4; + optional bytes seqno = 3; + repeated string topicIDs = 4; } +// topicID = hash(topicDescriptor); (not the topic.name) +message TopicDescriptor { + optional string name = 1; + optional AuthOpts auth = 2; + optional EncOpts enc = 3; + + message AuthOpts { + optional AuthMode mode = 1; + repeated bytes keys = 2; // root keys to trust + + enum AuthMode { + NONE = 0; // no authentication, anyone can publish + KEY = 1; // only messages signed by keys in the topic descriptor are accepted + WOT = 2; // web of trust, certificates can allow publisher set to grow + } + } + + message EncOpts { + optional EncMode mode = 1; + repeated bytes keyHashes = 2; // the hashes of the shared keys used (salted) + + enum EncMode { + NONE = 0; // no encryption, anyone can read + SHAREDKEY = 1; // messages are encrypted with shared key + WOT = 2; // web of trust, certificates can allow publisher set to grow + } + } +} -- GitLab