diff --git a/floodsub.go b/floodsub.go index 63efb529b2f22306e99023d44b73723745e71d47..7a00129afad087ae2f8a4cf37153b115ed28640f 100644 --- a/floodsub.go +++ b/floodsub.go @@ -1,23 +1,27 @@ package floodsub import ( - "bufio" - "encoding/base64" - "encoding/json" "fmt" "sync" + "sync/atomic" "time" - peer "github.com/ipfs/go-libp2p-peer" - logging "github.com/ipfs/go-log" - host "github.com/libp2p/go-libp2p/p2p/host" - inet "github.com/libp2p/go-libp2p/p2p/net" - protocol "github.com/libp2p/go-libp2p/p2p/protocol" + pb "github.com/whyrusleeping/go-floodsub/pb" + + ggio "github.com/gogo/protobuf/io" + proto "github.com/gogo/protobuf/proto" + logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + peer "gx/ipfs/QmWtbQU15LaB5B1JC2F7TV9P4K88vD3PpA4AJrwfCjhML8/go-libp2p-peer" + host "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/host" + inet "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/net" + protocol "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/protocol" ) +var messageCount uint64 + const ID = protocol.ID("/floodsub/1.0.0") -const ( +var ( AddSubMessageType = "sub" UnsubMessageType = "unsub" PubMessageType = "pub" @@ -44,49 +48,15 @@ type PubSub struct { } type Message struct { - From peer.ID - Data []byte - Timestamp uint64 - Topic string -} - -func (m *Message) MarshalJSON() ([]byte, error) { - return json.Marshal(map[string]interface{}{ - "from": base64.RawStdEncoding.EncodeToString([]byte(m.From)), - "data": m.Data, - "timestamp": m.Timestamp, - "topic": m.Topic, - }) + *pb.Message } -func (m *Message) UnmarshalJSON(data []byte) error { - mp := struct { - Data []byte - Timestamp uint64 - Topic string - From string - }{} - err := json.Unmarshal(data, &mp) - if err != nil { - return err - } - - pid, err := base64.RawStdEncoding.DecodeString(mp.From) - if err != nil { - return err - } - - m.Data = mp.Data - m.Timestamp = mp.Timestamp - m.Topic = mp.Topic - m.From = peer.ID(pid) - return nil +func (m *Message) GetFrom() peer.ID { + return peer.ID(m.Message.GetFrom()) } type RPC struct { - Type string - Msg *Message - Topics []string + pb.RPC // unexported on purpose, not sending this over the wire from peer.ID @@ -98,16 +68,15 @@ func NewFloodSub(h host.Host) *PubSub { incoming: make(chan *RPC, 32), outgoing: make(chan *RPC), newPeers: make(chan inet.Stream), + peerDead: make(chan peer.ID), + addSub: make(chan *addSub), myTopics: make(map[string]chan *Message), topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), lastMsg: make(map[peer.ID]uint64), - peerDead: make(chan peer.ID), - addSub: make(chan *addSub), } h.SetStreamHandler(ID, ps.handleNewStream) - h.Network().Notify(ps) go ps.processLoop() @@ -120,21 +89,19 @@ func (p *PubSub) getHelloPacket() *RPC { for t, _ := range p.myTopics { rpc.Topics = append(rpc.Topics, t) } - rpc.Type = AddSubMessageType + rpc.Type = &AddSubMessageType return &rpc } func (p *PubSub) handleNewStream(s inet.Stream) { defer s.Close() - scan := bufio.NewScanner(s) - for scan.Scan() { + r := ggio.NewDelimitedReader(s, 1<<20) + for { rpc := new(RPC) - - err := json.Unmarshal(scan.Bytes(), rpc) + err := r.ReadMsg(&rpc.RPC) if err != nil { log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err) - log.Error("data: ", scan.Text()) // TODO: cleanup of some sort return } @@ -146,12 +113,16 @@ func (p *PubSub) handleNewStream(s inet.Stream) { func (p *PubSub) handleSendingMessages(s inet.Stream, in <-chan *RPC) { var dead bool + wc := ggio.NewDelimitedWriter(s) + defer wc.Close() for rpc := range in { if dead { continue } - err := writeRPC(s, rpc) + atomic.AddUint64(&messageCount, 1) + + err := wc.WriteMsg(&rpc.RPC) if err != nil { log.Errorf("writing message to %s: %s", s.Conn().RemotePeer(), err) dead = true @@ -192,7 +163,7 @@ func (p *PubSub) processLoop() { log.Error("handling RPC: ", err) } case rpc := <-p.outgoing: - switch rpc.Type { + switch rpc.GetType() { case AddSubMessageType, UnsubMessageType: for _, mch := range p.peers { mch <- rpc @@ -215,7 +186,9 @@ func (p *PubSub) processLoop() { func (p *PubSub) handleSubscriptionChange(sub *addSub) { ch, ok := p.myTopics[sub.topic] out := &RPC{ - Topics: []string{sub.topic}, + RPC: pb.RPC{ + Topics: []string{sub.topic}, + }, } if sub.cancel { @@ -225,7 +198,7 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) { close(ch) delete(p.myTopics, sub.topic) - out.Type = UnsubMessageType + out.Type = &UnsubMessageType } else { if ok { // we don't allow multiple subs per topic at this point @@ -236,7 +209,7 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) { resp := make(chan *Message, 16) p.myTopics[sub.topic] = resp sub.resp <- resp - out.Type = AddSubMessageType + out.Type = &AddSubMessageType } go func() { @@ -245,16 +218,16 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) { } func (p *PubSub) recvMessage(rpc *RPC) error { - subch, ok := p.myTopics[rpc.Msg.Topic] + subch, ok := p.myTopics[rpc.Msg.GetTopic()] if ok { //fmt.Println("writing out to subscriber!") - subch <- rpc.Msg + subch <- &Message{rpc.Msg} } return nil } func (p *PubSub) handleIncomingRPC(rpc *RPC) error { - switch rpc.Type { + switch rpc.GetType() { case AddSubMessageType: for _, t := range rpc.Topics { tmap, ok := p.topics[t] @@ -278,19 +251,22 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { return fmt.Errorf("nil pub message") } + msg := &Message{rpc.Msg} + // Note: Obviously this is an incredibly insecure way of // filtering out "messages we've already seen". But it works for a // cool demo, so i'm not gonna waste time thinking about it any more - if p.lastMsg[rpc.Msg.From] >= rpc.Msg.Timestamp { + if p.lastMsg[msg.GetFrom()] >= msg.GetSeqno() { //log.Error("skipping 'old' message") return nil } - if rpc.Msg.From == p.host.ID() { + if msg.GetFrom() == p.host.ID() { + log.Error("skipping message from self") return nil } - p.lastMsg[rpc.Msg.From] = rpc.Msg.Timestamp + p.lastMsg[msg.GetFrom()] = msg.GetSeqno() if err := p.recvMessage(rpc); err != nil { log.Error("error receiving message: ", err) @@ -305,13 +281,13 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { } func (p *PubSub) publishMessage(rpc *RPC) error { - tmap, ok := p.topics[rpc.Msg.Topic] + tmap, ok := p.topics[rpc.Msg.GetTopic()] if !ok { return nil } for pid, _ := range tmap { - if pid == rpc.from || pid == rpc.Msg.From { + if pid == rpc.from || pid == peer.ID(rpc.Msg.GetFrom()) { continue } @@ -355,25 +331,17 @@ func (p *PubSub) Unsub(topic string) { } func (p *PubSub) Publish(topic string, data []byte) error { + seqno := uint64(time.Now().UnixNano()) p.outgoing <- &RPC{ - Msg: &Message{ - Data: data, - Topic: topic, - From: p.host.ID(), - Timestamp: uint64(time.Now().UnixNano()), + RPC: pb.RPC{ + Msg: &pb.Message{ + Data: data, + Topic: &topic, + From: proto.String(string(p.host.ID())), + Seqno: &seqno, + }, + Type: &PubMessageType, }, - Type: PubMessageType, } return nil } - -func writeRPC(s inet.Stream, rpc *RPC) error { - data, err := json.Marshal(rpc) - if err != nil { - return err - } - - data = append(data, '\n') - _, err = s.Write(data) - return err -} diff --git a/floodsub_test.go b/floodsub_test.go index 4f45aa597a9060b59f9afec16a94449ffd55b595..d67b7cc497ba884086978b3ded9df35ec5985a14 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -8,8 +8,8 @@ import ( "testing" "time" - host "github.com/libp2p/go-libp2p/p2p/host" - netutil "github.com/libp2p/go-libp2p/p2p/test/util" + host "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/host" + netutil "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/test/util" ) func getNetHosts(t *testing.T, n int) []host.Host { @@ -94,4 +94,6 @@ func TestBasicFloodsub(t *testing.T) { } } } + + fmt.Println("Total Sent Messages: ", messageCount) } diff --git a/notify.go b/notify.go index 60d9b59546f6dca83709e97da08771dc9c153467..6bcaab6cecf5d927434fe4325958437963c17815 100644 --- a/notify.go +++ b/notify.go @@ -3,8 +3,8 @@ package floodsub import ( "context" - ma "github.com/jbenet/go-multiaddr" - inet "github.com/libp2p/go-libp2p/p2p/net" + ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr" + inet "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/net" ) var _ inet.Notifiee = (*PubSub)(nil) diff --git a/pb/rpc.pb.go b/pb/rpc.pb.go new file mode 100644 index 0000000000000000000000000000000000000000..9e23ebcee446afbe1963d0ad7a60da09885fd46e --- /dev/null +++ b/pb/rpc.pb.go @@ -0,0 +1,125 @@ +// Code generated by protoc-gen-gogo. +// source: rpc.proto +// DO NOT EDIT! + +/* +Package floodsub_pb is a generated protocol buffer package. + +It is generated from these files: + rpc.proto + +It has these top-level messages: + RPC + Message +*/ +package floodsub_pb + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type RPC struct { + Type *string `protobuf:"bytes,1,opt,name=type" json:"type,omitempty"` + Topics []string `protobuf:"bytes,2,rep,name=topics" json:"topics,omitempty"` + Msg *Message `protobuf:"bytes,3,opt,name=msg" json:"msg,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *RPC) Reset() { *m = RPC{} } +func (m *RPC) String() string { return proto.CompactTextString(m) } +func (*RPC) ProtoMessage() {} +func (*RPC) Descriptor() ([]byte, []int) { return fileDescriptorRpc, []int{0} } + +func (m *RPC) GetType() string { + if m != nil && m.Type != nil { + return *m.Type + } + return "" +} + +func (m *RPC) GetTopics() []string { + if m != nil { + return m.Topics + } + return nil +} + +func (m *RPC) GetMsg() *Message { + if m != nil { + return m.Msg + } + return nil +} + +type Message struct { + From *string `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"` + Seqno *uint64 `protobuf:"varint,3,opt,name=seqno" json:"seqno,omitempty"` + Topic *string `protobuf:"bytes,4,opt,name=topic" json:"topic,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { return fileDescriptorRpc, []int{1} } + +func (m *Message) GetFrom() string { + if m != nil && m.From != nil { + return *m.From + } + return "" +} + +func (m *Message) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *Message) GetSeqno() uint64 { + if m != nil && m.Seqno != nil { + return *m.Seqno + } + return 0 +} + +func (m *Message) GetTopic() string { + if m != nil && m.Topic != nil { + return *m.Topic + } + return "" +} + +func init() { + proto.RegisterType((*RPC)(nil), "floodsub.pb.RPC") + proto.RegisterType((*Message)(nil), "floodsub.pb.Message") +} + +func init() { proto.RegisterFile("rpc.proto", fileDescriptorRpc) } + +var fileDescriptorRpc = []byte{ + // 158 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2c, 0x2a, 0x48, 0xd6, + 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4e, 0xcb, 0xc9, 0xcf, 0x4f, 0x29, 0x2e, 0x4d, 0xd2, + 0x2b, 0x48, 0x52, 0x72, 0xe3, 0x62, 0x0e, 0x0a, 0x70, 0x16, 0xe2, 0xe1, 0x62, 0x29, 0xa9, 0x2c, + 0x48, 0x95, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x14, 0xe2, 0xe3, 0x62, 0x2b, 0xc9, 0x2f, 0xc8, 0x4c, + 0x2e, 0x96, 0x60, 0x52, 0x60, 0xd6, 0xe0, 0x14, 0x52, 0xe4, 0x62, 0xce, 0x2d, 0x4e, 0x97, 0x60, + 0x56, 0x60, 0xd4, 0xe0, 0x36, 0x12, 0xd1, 0x43, 0xd2, 0xaf, 0xe7, 0x9b, 0x5a, 0x5c, 0x9c, 0x98, + 0x9e, 0xaa, 0xe4, 0xcc, 0xc5, 0x0e, 0x65, 0x82, 0xcc, 0x4a, 0x2b, 0xca, 0xcf, 0x85, 0x9a, 0xc5, + 0xc3, 0xc5, 0x92, 0x92, 0x58, 0x92, 0x28, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x23, 0xc4, 0xcb, 0xc5, + 0x5a, 0x9c, 0x5a, 0x98, 0x97, 0x0f, 0x36, 0x8b, 0x05, 0xc4, 0x05, 0x5b, 0x24, 0xc1, 0x02, 0x52, + 0x0b, 0x08, 0x00, 0x00, 0xff, 0xff, 0x08, 0xfe, 0xc0, 0xc1, 0xa5, 0x00, 0x00, 0x00, +} diff --git a/pb/rpc.proto b/pb/rpc.proto new file mode 100644 index 0000000000000000000000000000000000000000..2b49ec00452312ba315ac6cc0a9379ab443af688 --- /dev/null +++ b/pb/rpc.proto @@ -0,0 +1,20 @@ +package floodsub.pb; + +message RPC { + optional string type = 1; + + repeated string topics = 2; + + optional Message msg = 3; +} + +message Message { + optional string from = 1; + + optional bytes data = 2; + + optional uint64 seqno = 3; + + optional string topic = 4; +} +