From c0d5b0ef2691ef3d348d63e4f0ca839b209b21e0 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 10 Sep 2016 08:28:29 -0700 Subject: [PATCH] using protobuf now, with a vengeance --- .gx/lastpubver | 2 +- floodsub.go | 28 +++++++++++++++++----------- floodsub_test.go | 6 ++---- notify.go | 4 ++-- package.json | 8 +++++++- pb/rpc.pb.go | 36 ++++++------------------------------ 6 files changed, 35 insertions(+), 49 deletions(-) diff --git a/.gx/lastpubver b/.gx/lastpubver index 2f3ec3d..f6653d5 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -0.1.0: QmUvA4goPKAHLrTcePzzCDHqCpqtzdKAVdMPdrdw45Z2FM +0.2.0: QmTP5RSsmZacEFanuBhnDE3RC3aqGAoozF9ZmKqcHa8EKH diff --git a/floodsub.go b/floodsub.go index 7a00129..ab2835e 100644 --- a/floodsub.go +++ b/floodsub.go @@ -1,24 +1,22 @@ package floodsub import ( + "bufio" "fmt" "sync" - "sync/atomic" "time" pb "github.com/whyrusleeping/go-floodsub/pb" ggio "github.com/gogo/protobuf/io" proto "github.com/gogo/protobuf/proto" - logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" - peer "gx/ipfs/QmWtbQU15LaB5B1JC2F7TV9P4K88vD3PpA4AJrwfCjhML8/go-libp2p-peer" - host "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/host" - inet "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/net" - protocol "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/protocol" + peer "github.com/ipfs/go-libp2p-peer" + logging "github.com/ipfs/go-log" + host "github.com/libp2p/go-libp2p/p2p/host" + inet "github.com/libp2p/go-libp2p/p2p/net" + protocol "github.com/libp2p/go-libp2p/p2p/protocol" ) -var messageCount uint64 - const ID = protocol.ID("/floodsub/1.0.0") var ( @@ -113,15 +111,14 @@ func (p *PubSub) handleNewStream(s inet.Stream) { func (p *PubSub) handleSendingMessages(s inet.Stream, in <-chan *RPC) { var dead bool - wc := ggio.NewDelimitedWriter(s) + bufw := bufio.NewWriter(s) + wc := ggio.NewDelimitedWriter(bufw) defer wc.Close() for rpc := range in { if dead { continue } - atomic.AddUint64(&messageCount, 1) - err := wc.WriteMsg(&rpc.RPC) if err != nil { log.Errorf("writing message to %s: %s", s.Conn().RemotePeer(), err) @@ -130,6 +127,15 @@ func (p *PubSub) handleSendingMessages(s inet.Stream, in <-chan *RPC) { p.peerDead <- s.Conn().RemotePeer() }() } + + err = bufw.Flush() + if err != nil { + log.Errorf("writing message to %s: %s", s.Conn().RemotePeer(), err) + dead = true + go func() { + p.peerDead <- s.Conn().RemotePeer() + }() + } } } diff --git a/floodsub_test.go b/floodsub_test.go index d67b7cc..4f45aa5 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -8,8 +8,8 @@ import ( "testing" "time" - host "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/host" - netutil "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/test/util" + host "github.com/libp2p/go-libp2p/p2p/host" + netutil "github.com/libp2p/go-libp2p/p2p/test/util" ) func getNetHosts(t *testing.T, n int) []host.Host { @@ -94,6 +94,4 @@ func TestBasicFloodsub(t *testing.T) { } } } - - fmt.Println("Total Sent Messages: ", messageCount) } diff --git a/notify.go b/notify.go index 6bcaab6..60d9b59 100644 --- a/notify.go +++ b/notify.go @@ -3,8 +3,8 @@ package floodsub import ( "context" - ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr" - inet "gx/ipfs/Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS/go-libp2p/p2p/net" + ma "github.com/jbenet/go-multiaddr" + inet "github.com/libp2p/go-libp2p/p2p/net" ) var _ inet.Notifiee = (*PubSub)(nil) diff --git a/package.json b/package.json index 0a5ae2d..d838a90 100644 --- a/package.json +++ b/package.json @@ -12,11 +12,17 @@ "hash": "Qmf4ETeAWXuThBfWwonVyFqGFSgTWepUDEr1txcctvpTXS", "name": "go-libp2p", "version": "3.4.1" + }, + { + "author": "whyrusleeping", + "hash": "QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV", + "name": "gogo-protobuf", + "version": "0.0.0" } ], "gxVersion": "0.9.0", "language": "go", "license": "", "name": "floodsub", - "version": "0.1.0" + "version": "0.2.0" } diff --git a/pb/rpc.pb.go b/pb/rpc.pb.go index 9e23ebc..56dd89a 100644 --- a/pb/rpc.pb.go +++ b/pb/rpc.pb.go @@ -23,12 +23,6 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package - type RPC struct { Type *string `protobuf:"bytes,1,opt,name=type" json:"type,omitempty"` Topics []string `protobuf:"bytes,2,rep,name=topics" json:"topics,omitempty"` @@ -36,10 +30,9 @@ type RPC struct { XXX_unrecognized []byte `json:"-"` } -func (m *RPC) Reset() { *m = RPC{} } -func (m *RPC) String() string { return proto.CompactTextString(m) } -func (*RPC) ProtoMessage() {} -func (*RPC) Descriptor() ([]byte, []int) { return fileDescriptorRpc, []int{0} } +func (m *RPC) Reset() { *m = RPC{} } +func (m *RPC) String() string { return proto.CompactTextString(m) } +func (*RPC) ProtoMessage() {} func (m *RPC) GetType() string { if m != nil && m.Type != nil { @@ -70,10 +63,9 @@ type Message struct { XXX_unrecognized []byte `json:"-"` } -func (m *Message) Reset() { *m = Message{} } -func (m *Message) String() string { return proto.CompactTextString(m) } -func (*Message) ProtoMessage() {} -func (*Message) Descriptor() ([]byte, []int) { return fileDescriptorRpc, []int{1} } +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} func (m *Message) GetFrom() string { if m != nil && m.From != nil { @@ -107,19 +99,3 @@ func init() { proto.RegisterType((*RPC)(nil), "floodsub.pb.RPC") proto.RegisterType((*Message)(nil), "floodsub.pb.Message") } - -func init() { proto.RegisterFile("rpc.proto", fileDescriptorRpc) } - -var fileDescriptorRpc = []byte{ - // 158 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2c, 0x2a, 0x48, 0xd6, - 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4e, 0xcb, 0xc9, 0xcf, 0x4f, 0x29, 0x2e, 0x4d, 0xd2, - 0x2b, 0x48, 0x52, 0x72, 0xe3, 0x62, 0x0e, 0x0a, 0x70, 0x16, 0xe2, 0xe1, 0x62, 0x29, 0xa9, 0x2c, - 0x48, 0x95, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x14, 0xe2, 0xe3, 0x62, 0x2b, 0xc9, 0x2f, 0xc8, 0x4c, - 0x2e, 0x96, 0x60, 0x52, 0x60, 0xd6, 0xe0, 0x14, 0x52, 0xe4, 0x62, 0xce, 0x2d, 0x4e, 0x97, 0x60, - 0x56, 0x60, 0xd4, 0xe0, 0x36, 0x12, 0xd1, 0x43, 0xd2, 0xaf, 0xe7, 0x9b, 0x5a, 0x5c, 0x9c, 0x98, - 0x9e, 0xaa, 0xe4, 0xcc, 0xc5, 0x0e, 0x65, 0x82, 0xcc, 0x4a, 0x2b, 0xca, 0xcf, 0x85, 0x9a, 0xc5, - 0xc3, 0xc5, 0x92, 0x92, 0x58, 0x92, 0x28, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x23, 0xc4, 0xcb, 0xc5, - 0x5a, 0x9c, 0x5a, 0x98, 0x97, 0x0f, 0x36, 0x8b, 0x05, 0xc4, 0x05, 0x5b, 0x24, 0xc1, 0x02, 0x52, - 0x0b, 0x08, 0x00, 0x00, 0xff, 0xff, 0x08, 0xfe, 0xc0, 0xc1, 0xa5, 0x00, 0x00, 0x00, -} -- GitLab