Commit ae55bf96 authored by Raúl Kripalani's avatar Raúl Kripalani Committed by vyzo

upgrade deps + interoperable uvarint delimited writer/reader.

parent 4ccb6382
...@@ -9,10 +9,11 @@ import ( ...@@ -9,10 +9,11 @@ import (
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
ggio "github.com/gogo/protobuf/io"
proto "github.com/gogo/protobuf/proto"
pb "github.com/libp2p/go-libp2p-pubsub/pb" pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-msgio/protoio"
"github.com/gogo/protobuf/proto"
ms "github.com/multiformats/go-multistream" ms "github.com/multiformats/go-multistream"
) )
...@@ -41,7 +42,7 @@ func (p *PubSub) getHelloPacket() *RPC { ...@@ -41,7 +42,7 @@ func (p *PubSub) getHelloPacket() *RPC {
} }
func (p *PubSub) handleNewStream(s network.Stream) { func (p *PubSub) handleNewStream(s network.Stream) {
r := ggio.NewDelimitedReader(s, p.maxMessageSize) r := protoio.NewDelimitedReader(s, p.maxMessageSize)
for { for {
rpc := new(RPC) rpc := new(RPC)
err := r.ReadMsg(&rpc.RPC) err := r.ReadMsg(&rpc.RPC)
...@@ -96,7 +97,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan ...@@ -96,7 +97,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
} }
func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) { func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
r := ggio.NewDelimitedReader(s, p.maxMessageSize) r := protoio.NewDelimitedReader(s, p.maxMessageSize)
rpc := new(RPC) rpc := new(RPC)
for { for {
err := r.ReadMsg(&rpc.RPC) err := r.ReadMsg(&rpc.RPC)
...@@ -113,7 +114,7 @@ func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) { ...@@ -113,7 +114,7 @@ func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) { func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
bufw := bufio.NewWriter(s) bufw := bufio.NewWriter(s)
wc := ggio.NewDelimitedWriter(bufw) wc := protoio.NewDelimitedWriter(bufw)
writeMsg := func(msg proto.Message) error { writeMsg := func(msg proto.Message) error {
err := wc.WriteMsg(msg) err := wc.WriteMsg(msg)
......
...@@ -23,7 +23,7 @@ import ( ...@@ -23,7 +23,7 @@ import (
bhost "github.com/libp2p/go-libp2p-blankhost" bhost "github.com/libp2p/go-libp2p-blankhost"
swarmt "github.com/libp2p/go-libp2p-swarm/testing" swarmt "github.com/libp2p/go-libp2p-swarm/testing"
ggio "github.com/gogo/protobuf/io" "github.com/libp2p/go-msgio/protoio"
) )
func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Subscription) { func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Subscription) {
...@@ -1057,7 +1057,7 @@ type announceWatcher struct { ...@@ -1057,7 +1057,7 @@ type announceWatcher struct {
func (aw *announceWatcher) handleStream(s network.Stream) { func (aw *announceWatcher) handleStream(s network.Stream) {
defer s.Close() defer s.Close()
r := ggio.NewDelimitedReader(s, 1<<20) r := protoio.NewDelimitedReader(s, 1<<20)
var rpc pb.RPC var rpc pb.RPC
for { for {
......
This diff is collapsed.
...@@ -8,13 +8,15 @@ import ( ...@@ -8,13 +8,15 @@ import (
"testing" "testing"
"time" "time"
ggio "github.com/gogo/protobuf/io"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/protocol"
pb "github.com/libp2p/go-libp2p-pubsub/pb" pb "github.com/libp2p/go-libp2p-pubsub/pb"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-msgio/protoio"
) )
// Test that when Gossipsub receives too many IWANT messages from a peer // Test that when Gossipsub receives too many IWANT messages from a peer
...@@ -718,8 +720,8 @@ func newMockGS(ctx context.Context, t *testing.T, attacker host.Host, onReadMsg ...@@ -718,8 +720,8 @@ func newMockGS(ctx context.Context, t *testing.T, attacker host.Host, onReadMsg
t.Fatal(err) t.Fatal(err)
} }
r := ggio.NewDelimitedReader(stream, maxMessageSize) r := protoio.NewDelimitedReader(stream, maxMessageSize)
w := ggio.NewDelimitedWriter(ostream) w := protoio.NewDelimitedWriter(ostream)
var irpc pb.RPC var irpc pb.RPC
......
...@@ -21,7 +21,7 @@ import ( ...@@ -21,7 +21,7 @@ import (
bhost "github.com/libp2p/go-libp2p-blankhost" bhost "github.com/libp2p/go-libp2p-blankhost"
swarmt "github.com/libp2p/go-libp2p-swarm/testing" swarmt "github.com/libp2p/go-libp2p-swarm/testing"
ggio "github.com/gogo/protobuf/io" "github.com/libp2p/go-msgio/protoio"
) )
func getGossipsub(ctx context.Context, h host.Host, opts ...Option) *PubSub { func getGossipsub(ctx context.Context, h host.Host, opts ...Option) *PubSub {
...@@ -1611,8 +1611,8 @@ func (sq *sybilSquatter) handleStream(s network.Stream) { ...@@ -1611,8 +1611,8 @@ func (sq *sybilSquatter) handleStream(s network.Stream) {
// send a subscription for test in the output stream to become candidate for GRAFT // send a subscription for test in the output stream to become candidate for GRAFT
// and then just read and ignore the incoming RPCs // and then just read and ignore the incoming RPCs
r := ggio.NewDelimitedReader(s, 1<<20) r := protoio.NewDelimitedReader(s, 1<<20)
w := ggio.NewDelimitedWriter(os) w := protoio.NewDelimitedWriter(os)
truth := true truth := true
topic := "test" topic := "test"
err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: &truth, Topicid: &topic}}}) err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: &truth, Topicid: &topic}}})
...@@ -1792,8 +1792,8 @@ func (iwe *iwantEverything) handleStream(s network.Stream) { ...@@ -1792,8 +1792,8 @@ func (iwe *iwantEverything) handleStream(s network.Stream) {
gossipMsgIdsReceived := make(map[string]struct{}) gossipMsgIdsReceived := make(map[string]struct{})
// send a subscription for test in the output stream to become candidate for gossip // send a subscription for test in the output stream to become candidate for gossip
r := ggio.NewDelimitedReader(s, 1<<20) r := protoio.NewDelimitedReader(s, 1<<20)
w := ggio.NewDelimitedWriter(os) w := protoio.NewDelimitedWriter(os)
truth := true truth := true
topic := "test" topic := "test"
err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: &truth, Topicid: &topic}}}) err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: &truth, Topicid: &topic}}})
......
...@@ -20,7 +20,7 @@ import ( ...@@ -20,7 +20,7 @@ import (
bhost "github.com/libp2p/go-libp2p-blankhost" bhost "github.com/libp2p/go-libp2p-blankhost"
swarmt "github.com/libp2p/go-libp2p-swarm/testing" swarmt "github.com/libp2p/go-libp2p-swarm/testing"
ggio "github.com/gogo/protobuf/io" "github.com/libp2p/go-msgio/protoio"
) )
func testWithTracer(t *testing.T, tracer EventTracer) { func testWithTracer(t *testing.T, tracer EventTracer) {
...@@ -244,7 +244,7 @@ func TestPBTracer(t *testing.T) { ...@@ -244,7 +244,7 @@ func TestPBTracer(t *testing.T) {
} }
defer f.Close() defer f.Close()
r := ggio.NewDelimitedReader(f, 1<<20) r := protoio.NewDelimitedReader(f, 1<<20)
for { for {
evt.Reset() evt.Reset()
err := r.ReadMsg(&evt) err := r.ReadMsg(&evt)
...@@ -271,7 +271,7 @@ func (mrt *mockRemoteTracer) handleStream(s network.Stream) { ...@@ -271,7 +271,7 @@ func (mrt *mockRemoteTracer) handleStream(s network.Stream) {
panic(err) panic(err)
} }
r := ggio.NewDelimitedReader(gzr, 1<<24) r := protoio.NewDelimitedReader(gzr, 1<<24)
var batch pb.TraceEventBatch var batch pb.TraceEventBatch
for { for {
......
...@@ -18,7 +18,7 @@ import ( ...@@ -18,7 +18,7 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/protocol"
ggio "github.com/gogo/protobuf/io" "github.com/libp2p/go-msgio/protoio"
) )
var TraceBufferSize = 1 << 16 // 64K ought to be enough for everyone; famous last words. var TraceBufferSize = 1 << 16 // 64K ought to be enough for everyone; famous last words.
...@@ -154,7 +154,7 @@ func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error) { ...@@ -154,7 +154,7 @@ func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error) {
func (t *PBTracer) doWrite() { func (t *PBTracer) doWrite() {
var buf []*pb.TraceEvent var buf []*pb.TraceEvent
w := ggio.NewDelimitedWriter(t.w) w := protoio.NewDelimitedWriter(t.w)
for { for {
_, ok := <-t.ch _, ok := <-t.ch
...@@ -211,7 +211,7 @@ func (t *RemoteTracer) doWrite() { ...@@ -211,7 +211,7 @@ func (t *RemoteTracer) doWrite() {
var batch pb.TraceEventBatch var batch pb.TraceEventBatch
gzipW := gzip.NewWriter(s) gzipW := gzip.NewWriter(s)
w := ggio.NewDelimitedWriter(gzipW) w := protoio.NewDelimitedWriter(gzipW)
for { for {
_, ok := <-t.ch _, ok := <-t.ch
......
...@@ -15,7 +15,7 @@ import ( ...@@ -15,7 +15,7 @@ import (
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
ggio "github.com/gogo/protobuf/io" "github.com/libp2p/go-msgio/protoio"
) )
func TestRegisterUnregisterValidator(t *testing.T) { func TestRegisterUnregisterValidator(t *testing.T) {
...@@ -731,7 +731,7 @@ func (p *multiTopicPublisher) handleStream(s network.Stream) { ...@@ -731,7 +731,7 @@ func (p *multiTopicPublisher) handleStream(s network.Stream) {
p.out = append(p.out, os) p.out = append(p.out, os)
p.mx.Unlock() p.mx.Unlock()
r := ggio.NewDelimitedReader(s, 1<<20) r := protoio.NewDelimitedReader(s, 1<<20)
var rpc pb.RPC var rpc pb.RPC
for { for {
rpc.Reset() rpc.Reset()
...@@ -761,7 +761,7 @@ func (p *multiTopicPublisher) publish(msg string, topics ...string) { ...@@ -761,7 +761,7 @@ func (p *multiTopicPublisher) publish(msg string, topics ...string) {
p.mx.Lock() p.mx.Lock()
defer p.mx.Unlock() defer p.mx.Unlock()
for _, os := range p.out { for _, os := range p.out {
w := ggio.NewDelimitedWriter(os) w := protoio.NewDelimitedWriter(os)
err := w.WriteMsg(rpc) err := w.WriteMsg(rpc)
if err != nil { if err != nil {
panic(err) panic(err)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment