Commit b56769ff authored by tavit ohanian's avatar tavit ohanian

Merge branch 'port-2021-06-25'

parents 6be97953 8e7c8521
Pipeline #427 passed with stages
in 11 seconds
stages:
- build
- test
variables:
BUILD_DIR: "/tmp/$CI_CONCURRENT_PROJECT_ID"
before_script:
- mkdir -p $BUILD_DIR/src
- cd $BUILD_DIR/src
- if [ -d $CI_PROJECT_DIR ]
- then
- echo "soft link $CI_PROJECT_DIR exists"
- else
- echo "creating soft link $CI_PROJECT_DIR"
- ln -s $CI_PROJECT_DIR
- fi
- cd $CI_PROJECT_DIR
build:
stage: build
tags:
- testing
script:
- echo $CI_JOB_STAGE
- go build
test:
stage: test
tags:
- testing
script:
- echo $CI_JOB_STAGE
- go test -cover
coverage: '/coverage: \d+.\d+% of statements/'
Godeps/*
\ No newline at end of file
os:
- linux
language: go
go:
- 1.13.x
env:
global:
- GOTFLAGS="-race -cpu=5"
matrix:
- BUILD_DEPTYPE=gomod
# disable travis install
install:
- true
script:
- bash <(curl -s https://raw.githubusercontent.com/ipfs/ci-helpers/master/travis-ci/run-standard-tests.sh)
cache:
directories:
- $GOPATH/pkg/mod
- $HOME/.cache/go-build
notifications:
email: false
The MIT License (MIT)
Copyright (c) 2014 Juan Batiz-Benet
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
# go-msgio
# go-msgio - Message IO
dms3 p2p go-msgio
\ No newline at end of file
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](https://libp2p.io/)
[![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![codecov](https://codecov.io/gh/libp2p/go-libp2p-netutil/branch/master/graph/badge.svg)](https://codecov.io/gh/libp2p/go-msgio)
[![Travis CI](https://travis-ci.org/libp2p/go-libp2p-netutil.svg?branch=master)](https://travis-ci.org/libp2p/go-msgio)
[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
This is a simple package that helps read and write length-delimited slices. It's helpful for building wire protocols.
## Usage
### Reading
```go
import "github.com/libp2p/go-msgio"
rdr := ... // some reader from a wire
mrdr := msgio.NewReader(rdr)
for {
msg, err := mrdr.ReadMsg()
if err != nil {
return err
}
doSomething(msg)
}
```
### Writing
```go
import "github.com/libp2p/go-msgio"
wtr := genReader()
mwtr := msgio.NewWriter(wtr)
for {
msg := genMessage()
err := mwtr.WriteMsg(msg)
if err != nil {
return err
}
}
```
### Duplex
```go
import "github.com/libp2p/go-msgio"
rw := genReadWriter()
mrw := msgio.NewReadWriter(rw)
for {
msg, err := mrdr.ReadMsg()
if err != nil {
return err
}
// echo it back :)
err = mwtr.WriteMsg(msg)
if err != nil {
return err
}
}
```
### Channels
```go
import "github.com/libp2p/go-msgio"
rw := genReadWriter()
rch := msgio.NewReadChannel(rw)
wch := msgio.NewWriteChannel(rw)
for {
msg, err := <-rch
if err != nil {
return err
}
// echo it back :)
wch<- rw
}
```
---
The last gx published version of this module was: 0.0.6: QmcxL9MDzSU5Mj1GcWZD8CXkAFuJXjdbjotZ93o371bKSf
package msgio
import (
"io"
pool "gitlab.dms3.io/p2p/go-buffer-pool"
)
// Chan is a msgio duplex channel. It is used to have a channel interface
// around a msgio.Reader or Writer.
type Chan struct {
MsgChan chan []byte
ErrChan chan error
CloseChan chan bool
}
// NewChan constructs a Chan with a given buffer size.
func NewChan(chanSize int) *Chan {
return &Chan{
MsgChan: make(chan []byte, chanSize),
ErrChan: make(chan error, 1),
CloseChan: make(chan bool, 2),
}
}
// ReadFrom wraps the given io.Reader with a msgio.Reader, reads all
// messages, ands sends them down the channel.
func (s *Chan) ReadFrom(r io.Reader) {
s.readFrom(NewReader(r))
}
// ReadFromWithPool wraps the given io.Reader with a msgio.Reader, reads all
// messages, ands sends them down the channel. Uses given BufferPool.
func (s *Chan) ReadFromWithPool(r io.Reader, p *pool.BufferPool) {
s.readFrom(NewReaderWithPool(r, p))
}
// ReadFrom wraps the given io.Reader with a msgio.Reader, reads all
// messages, ands sends them down the channel.
func (s *Chan) readFrom(mr Reader) {
Loop:
for {
buf, err := mr.ReadMsg()
if err != nil {
if err == io.EOF {
break Loop // done
}
// unexpected error. tell the client.
s.ErrChan <- err
break Loop
}
select {
case <-s.CloseChan:
break Loop // told we're done
case s.MsgChan <- buf:
// ok seems fine. send it away
}
}
close(s.MsgChan)
// signal we're done
s.CloseChan <- true
}
// WriteTo wraps the given io.Writer with a msgio.Writer, listens on the
// channel and writes all messages to the writer.
func (s *Chan) WriteTo(w io.Writer) {
// new buffer per message
// if bottleneck, cycle around a set of buffers
mw := NewWriter(w)
Loop:
for {
select {
case <-s.CloseChan:
break Loop // told we're done
case msg, ok := <-s.MsgChan:
if !ok { // chan closed
break Loop
}
if err := mw.WriteMsg(msg); err != nil {
if err != io.EOF {
// unexpected error. tell the client.
s.ErrChan <- err
}
break Loop
}
}
}
// signal we're done
s.CloseChan <- true
}
// Close the Chan
func (s *Chan) Close() {
s.CloseChan <- true
}
// nullLocker conforms to the sync.Locker interface but does nothing.
type nullLocker struct{}
func (l *nullLocker) Lock() {}
func (l *nullLocker) Unlock() {}
package msgio
import (
"bytes"
"io"
"math/rand"
"testing"
"time"
)
func randBuf(r *rand.Rand, size int) []byte {
buf := make([]byte, size)
_, _ = r.Read(buf)
return buf
}
func TestReadChan(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
rchan := NewChan(10)
msgs := [1000][]byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range msgs {
msgs[i] = randBuf(r, r.Intn(1000))
err := writer.WriteMsg(msgs[i])
if err != nil {
t.Fatal(err)
}
}
if err := writer.Close(); err != nil {
t.Fatal(err)
}
go rchan.ReadFrom(buf)
defer rchan.Close()
Loop:
for i := 0; ; i++ {
select {
case err := <-rchan.ErrChan:
if err != nil {
t.Fatal("unexpected error", err)
}
case msg2, ok := <-rchan.MsgChan:
if !ok {
if i < len(msg2) {
t.Error("failed to read all messages", len(msgs), i)
}
break Loop
}
msg1 := msgs[i]
if !bytes.Equal(msg1, msg2) {
t.Fatal("message retrieved not equal\n", msg1, "\n\n", msg2)
}
}
}
}
func TestWriteChan(t *testing.T) {
buf := bytes.NewBuffer(nil)
reader := NewReader(buf)
wchan := NewChan(10)
msgs := [1000][]byte{}
go wchan.WriteTo(buf)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range msgs {
msgs[i] = randBuf(r, r.Intn(1000))
select {
case err := <-wchan.ErrChan:
if err != nil {
t.Fatal("unexpected error", err)
}
case wchan.MsgChan <- msgs[i]:
}
}
// tell chan we're done.
close(wchan.MsgChan)
// wait for writing to end
<-wchan.CloseChan
defer wchan.Close()
for i := 0; ; i++ {
msg2, err := reader.ReadMsg()
if err != nil {
if err == io.EOF {
if i < len(msg2) {
t.Error("failed to read all messages", len(msgs), i)
}
break
}
t.Error("unexpected error", err)
}
msg1 := msgs[i]
if !bytes.Equal(msg1, msg2) {
t.Fatal("message retrieved not equal\n", msg1, "\n\n", msg2)
}
}
if err := reader.Close(); err != nil {
t.Error(err)
}
}
coverage:
range: "50...100"
comment: off
// +build gofuzz
package msgio
import "bytes"
// get the go-fuzz tools and build a fuzzer
// $ go get -u github.com/dvyukov/go-fuzz/...
// $ go-fuzz-build gitlab.dms3.io/p2p/go-msgio
// put a corpus of random (even better if actual, structured) data in a corpus directry
// $ go-fuzz -bin ./msgio-fuzz -corpus corpus -workdir=wdir -timeout=15
func Fuzz(data []byte) int {
rc := NewReader(bytes.NewReader(data))
// rc := NewVarintReader(bytes.NewReader(data))
if _, err := rc.ReadMsg(); err != nil {
return 0
}
return 1
}
package msgio
import (
"strings"
"testing"
)
func TestReader_CrashOne(t *testing.T) {
rc := NewReader(strings.NewReader("\x83000"))
_, err := rc.ReadMsg()
if err != ErrMsgTooLarge {
t.Error("should get ErrMsgTooLarge")
t.Log(err)
}
}
func TestVarintReader_CrashOne(t *testing.T) {
rc := NewVarintReader(strings.NewReader("\x9a\xf1\xed\x9a0"))
_, err := rc.ReadMsg()
if err != ErrMsgTooLarge {
t.Error("should get ErrMsgTooLarge")
t.Log(err)
}
}
package msgio
import (
"bytes"
"io"
"sync"
)
// LimitedReader wraps an io.Reader with a msgio framed reader. The LimitedReader
// will return a reader which will io.EOF when the msg length is done.
func LimitedReader(r io.Reader) (io.Reader, error) {
l, err := ReadLen(r, nil)
return io.LimitReader(r, int64(l)), err
}
// LimitedWriter wraps an io.Writer with a msgio framed writer. It is the inverse
// of LimitedReader: it will buffer all writes until "Flush" is called. When Flush
// is called, it will write the size of the buffer first, flush the buffer, reset
// the buffer, and begin accept more incoming writes.
func NewLimitedWriter(w io.Writer) *LimitedWriter {
return &LimitedWriter{W: w}
}
type LimitedWriter struct {
W io.Writer
B bytes.Buffer
M sync.Mutex
}
func (w *LimitedWriter) Write(buf []byte) (n int, err error) {
w.M.Lock()
n, err = w.B.Write(buf)
w.M.Unlock()
return n, err
}
func (w *LimitedWriter) Flush() error {
w.M.Lock()
defer w.M.Unlock()
if err := WriteLen(w.W, w.B.Len()); err != nil {
return err
}
_, err := w.B.WriteTo(w.W)
return err
}
package msgio
import (
"bytes"
"testing"
)
func TestLimitReader(t *testing.T) {
buf := bytes.NewBuffer(nil)
reader, _ := LimitedReader(buf) // limit is set to 0
n, err := reader.Read([]byte{})
if n != 0 || err.Error() != "EOF" {
t.Fatal("Expected not to read anything")
}
}
func TestLimitWriter(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewLimitedWriter(buf)
n, err := writer.Write([]byte{1, 2, 3})
if n != 3 || err != nil {
t.Fatal("Expected to write 3 bytes with no errors")
}
err = writer.Flush()
}
package msgio
import (
"errors"
"io"
"sync"
pool "gitlab.dms3.io/p2p/go-buffer-pool"
)
// ErrMsgTooLarge is returned when the message length is exessive
var ErrMsgTooLarge = errors.New("message too large")
const (
lengthSize = 4
defaultMaxSize = 8 * 1024 * 1024 // 8mb
)
// Writer is the msgio Writer interface. It writes len-framed messages.
type Writer interface {
// Write writes passed in buffer as a single message.
Write([]byte) (int, error)
// WriteMsg writes the msg in the passed in buffer.
WriteMsg([]byte) error
}
// WriteCloser is a Writer + Closer interface. Like in `golang/pkg/io`
type WriteCloser interface {
Writer
io.Closer
}
// Reader is the msgio Reader interface. It reads len-framed messages.
type Reader interface {
// Read reads the next message from the Reader.
// The client must pass a buffer large enough, or io.ErrShortBuffer will be
// returned.
Read([]byte) (int, error)
// ReadMsg reads the next message from the Reader.
// Uses a pool.BufferPool internally to reuse buffers. User may call
// ReleaseMsg(msg) to signal a buffer can be reused.
ReadMsg() ([]byte, error)
// ReleaseMsg signals a buffer can be reused.
ReleaseMsg([]byte)
// NextMsgLen returns the length of the next (peeked) message. Does
// not destroy the message or have other adverse effects
NextMsgLen() (int, error)
}
// ReadCloser combines a Reader and Closer.
type ReadCloser interface {
Reader
io.Closer
}
// ReadWriter combines a Reader and Writer.
type ReadWriter interface {
Reader
Writer
}
// ReadWriteCloser combines a Reader, a Writer, and Closer.
type ReadWriteCloser interface {
Reader
Writer
io.Closer
}
// writer is the underlying type that implements the Writer interface.
type writer struct {
W io.Writer
pool *pool.BufferPool
lock sync.Mutex
}
// NewWriter wraps an io.Writer with a msgio framed writer. The msgio.Writer
// will write the length prefix of every message written.
func NewWriter(w io.Writer) WriteCloser {
return NewWriterWithPool(w, pool.GlobalPool)
}
// NewWriterWithPool is identical to NewWriter but allows the user to pass a
// custom buffer pool.
func NewWriterWithPool(w io.Writer, p *pool.BufferPool) WriteCloser {
return &writer{W: w, pool: p}
}
func (s *writer) Write(msg []byte) (int, error) {
err := s.WriteMsg(msg)
if err != nil {
return 0, err
}
return len(msg), nil
}
func (s *writer) WriteMsg(msg []byte) (err error) {
s.lock.Lock()
defer s.lock.Unlock()
buf := s.pool.Get(len(msg) + lengthSize)
NBO.PutUint32(buf, uint32(len(msg)))
copy(buf[lengthSize:], msg)
_, err = s.W.Write(buf)
s.pool.Put(buf)
return err
}
func (s *writer) Close() error {
if c, ok := s.W.(io.Closer); ok {
return c.Close()
}
return nil
}
// reader is the underlying type that implements the Reader interface.
type reader struct {
R io.Reader
lbuf [lengthSize]byte
next int
pool *pool.BufferPool
lock sync.Mutex
max int // the maximal message size (in bytes) this reader handles
}
// NewReader wraps an io.Reader with a msgio framed reader. The msgio.Reader
// will read whole messages at a time (using the length). Assumes an equivalent
// writer on the other side.
func NewReader(r io.Reader) ReadCloser {
return NewReaderWithPool(r, pool.GlobalPool)
}
// NewReaderSize is equivalent to NewReader but allows one to
// specify a max message size.
func NewReaderSize(r io.Reader, maxMessageSize int) ReadCloser {
return NewReaderSizeWithPool(r, maxMessageSize, pool.GlobalPool)
}
// NewReaderWithPool is the same as NewReader but allows one to specify a buffer
// pool.
func NewReaderWithPool(r io.Reader, p *pool.BufferPool) ReadCloser {
return NewReaderSizeWithPool(r, defaultMaxSize, p)
}
// NewReaderWithPool is the same as NewReader but allows one to specify a buffer
// pool and a max message size.
func NewReaderSizeWithPool(r io.Reader, maxMessageSize int, p *pool.BufferPool) ReadCloser {
if p == nil {
panic("nil pool")
}
return &reader{
R: r,
next: -1,
pool: p,
max: maxMessageSize,
}
}
// NextMsgLen reads the length of the next msg into s.lbuf, and returns it.
// WARNING: like Read, NextMsgLen is destructive. It reads from the internal
// reader.
func (s *reader) NextMsgLen() (int, error) {
s.lock.Lock()
defer s.lock.Unlock()
return s.nextMsgLen()
}
func (s *reader) nextMsgLen() (int, error) {
if s.next == -1 {
n, err := ReadLen(s.R, s.lbuf[:])
if err != nil {
return 0, err
}
s.next = n
}
return s.next, nil
}
func (s *reader) Read(msg []byte) (int, error) {
s.lock.Lock()
defer s.lock.Unlock()
length, err := s.nextMsgLen()
if err != nil {
return 0, err
}
if length > len(msg) {
return 0, io.ErrShortBuffer
}
read, err := io.ReadFull(s.R, msg[:length])
if read < length {
s.next = length - read // we only partially consumed the message.
} else {
s.next = -1 // signal we've consumed this msg
}
return read, err
}
func (s *reader) ReadMsg() ([]byte, error) {
s.lock.Lock()
defer s.lock.Unlock()
length, err := s.nextMsgLen()
if err != nil {
return nil, err
}
if length == 0 {
s.next = -1
return nil, nil
}
if length > s.max || length < 0 {
return nil, ErrMsgTooLarge
}
msg := s.pool.Get(length)
read, err := io.ReadFull(s.R, msg)
if read < length {
s.next = length - read // we only partially consumed the message.
} else {
s.next = -1 // signal we've consumed this msg
}
return msg[:read], err
}
func (s *reader) ReleaseMsg(msg []byte) {
s.pool.Put(msg)
}
func (s *reader) Close() error {
if c, ok := s.R.(io.Closer); ok {
return c.Close()
}
return nil
}
// readWriter is the underlying type that implements a ReadWriter.
type readWriter struct {
Reader
Writer
}
// NewReadWriter wraps an io.ReadWriter with a msgio.ReadWriter. Writing
// and Reading will be appropriately framed.
func NewReadWriter(rw io.ReadWriter) ReadWriteCloser {
return &readWriter{
Reader: NewReader(rw),
Writer: NewWriter(rw),
}
}
// Combine wraps a pair of msgio.Writer and msgio.Reader with a msgio.ReadWriter.
func Combine(w Writer, r Reader) ReadWriteCloser {
return &readWriter{Reader: r, Writer: w}
}
func (rw *readWriter) Close() error {
var errs []error
if w, ok := rw.Writer.(WriteCloser); ok {
if err := w.Close(); err != nil {
errs = append(errs, err)
}
}
if r, ok := rw.Reader.(ReadCloser); ok {
if err := r.Close(); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return multiErr(errs)
}
return nil
}
// multiErr is a util to return multiple errors
type multiErr []error
func (m multiErr) Error() string {
if len(m) == 0 {
return "no errors"
}
s := "Multiple errors: "
for i, e := range m {
if i != 0 {
s += ", "
}
s += e.Error()
}
return s
}
# msgio headers tool
Conveniently output msgio headers.
## Install
```
go get github.com/libp2p/go-msgio/msgio
```
## Usage
```
> msgio -h
msgio - tool to wrap messages with msgio header
Usage
msgio header 1020 >header
cat file | msgio wrap >wrapped
Commands
header <size> output a msgio header of given size
wrap wrap incoming stream with msgio
```
package main
import (
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"strconv"
"strings"
msgio "gitlab.dms3.io/p2p/go-msgio"
)
var Args ArgType
type ArgType struct {
Command string
Args []string
}
func (a *ArgType) Arg(i int) string {
n := i + 1
if len(a.Args) < n {
die(fmt.Sprintf("expected %d argument(s)", n))
}
return a.Args[i]
}
var usageStr = `
msgio - tool to wrap messages with msgio header
Usage
msgio header 1020 >header
cat file | msgio wrap >wrapped
Commands
header <size> output a msgio header of given size
wrap wrap incoming stream with msgio
`
func usage() {
fmt.Println(strings.TrimSpace(usageStr))
os.Exit(0)
}
func die(err string) {
fmt.Fprintf(os.Stderr, "error: %s\n", err)
os.Exit(-1)
}
func main() {
if err := run(); err != nil {
die(err.Error())
}
}
func argParse() {
flag.Usage = usage
flag.Parse()
args := flag.Args()
if l := len(args); l < 1 || l > 2 {
usage()
}
Args.Command = flag.Args()[0]
Args.Args = flag.Args()[1:]
}
func run() error {
argParse()
w := os.Stdout
r := os.Stdin
switch Args.Command {
case "header":
size, err := strconv.Atoi(Args.Arg(0))
if err != nil {
return err
}
return header(w, size)
case "wrap":
return wrap(w, r)
default:
usage()
return nil
}
}
func header(w io.Writer, size int) error {
return msgio.WriteLen(w, size)
}
func wrap(w io.Writer, r io.Reader) error {
buf, err := ioutil.ReadAll(r)
if err != nil {
return err
}
if err := msgio.WriteLen(w, len(buf)); err != nil {
return err
}
_, err = w.Write(buf)
return err
}
package msgio
import (
"bytes"
"errors"
"fmt"
"io"
"math/rand"
str "strings"
"sync"
"testing"
"time"
)
func TestReadWrite(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
reader := NewReader(buf)
SubtestReadWrite(t, writer, reader)
}
func TestReadWriteMsg(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
reader := NewReader(buf)
SubtestReadWriteMsg(t, writer, reader)
}
func TestReadWriteMsgSync(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
reader := NewReader(buf)
SubtestReadWriteMsgSync(t, writer, reader)
}
func TestReadClose(t *testing.T) {
r, w := io.Pipe()
writer := NewWriter(w)
reader := NewReader(r)
SubtestReadClose(t, writer, reader)
}
func TestWriteClose(t *testing.T) {
r, w := io.Pipe()
writer := NewWriter(w)
reader := NewReader(r)
SubtestWriteClose(t, writer, reader)
}
type testIoReadWriter struct {
io.Reader
io.Writer
}
func TestReadWriterClose(t *testing.T) {
r, w := io.Pipe()
var rw ReadWriteCloser
rw = NewReadWriter(testIoReadWriter{r, w})
SubtestReaderWriterClose(t, rw)
}
func TestReadWriterCombine(t *testing.T) {
r, w := io.Pipe()
writer := NewWriter(w)
reader := NewReader(r)
rw := Combine(writer, reader)
rw.Close()
}
func TestMultiError(t *testing.T) {
emptyError := multiErr([]error{})
if emptyError.Error() != "no errors" {
t.Fatal("Expected no errors")
}
twoErrors := multiErr([]error{errors.New("one"), errors.New("two")})
if eStr := twoErrors.Error(); !str.Contains(eStr, "one") && !str.Contains(eStr, "two") {
t.Fatal("Expected error messages not included")
}
}
func TestShortBufferError(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
reader := NewReader(buf)
SubtestReadShortBuffer(t, writer, reader)
}
func SubtestReadWrite(t *testing.T, writer WriteCloser, reader ReadCloser) {
msgs := [1000][]byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range msgs {
msgs[i] = randBuf(r, r.Intn(1000))
n, err := writer.Write(msgs[i])
if err != nil {
t.Fatal(err)
}
if n != len(msgs[i]) {
t.Fatal("wrong length:", n, len(msgs[i]))
}
}
if err := writer.Close(); err != nil {
t.Fatal(err)
}
for i := 0; ; i++ {
msg2 := make([]byte, 1000)
n, err := reader.Read(msg2)
if err != nil {
if err == io.EOF {
if i < len(msg2) {
t.Error("failed to read all messages", len(msgs), i)
}
break
}
t.Error("unexpected error", err)
}
msg1 := msgs[i]
msg2 = msg2[:n]
if !bytes.Equal(msg1, msg2) {
t.Fatal("message retrieved not equal\n", msg1, "\n\n", msg2)
}
}
if err := reader.Close(); err != nil {
t.Error(err)
}
}
func SubtestReadWriteMsg(t *testing.T, writer WriteCloser, reader ReadCloser) {
msgs := [1000][]byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range msgs {
msgs[i] = randBuf(r, r.Intn(1000))
err := writer.WriteMsg(msgs[i])
if err != nil {
t.Fatal(err)
}
}
if err := writer.Close(); err != nil {
t.Fatal(err)
}
for i := 0; ; i++ {
msg2, err := reader.ReadMsg()
if err != nil {
if err == io.EOF {
if i < len(msg2) {
t.Error("failed to read all messages", len(msgs), i)
}
break
}
t.Error("unexpected error", err)
}
msg1 := msgs[i]
if !bytes.Equal(msg1, msg2) {
t.Fatal("message retrieved not equal\n", msg1, "\n\n", msg2)
}
}
if err := reader.Close(); err != nil {
t.Error(err)
}
}
func SubtestReadWriteMsgSync(t *testing.T, writer WriteCloser, reader ReadCloser) {
msgs := [1000][]byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range msgs {
msgs[i] = randBuf(r, r.Intn(1000)+4)
NBO.PutUint32(msgs[i][:4], uint32(i))
}
var wg1 sync.WaitGroup
var wg2 sync.WaitGroup
errs := make(chan error, 10000)
for i := range msgs {
wg1.Add(1)
go func(i int) {
defer wg1.Done()
err := writer.WriteMsg(msgs[i])
if err != nil {
errs <- err
}
}(i)
}
wg1.Wait()
if err := writer.Close(); err != nil {
t.Fatal(err)
}
for i := 0; i < len(msgs)+1; i++ {
wg2.Add(1)
go func(i int) {
defer wg2.Done()
msg2, err := reader.ReadMsg()
if err != nil {
if err == io.EOF {
if i < len(msg2) {
errs <- fmt.Errorf("failed to read all messages %d %d", len(msgs), i)
}
return
}
errs <- fmt.Errorf("unexpected error: %s", err)
}
mi := NBO.Uint32(msg2[:4])
msg1 := msgs[mi]
if !bytes.Equal(msg1, msg2) {
errs <- fmt.Errorf("message retrieved not equal\n%s\n\n%s", msg1, msg2)
}
}(i)
}
wg2.Wait()
close(errs)
if err := reader.Close(); err != nil {
t.Error(err)
}
for e := range errs {
t.Error(e)
}
}
func TestBadSizes(t *testing.T) {
data := make([]byte, 4)
// on a 64 bit system, this will fail because its too large
// on a 32 bit system, this will fail because its too small
NBO.PutUint32(data, 4000000000)
buf := bytes.NewReader(data)
read := NewReader(buf)
msg, err := read.ReadMsg()
if err == nil {
t.Fatal(err)
}
_ = msg
}
func SubtestReadClose(t *testing.T, writer WriteCloser, reader ReadCloser) {
defer writer.Close()
buf := [10]byte{}
done := make(chan struct{})
go func() {
defer close(done)
time.Sleep(10 * time.Millisecond)
reader.Close()
}()
n, err := reader.Read(buf[:])
if n != 0 || err == nil {
t.Error("expected to read nothing")
}
<-done
}
func SubtestWriteClose(t *testing.T, writer WriteCloser, reader ReadCloser) {
defer reader.Close()
buf := [10]byte{}
done := make(chan struct{})
go func() {
defer close(done)
time.Sleep(10 * time.Millisecond)
writer.Close()
}()
n, err := writer.Write(buf[:])
if n != 0 || err == nil {
t.Error("expected to write nothing")
}
<-done
}
func SubtestReaderWriterClose(t *testing.T, rw ReadWriteCloser) {
buf := [10]byte{}
done := make(chan struct{})
go func() {
defer close(done)
time.Sleep(10 * time.Millisecond)
buf := [10]byte{}
rw.Read(buf[:])
rw.Close()
}()
n, err := rw.Write(buf[:])
if n != 10 || err != nil {
t.Error("Expected to write 10 bytes")
}
<-done
}
func SubtestReadShortBuffer(t *testing.T, writer WriteCloser, reader ReadCloser) {
defer reader.Close()
shortReadBuf := [1]byte{}
done := make(chan struct{})
go func() {
defer writer.Close()
defer close(done)
time.Sleep(10 * time.Millisecond)
largeWriteBuf := [10]byte{}
writer.Write(largeWriteBuf[:])
}()
<-done
n, _ := reader.NextMsgLen()
if n != 10 {
t.Fatal("Expected next message to have length of 10")
}
_, err := reader.Read(shortReadBuf[:])
if err != io.ErrShortBuffer {
t.Fatal("Expected short buffer error")
}
}
package msgio
import (
"encoding/binary"
"io"
)
// NBO is NetworkByteOrder
var NBO = binary.BigEndian
// WriteLen writes a length to the given writer.
func WriteLen(w io.Writer, l int) error {
ul := uint32(l)
return binary.Write(w, NBO, &ul)
}
// ReadLen reads a length from the given reader.
// if buf is non-nil, it reuses the buffer. Ex:
// l, err := ReadLen(r, nil)
// _, err := ReadLen(r, buf)
func ReadLen(r io.Reader, buf []byte) (int, error) {
if len(buf) < 4 {
buf = make([]byte, 4)
}
buf = buf[:4]
if _, err := io.ReadFull(r, buf); err != nil {
return 0, err
}
n := int(NBO.Uint32(buf))
return n, nil
}
//
// Adapted from gogo/protobuf to use multiformats/go-varint for
// efficient, interoperable length-prefixing.
//
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
package protoio
import (
"io"
"github.com/gogo/protobuf/proto"
)
type Writer interface {
WriteMsg(proto.Message) error
}
type WriteCloser interface {
Writer
io.Closer
}
type Reader interface {
ReadMsg(msg proto.Message) error
}
type ReadCloser interface {
Reader
io.Closer
}
func getSize(v interface{}) (int, bool) {
if sz, ok := v.(interface {
Size() (n int)
}); ok {
return sz.Size(), true
} else if sz, ok := v.(interface {
ProtoSize() (n int)
}); ok {
return sz.ProtoSize(), true
} else {
return 0, false
}
}
//
// Adapted from gogo/protobuf to use multiformats/go-varint for
// efficient, interoperable length-prefixing.
//
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
package protoio
import (
"bufio"
"io"
"github.com/gogo/protobuf/proto"
"github.com/multiformats/go-varint"
)
type uvarintReader struct {
r *bufio.Reader
buf []byte
maxSize int
closer io.Closer
}
func NewDelimitedReader(r io.Reader, maxSize int) ReadCloser {
var closer io.Closer
if c, ok := r.(io.Closer); ok {
closer = c
}
return &uvarintReader{bufio.NewReader(r), nil, maxSize, closer}
}
func (ur *uvarintReader) ReadMsg(msg proto.Message) error {
length64, err := varint.ReadUvarint(ur.r)
if err != nil {
return err
}
length := int(length64)
if length < 0 || length > ur.maxSize {
return io.ErrShortBuffer
}
if len(ur.buf) < length {
ur.buf = make([]byte, length)
}
buf := ur.buf[:length]
if _, err := io.ReadFull(ur.r, buf); err != nil {
return err
}
return proto.Unmarshal(buf, msg)
}
func (ur *uvarintReader) Close() error {
if ur.closer != nil {
return ur.closer.Close()
}
return nil
}
//
// Adapted from gogo/protobuf to use multiformats/go-varint for
// efficient, interoperable length-prefixing.
//
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
package protoio_test
import (
"bytes"
"io"
"math/rand"
"testing"
"time"
"github.com/gogo/protobuf/test"
"github.com/multiformats/go-varint"
"gitlab.dms3.io/p2p/go-msgio/protoio"
)
func TestVarintNormal(t *testing.T) {
buf := newBuffer()
writer := protoio.NewDelimitedWriter(buf)
reader := protoio.NewDelimitedReader(buf, 1024*1024)
if err := iotest(writer, reader); err != nil {
t.Error(err)
}
if !buf.closed {
t.Fatalf("did not close buffer")
}
}
func TestVarintNoClose(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := protoio.NewDelimitedWriter(buf)
reader := protoio.NewDelimitedReader(buf, 1024*1024)
if err := iotest(writer, reader); err != nil {
t.Error(err)
}
}
// https://github.com/gogo/protobuf/issues/32
func TestVarintMaxSize(t *testing.T) {
buf := newBuffer()
writer := protoio.NewDelimitedWriter(buf)
reader := protoio.NewDelimitedReader(buf, 20)
if err := iotest(writer, reader); err != io.ErrShortBuffer {
t.Error(err)
} else {
t.Logf("%s", err)
}
}
func TestVarintError(t *testing.T) {
buf := newBuffer()
// beyond uvarint63 capacity.
buf.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff})
reader := protoio.NewDelimitedReader(buf, 1024*1024)
msg := &test.NinOptNative{}
err := reader.ReadMsg(msg)
if err != varint.ErrOverflow {
t.Fatalf("expected varint.ErrOverflow error")
}
}
type buffer struct {
*bytes.Buffer
closed bool
}
func (b *buffer) Close() error {
b.closed = true
return nil
}
func newBuffer() *buffer {
return &buffer{bytes.NewBuffer(nil), false}
}
func iotest(writer protoio.WriteCloser, reader protoio.ReadCloser) error {
size := 1000
msgs := make([]*test.NinOptNative, size)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range msgs {
msgs[i] = test.NewPopulatedNinOptNative(r, true)
// https://github.com/gogo/protobuf/issues/31
if i == 5 {
msgs[i] = &test.NinOptNative{}
}
// https://github.com/gogo/protobuf/issues/31
if i == 999 {
msgs[i] = &test.NinOptNative{}
}
err := writer.WriteMsg(msgs[i])
if err != nil {
return err
}
}
if err := writer.Close(); err != nil {
return err
}
i := 0
for {
msg := &test.NinOptNative{}
if err := reader.ReadMsg(msg); err != nil {
if err == io.EOF {
break
}
return err
}
if err := msg.VerboseEqual(msgs[i]); err != nil {
return err
}
i++
}
if i != size {
panic("not enough messages read")
}
if err := reader.Close(); err != nil {
return err
}
return nil
}
//
// Adapted from gogo/protobuf to use multiformats/go-varint for
// efficient, interoperable length-prefixing.
//
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
package protoio
import (
"io"
"github.com/gogo/protobuf/proto"
"github.com/multiformats/go-varint"
)
type uvarintWriter struct {
w io.Writer
lenBuf []byte
buffer []byte
}
func NewDelimitedWriter(w io.Writer) WriteCloser {
return &uvarintWriter{w, make([]byte, varint.MaxLenUvarint63), nil}
}
func (uw *uvarintWriter) WriteMsg(msg proto.Message) (err error) {
var data []byte
if m, ok := msg.(interface {
MarshalTo(data []byte) (n int, err error)
}); ok {
n, ok := getSize(m)
if ok {
if n+varint.MaxLenUvarint63 >= len(uw.buffer) {
uw.buffer = make([]byte, n+varint.MaxLenUvarint63)
}
lenOff := varint.PutUvarint(uw.buffer, uint64(n))
_, err = m.MarshalTo(uw.buffer[lenOff:])
if err != nil {
return err
}
_, err = uw.w.Write(uw.buffer[:lenOff+n])
return err
}
}
// fallback
data, err = proto.Marshal(msg)
if err != nil {
return err
}
length := uint64(len(data))
n := varint.PutUvarint(uw.lenBuf, length)
_, err = uw.w.Write(uw.lenBuf[:n])
if err != nil {
return err
}
_, err = uw.w.Write(data)
return err
}
func (uw *uvarintWriter) Close() error {
if closer, ok := uw.w.(io.Closer); ok {
return closer.Close()
}
return nil
}
package msgio
import (
"encoding/binary"
"io"
"sync"
"github.com/multiformats/go-varint"
pool "gitlab.dms3.io/p2p/go-buffer-pool"
)
// varintWriter is the underlying type that implements the Writer interface.
type varintWriter struct {
W io.Writer
pool *pool.BufferPool
lock sync.Mutex // for threadsafe writes
}
// NewVarintWriter wraps an io.Writer with a varint msgio framed writer.
// The msgio.Writer will write the length prefix of every message written
// as a varint, using https://golang.org/pkg/encoding/binary/#PutUvarint
func NewVarintWriter(w io.Writer) WriteCloser {
return NewVarintWriterWithPool(w, pool.GlobalPool)
}
func NewVarintWriterWithPool(w io.Writer, p *pool.BufferPool) WriteCloser {
return &varintWriter{
pool: p,
W: w,
}
}
func (s *varintWriter) Write(msg []byte) (int, error) {
err := s.WriteMsg(msg)
if err != nil {
return 0, err
}
return len(msg), nil
}
func (s *varintWriter) WriteMsg(msg []byte) error {
s.lock.Lock()
defer s.lock.Unlock()
buf := s.pool.Get(len(msg) + binary.MaxVarintLen64)
n := binary.PutUvarint(buf, uint64(len(msg)))
n += copy(buf[n:], msg)
_, err := s.W.Write(buf[:n])
s.pool.Put(buf)
return err
}
func (s *varintWriter) Close() error {
if c, ok := s.W.(io.Closer); ok {
return c.Close()
}
return nil
}
// varintReader is the underlying type that implements the Reader interface.
type varintReader struct {
R io.Reader
br io.ByteReader // for reading varints.
next int
pool *pool.BufferPool
lock sync.Mutex
max int // the maximal message size (in bytes) this reader handles
}
// NewVarintReader wraps an io.Reader with a varint msgio framed reader.
// The msgio.Reader will read whole messages at a time (using the length).
// Varints read according to https://golang.org/pkg/encoding/binary/#ReadUvarint
// Assumes an equivalent writer on the other side.
func NewVarintReader(r io.Reader) ReadCloser {
return NewVarintReaderSize(r, defaultMaxSize)
}
// NewVarintReaderSize is equivalent to NewVarintReader but allows one to
// specify a max message size.
func NewVarintReaderSize(r io.Reader, maxMessageSize int) ReadCloser {
return NewVarintReaderSizeWithPool(r, maxMessageSize, pool.GlobalPool)
}
// NewVarintReaderWithPool is the same as NewVarintReader but allows one to
// specify a buffer pool.
func NewVarintReaderWithPool(r io.Reader, p *pool.BufferPool) ReadCloser {
return NewVarintReaderSizeWithPool(r, defaultMaxSize, p)
}
// NewVarintReaderWithPool is the same as NewVarintReader but allows one to
// specify a buffer pool and a max message size.
func NewVarintReaderSizeWithPool(r io.Reader, maxMessageSize int, p *pool.BufferPool) ReadCloser {
if p == nil {
panic("nil pool")
}
return &varintReader{
R: r,
br: &simpleByteReader{R: r},
next: -1,
pool: p,
max: maxMessageSize,
}
}
// NextMsgLen reads the length of the next msg into s.lbuf, and returns it.
// WARNING: like Read, NextMsgLen is destructive. It reads from the internal
// reader.
func (s *varintReader) NextMsgLen() (int, error) {
s.lock.Lock()
defer s.lock.Unlock()
return s.nextMsgLen()
}
func (s *varintReader) nextMsgLen() (int, error) {
if s.next == -1 {
length, err := varint.ReadUvarint(s.br)
if err != nil {
return 0, err
}
s.next = int(length)
}
return s.next, nil
}
func (s *varintReader) Read(msg []byte) (int, error) {
s.lock.Lock()
defer s.lock.Unlock()
length, err := s.nextMsgLen()
if err != nil {
return 0, err
}
if length > len(msg) {
return 0, io.ErrShortBuffer
}
_, err = io.ReadFull(s.R, msg[:length])
s.next = -1 // signal we've consumed this msg
return length, err
}
func (s *varintReader) ReadMsg() ([]byte, error) {
s.lock.Lock()
defer s.lock.Unlock()
length, err := s.nextMsgLen()
if err != nil {
return nil, err
}
if length == 0 {
s.next = -1
return nil, nil
}
if length > s.max {
return nil, ErrMsgTooLarge
}
msg := s.pool.Get(length)
_, err = io.ReadFull(s.R, msg)
s.next = -1 // signal we've consumed this msg
return msg, err
}
func (s *varintReader) ReleaseMsg(msg []byte) {
s.pool.Put(msg)
}
func (s *varintReader) Close() error {
if c, ok := s.R.(io.Closer); ok {
return c.Close()
}
return nil
}
type simpleByteReader struct {
R io.Reader
buf [1]byte
}
func (r *simpleByteReader) ReadByte() (c byte, err error) {
if _, err := io.ReadFull(r.R, r.buf[:]); err != nil {
return 0, err
}
return r.buf[0], nil
}
package msgio
import (
"bytes"
"encoding/binary"
"io"
"testing"
"github.com/multiformats/go-varint"
)
func TestVarintReadWrite(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewVarintWriter(buf)
reader := NewVarintReader(buf)
SubtestReadWrite(t, writer, reader)
}
func TestVarintReadWriteMsg(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewVarintWriter(buf)
reader := NewVarintReader(buf)
SubtestReadWriteMsg(t, writer, reader)
}
func TestVarintReadWriteMsgSync(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewVarintWriter(buf)
reader := NewVarintReader(buf)
SubtestReadWriteMsgSync(t, writer, reader)
}
func TestVarintWrite(t *testing.T) {
SubtestVarintWrite(t, []byte("hello world"))
SubtestVarintWrite(t, []byte("hello world hello world hello world"))
SubtestVarintWrite(t, make([]byte, 1<<20))
SubtestVarintWrite(t, []byte(""))
}
func SubtestVarintWrite(t *testing.T, msg []byte) {
buf := bytes.NewBuffer(nil)
writer := NewVarintWriter(buf)
if err := writer.WriteMsg(msg); err != nil {
t.Fatal(err)
}
bb := buf.Bytes()
sbr := simpleByteReader{R: buf}
length, err := varint.ReadUvarint(&sbr)
if err != nil {
t.Fatal(err)
}
t.Logf("checking varint is %d", len(msg))
if int(length) != len(msg) {
t.Fatalf("incorrect varint: %d != %d", length, len(msg))
}
lbuf := make([]byte, binary.MaxVarintLen64)
n := varint.PutUvarint(lbuf, length)
bblen := int(length) + n
t.Logf("checking wrote (%d + %d) bytes", length, n)
if len(bb) != bblen {
t.Fatalf("wrote incorrect number of bytes: %d != %d", len(bb), bblen)
}
}
func TestVarintReadClose(t *testing.T) {
r, w := io.Pipe()
writer := NewVarintWriter(w)
reader := NewVarintReader(r)
SubtestReadClose(t, writer, reader)
}
func TestVarintWriteClose(t *testing.T) {
r, w := io.Pipe()
writer := NewVarintWriter(w)
reader := NewVarintReader(r)
SubtestWriteClose(t, writer, reader)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment