Commit 6ed3d545 authored by Jeromy's avatar Jeromy

switch over to protobuf rpc

parent 13a46da4
package floodsub
import (
"bufio"
"encoding/base64"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
peer "github.com/ipfs/go-libp2p-peer"
logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p/p2p/host"
inet "github.com/libp2p/go-libp2p/p2p/net"
protocol "github.com/libp2p/go-libp2p/p2p/protocol"
pb "github.com/whyrusleeping/go-floodsub/pb"
ggio "github.com/gogo/protobuf/io"
proto "github.com/gogo/protobuf/proto"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
peer "gx/ipfs/QmWtbQU15LaB5B1JC2F7TV9P4K88vD3PpA4AJrwfCjhML8/go-libp2p-peer"
host "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/host"
inet "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/net"
protocol "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/protocol"
)
var messageCount uint64
const ID = protocol.ID("/floodsub/1.0.0")
const (
var (
AddSubMessageType = "sub"
UnsubMessageType = "unsub"
PubMessageType = "pub"
......@@ -44,49 +48,15 @@ type PubSub struct {
}
type Message struct {
From peer.ID
Data []byte
Timestamp uint64
Topic string
}
func (m *Message) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]interface{}{
"from": base64.RawStdEncoding.EncodeToString([]byte(m.From)),
"data": m.Data,
"timestamp": m.Timestamp,
"topic": m.Topic,
})
*pb.Message
}
func (m *Message) UnmarshalJSON(data []byte) error {
mp := struct {
Data []byte
Timestamp uint64
Topic string
From string
}{}
err := json.Unmarshal(data, &mp)
if err != nil {
return err
}
pid, err := base64.RawStdEncoding.DecodeString(mp.From)
if err != nil {
return err
}
m.Data = mp.Data
m.Timestamp = mp.Timestamp
m.Topic = mp.Topic
m.From = peer.ID(pid)
return nil
func (m *Message) GetFrom() peer.ID {
return peer.ID(m.Message.GetFrom())
}
type RPC struct {
Type string
Msg *Message
Topics []string
pb.RPC
// unexported on purpose, not sending this over the wire
from peer.ID
......@@ -98,16 +68,15 @@ func NewFloodSub(h host.Host) *PubSub {
incoming: make(chan *RPC, 32),
outgoing: make(chan *RPC),
newPeers: make(chan inet.Stream),
peerDead: make(chan peer.ID),
addSub: make(chan *addSub),
myTopics: make(map[string]chan *Message),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
lastMsg: make(map[peer.ID]uint64),
peerDead: make(chan peer.ID),
addSub: make(chan *addSub),
}
h.SetStreamHandler(ID, ps.handleNewStream)
h.Network().Notify(ps)
go ps.processLoop()
......@@ -120,21 +89,19 @@ func (p *PubSub) getHelloPacket() *RPC {
for t, _ := range p.myTopics {
rpc.Topics = append(rpc.Topics, t)
}
rpc.Type = AddSubMessageType
rpc.Type = &AddSubMessageType
return &rpc
}
func (p *PubSub) handleNewStream(s inet.Stream) {
defer s.Close()
scan := bufio.NewScanner(s)
for scan.Scan() {
r := ggio.NewDelimitedReader(s, 1<<20)
for {
rpc := new(RPC)
err := json.Unmarshal(scan.Bytes(), rpc)
err := r.ReadMsg(&rpc.RPC)
if err != nil {
log.Errorf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
log.Error("data: ", scan.Text())
// TODO: cleanup of some sort
return
}
......@@ -146,12 +113,16 @@ func (p *PubSub) handleNewStream(s inet.Stream) {
func (p *PubSub) handleSendingMessages(s inet.Stream, in <-chan *RPC) {
var dead bool
wc := ggio.NewDelimitedWriter(s)
defer wc.Close()
for rpc := range in {
if dead {
continue
}
err := writeRPC(s, rpc)
atomic.AddUint64(&messageCount, 1)
err := wc.WriteMsg(&rpc.RPC)
if err != nil {
log.Errorf("writing message to %s: %s", s.Conn().RemotePeer(), err)
dead = true
......@@ -192,7 +163,7 @@ func (p *PubSub) processLoop() {
log.Error("handling RPC: ", err)
}
case rpc := <-p.outgoing:
switch rpc.Type {
switch rpc.GetType() {
case AddSubMessageType, UnsubMessageType:
for _, mch := range p.peers {
mch <- rpc
......@@ -215,7 +186,9 @@ func (p *PubSub) processLoop() {
func (p *PubSub) handleSubscriptionChange(sub *addSub) {
ch, ok := p.myTopics[sub.topic]
out := &RPC{
Topics: []string{sub.topic},
RPC: pb.RPC{
Topics: []string{sub.topic},
},
}
if sub.cancel {
......@@ -225,7 +198,7 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) {
close(ch)
delete(p.myTopics, sub.topic)
out.Type = UnsubMessageType
out.Type = &UnsubMessageType
} else {
if ok {
// we don't allow multiple subs per topic at this point
......@@ -236,7 +209,7 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) {
resp := make(chan *Message, 16)
p.myTopics[sub.topic] = resp
sub.resp <- resp
out.Type = AddSubMessageType
out.Type = &AddSubMessageType
}
go func() {
......@@ -245,16 +218,16 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) {
}
func (p *PubSub) recvMessage(rpc *RPC) error {
subch, ok := p.myTopics[rpc.Msg.Topic]
subch, ok := p.myTopics[rpc.Msg.GetTopic()]
if ok {
//fmt.Println("writing out to subscriber!")
subch <- rpc.Msg
subch <- &Message{rpc.Msg}
}
return nil
}
func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
switch rpc.Type {
switch rpc.GetType() {
case AddSubMessageType:
for _, t := range rpc.Topics {
tmap, ok := p.topics[t]
......@@ -278,19 +251,22 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
return fmt.Errorf("nil pub message")
}
msg := &Message{rpc.Msg}
// Note: Obviously this is an incredibly insecure way of
// filtering out "messages we've already seen". But it works for a
// cool demo, so i'm not gonna waste time thinking about it any more
if p.lastMsg[rpc.Msg.From] >= rpc.Msg.Timestamp {
if p.lastMsg[msg.GetFrom()] >= msg.GetSeqno() {
//log.Error("skipping 'old' message")
return nil
}
if rpc.Msg.From == p.host.ID() {
if msg.GetFrom() == p.host.ID() {
log.Error("skipping message from self")
return nil
}
p.lastMsg[rpc.Msg.From] = rpc.Msg.Timestamp
p.lastMsg[msg.GetFrom()] = msg.GetSeqno()
if err := p.recvMessage(rpc); err != nil {
log.Error("error receiving message: ", err)
......@@ -305,13 +281,13 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error {
}
func (p *PubSub) publishMessage(rpc *RPC) error {
tmap, ok := p.topics[rpc.Msg.Topic]
tmap, ok := p.topics[rpc.Msg.GetTopic()]
if !ok {
return nil
}
for pid, _ := range tmap {
if pid == rpc.from || pid == rpc.Msg.From {
if pid == rpc.from || pid == peer.ID(rpc.Msg.GetFrom()) {
continue
}
......@@ -355,25 +331,17 @@ func (p *PubSub) Unsub(topic string) {
}
func (p *PubSub) Publish(topic string, data []byte) error {
seqno := uint64(time.Now().UnixNano())
p.outgoing <- &RPC{
Msg: &Message{
Data: data,
Topic: topic,
From: p.host.ID(),
Timestamp: uint64(time.Now().UnixNano()),
RPC: pb.RPC{
Msg: &pb.Message{
Data: data,
Topic: &topic,
From: proto.String(string(p.host.ID())),
Seqno: &seqno,
},
Type: &PubMessageType,
},
Type: PubMessageType,
}
return nil
}
func writeRPC(s inet.Stream, rpc *RPC) error {
data, err := json.Marshal(rpc)
if err != nil {
return err
}
data = append(data, '\n')
_, err = s.Write(data)
return err
}
......@@ -8,8 +8,8 @@ import (
"testing"
"time"
host "github.com/libp2p/go-libp2p/p2p/host"
netutil "github.com/libp2p/go-libp2p/p2p/test/util"
host "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/host"
netutil "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/test/util"
)
func getNetHosts(t *testing.T, n int) []host.Host {
......@@ -94,4 +94,6 @@ func TestBasicFloodsub(t *testing.T) {
}
}
}
fmt.Println("Total Sent Messages: ", messageCount)
}
......@@ -3,8 +3,8 @@ package floodsub
import (
"context"
ma "github.com/jbenet/go-multiaddr"
inet "github.com/libp2p/go-libp2p/p2p/net"
ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr"
inet "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/net"
)
var _ inet.Notifiee = (*PubSub)(nil)
......
// Code generated by protoc-gen-gogo.
// source: rpc.proto
// DO NOT EDIT!
/*
Package floodsub_pb is a generated protocol buffer package.
It is generated from these files:
rpc.proto
It has these top-level messages:
RPC
Message
*/
package floodsub_pb
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type RPC struct {
Type *string `protobuf:"bytes,1,opt,name=type" json:"type,omitempty"`
Topics []string `protobuf:"bytes,2,rep,name=topics" json:"topics,omitempty"`
Msg *Message `protobuf:"bytes,3,opt,name=msg" json:"msg,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *RPC) Reset() { *m = RPC{} }
func (m *RPC) String() string { return proto.CompactTextString(m) }
func (*RPC) ProtoMessage() {}
func (*RPC) Descriptor() ([]byte, []int) { return fileDescriptorRpc, []int{0} }
func (m *RPC) GetType() string {
if m != nil && m.Type != nil {
return *m.Type
}
return ""
}
func (m *RPC) GetTopics() []string {
if m != nil {
return m.Topics
}
return nil
}
func (m *RPC) GetMsg() *Message {
if m != nil {
return m.Msg
}
return nil
}
type Message struct {
From *string `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
Seqno *uint64 `protobuf:"varint,3,opt,name=seqno" json:"seqno,omitempty"`
Topic *string `protobuf:"bytes,4,opt,name=topic" json:"topic,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) { return fileDescriptorRpc, []int{1} }
func (m *Message) GetFrom() string {
if m != nil && m.From != nil {
return *m.From
}
return ""
}
func (m *Message) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func (m *Message) GetSeqno() uint64 {
if m != nil && m.Seqno != nil {
return *m.Seqno
}
return 0
}
func (m *Message) GetTopic() string {
if m != nil && m.Topic != nil {
return *m.Topic
}
return ""
}
func init() {
proto.RegisterType((*RPC)(nil), "floodsub.pb.RPC")
proto.RegisterType((*Message)(nil), "floodsub.pb.Message")
}
func init() { proto.RegisterFile("rpc.proto", fileDescriptorRpc) }
var fileDescriptorRpc = []byte{
// 158 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2c, 0x2a, 0x48, 0xd6,
0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4e, 0xcb, 0xc9, 0xcf, 0x4f, 0x29, 0x2e, 0x4d, 0xd2,
0x2b, 0x48, 0x52, 0x72, 0xe3, 0x62, 0x0e, 0x0a, 0x70, 0x16, 0xe2, 0xe1, 0x62, 0x29, 0xa9, 0x2c,
0x48, 0x95, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x14, 0xe2, 0xe3, 0x62, 0x2b, 0xc9, 0x2f, 0xc8, 0x4c,
0x2e, 0x96, 0x60, 0x52, 0x60, 0xd6, 0xe0, 0x14, 0x52, 0xe4, 0x62, 0xce, 0x2d, 0x4e, 0x97, 0x60,
0x56, 0x60, 0xd4, 0xe0, 0x36, 0x12, 0xd1, 0x43, 0xd2, 0xaf, 0xe7, 0x9b, 0x5a, 0x5c, 0x9c, 0x98,
0x9e, 0xaa, 0xe4, 0xcc, 0xc5, 0x0e, 0x65, 0x82, 0xcc, 0x4a, 0x2b, 0xca, 0xcf, 0x85, 0x9a, 0xc5,
0xc3, 0xc5, 0x92, 0x92, 0x58, 0x92, 0x28, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x23, 0xc4, 0xcb, 0xc5,
0x5a, 0x9c, 0x5a, 0x98, 0x97, 0x0f, 0x36, 0x8b, 0x05, 0xc4, 0x05, 0x5b, 0x24, 0xc1, 0x02, 0x52,
0x0b, 0x08, 0x00, 0x00, 0xff, 0xff, 0x08, 0xfe, 0xc0, 0xc1, 0xa5, 0x00, 0x00, 0x00,
}
package floodsub.pb;
message RPC {
optional string type = 1;
repeated string topics = 2;
optional Message msg = 3;
}
message Message {
optional string from = 1;
optional bytes data = 2;
optional uint64 seqno = 3;
optional string topic = 4;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment