Commit b6815d39 authored by tavit ohanian

reference basis

parents 20a6c0ab e2742ecb
os:
- linux
language: go
go:
- 1.13.x
env:
  global:
    - GOTFLAGS="-race"
  matrix:
    - BUILD_DEPTYPE=gomod
# disable travis install
install:
- true
script:
- bash <(curl -s https://raw.githubusercontent.com/ipfs/ci-helpers/master/travis-ci/run-standard-tests.sh)
cache:
  directories:
    - $GOPATH/pkg/mod
    - $HOME/.cache/go-build
notifications:
  email: false
The MIT License (MIT)
Copyright (c) 2016 Jeromy Johnson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
# go-mplex
dms3 p2p go-mplex
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](https://libp2p.io/)
[![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
A super simple [stream muxing](https://docs.libp2p.io/concepts/stream-multiplexing/) library implementing [mplex](https://github.com/libp2p/specs/tree/master/mplex).
## Usage
```go
mplex := multiplex.NewMultiplex(mysocket, false)
s, _ := mplex.NewStream(context.Background())
s.Write([]byte("Hello World!"))
s.Close()
os, _ := mplex.Accept()
// echo back everything received
io.Copy(os, os)
```
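For a fuller picture, here is a minimal, self-contained sketch (hypothetical names; an in-memory `net.Pipe` stands in for a real socket) that wires up both ends of a connection, opens a stream from one side, and echoes the data back from the accepting side:

```go
package main

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"net"

	multiplex "github.com/libp2p/go-mplex"
)

func main() {
	// In-memory connection standing in for a real socket.
	ca, cb := net.Pipe()

	// The two sides get different initiator flags, mirroring the tests in this repo.
	mpa := multiplex.NewMultiplex(ca, false)
	mpb := multiplex.NewMultiplex(cb, true)
	defer mpa.Close()
	defer mpb.Close()

	// Accepting side: echo everything back on the first inbound stream.
	go func() {
		s, err := mpb.Accept()
		if err != nil {
			return
		}
		defer s.Close()
		io.Copy(s, s)
	}()

	// Opening side: open a stream, write, half-close, then read the echo.
	s, err := mpa.NewStream(context.Background())
	if err != nil {
		panic(err)
	}
	s.Write([]byte("Hello World!"))
	s.CloseWrite() // signals EOF to the echo loop while keeping the read side open
	buf, _ := ioutil.ReadAll(s)
	fmt.Println(string(buf)) // "Hello World!"
	s.Close()
}
```

`CloseWrite` half-closes the stream, so the echo loop sees EOF while the read side stays open for the reply.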
---
The last gx published version of this module was: 0.2.35: QmWGQQ6Tz8AdUpxktLf3zgnVN9Vy8fcWVezZJSU3ZmiANj
package multiplex
import (
"context"
"io"
"math/rand"
"net"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
netbench "github.com/libp2p/go-libp2p-testing/net"
"google.golang.org/grpc/benchmark/latency"
)
func MakeLinearWriteDistribution(b *testing.B) [][]byte {
b.Helper()
n := 1000
itms := make([][]byte, n)
for i := 0; i < n; i++ {
itms[i] = make([]byte, i+1)
}
return itms
}
func MakeSmallPacketDistribution(b *testing.B) [][]byte {
b.Helper()
n := 1000
itms := make([][]byte, n)
i := 0
for ; i < int(float64(n)*(3.0/4.0)); i++ {
itms[i] = make([]byte, 64)
}
for ; i < n; i++ {
itms[i] = make([]byte, 1024)
}
rand.Shuffle(n, func(i, j int) { itms[i], itms[j] = itms[j], itms[i] })
return itms
}
func TestSmallPackets(t *testing.T) {
kbps, lat, err := netbench.FindNetworkLimit(testSmallPackets, 0.5)
if err != nil {
t.Skip()
}
slowdown, err := netbench.ParallelismSlowdown(testSmallPackets, kbps, lat)
if err != nil {
t.Fatal(err)
}
if slowdown > 0.15 && !raceEnabled {
t.Fatalf("Slowdown from mplex was >15%%: %f", slowdown)
}
}
func testSmallPackets(b *testing.B, n1, n2 net.Conn) {
msgs := MakeSmallPacketDistribution(b)
mpa := NewMultiplex(n1, false)
mpb := NewMultiplex(n2, true)
mp := runtime.GOMAXPROCS(0)
runtime.GOMAXPROCS(mp)
streamPairs := make([][]*Stream, 0)
for i := 0; i < mp; i++ {
sa, err := mpa.NewStream(context.Background())
if err != nil {
b.Error(err)
}
sb, err := mpb.Accept()
if err != nil {
b.Error(err)
}
streamPairs = append(streamPairs, []*Stream{sa, sb})
}
receivedBytes := uint64(0)
sentBytes := uint64(0)
idx := int32(0)
b.ResetTimer()
var wg sync.WaitGroup
b.RunParallel(func(pb *testing.PB) {
localIdx := atomic.AddInt32(&idx, 1) - 1
localA := streamPairs[localIdx][0]
localB := streamPairs[localIdx][1]
wg.Add(1)
go func() {
defer wg.Done()
defer localB.Close()
receiveBuf := make([]byte, 2048)
for {
n, err := localB.Read(receiveBuf)
if err != nil && err != io.EOF {
b.Error(err)
}
if n == 0 || err == io.EOF {
return
}
atomic.AddUint64(&receivedBytes, uint64(n))
}
}()
defer localA.Close()
i := 0
for {
n, err := localA.Write(msgs[i])
atomic.AddUint64(&sentBytes, uint64(n))
if err != nil && err != io.EOF {
b.Error(err)
}
i = (i + 1) % 1000
if !pb.Next() {
break
}
}
})
b.StopTimer()
wg.Wait()
if sentBytes != receivedBytes {
b.Fatal("sent != received", sentBytes, receivedBytes)
}
b.SetBytes(int64(receivedBytes))
mpa.Close()
mpb.Close()
}
func BenchmarkSmallPackets(b *testing.B) {
msgs := MakeSmallPacketDistribution(b)
benchmarkPackets(b, msgs)
}
func BenchmarkSlowConnSmallPackets(b *testing.B) {
msgs := MakeSmallPacketDistribution(b)
slowNetwork := latency.Network{
Kbps: 100,
Latency: 30 * time.Millisecond,
MTU: 1500,
}
var wg sync.WaitGroup
wg.Add(1)
var lb net.Conn
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
b.Error(err)
}
slowL := slowNetwork.Listener(l)
go func() {
defer wg.Done()
lb, err = slowL.Accept()
if err != nil {
b.Error(err)
}
}()
dialer := slowNetwork.Dialer(net.Dial)
la, err := dialer("tcp4", slowL.Addr().String())
if err != nil {
b.Error(err)
}
defer la.Close()
wg.Wait()
defer lb.Close()
mpa := NewMultiplex(la, false)
mpb := NewMultiplex(lb, true)
defer mpa.Close()
defer mpb.Close()
benchmarkPacketsWithConn(b, 1, msgs, mpa, mpb)
}
func BenchmarkLinearPackets(b *testing.B) {
msgs := MakeLinearWriteDistribution(b)
benchmarkPackets(b, msgs)
}
func benchmarkPackets(b *testing.B, msgs [][]byte) {
pa, pb := net.Pipe()
defer pa.Close()
defer pb.Close()
mpa := NewMultiplex(pa, false)
mpb := NewMultiplex(pb, true)
defer mpa.Close()
defer mpb.Close()
benchmarkPacketsWithConn(b, 1, msgs, mpa, mpb)
}
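// benchmarkPacketsWithConn cycles through msgs on parallelism*GOMAXPROCS stream
// pairs: each parallel benchmark body writes on one end of a pair while a
// companion goroutine drains the other end, and the totals of bytes sent and
// received are compared once the benchmark stops.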
func benchmarkPacketsWithConn(b *testing.B, parallelism int, msgs [][]byte, mpa, mpb *Multiplex) {
streamPairs := make([][]*Stream, 0)
for i := 0; i < parallelism*runtime.GOMAXPROCS(0); i++ {
sa, err := mpa.NewStream(context.Background())
if err != nil {
b.Error(err)
}
sb, err := mpb.Accept()
if err != nil {
b.Error(err)
}
streamPairs = append(streamPairs, []*Stream{sa, sb})
}
receivedBytes := uint64(0)
sentBytes := uint64(0)
idx := int32(0)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
localIdx := atomic.AddInt32(&idx, 1) - 1
localA := streamPairs[localIdx][0]
localB := streamPairs[localIdx][1]
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
receiveBuf := make([]byte, 2048)
for {
n, err := localB.Read(receiveBuf)
if err != nil && err != io.EOF {
b.Error(err)
}
if n == 0 || err == io.EOF {
return
}
atomic.AddUint64(&receivedBytes, uint64(n))
}
}()
i := 0
for {
n, err := localA.Write(msgs[i])
atomic.AddUint64(&sentBytes, uint64(n))
if err != nil && err != io.EOF {
b.Error(err)
}
i = (i + 1) % 1000
if !pb.Next() {
break
}
}
localA.Close()
b.StopTimer()
wg.Wait()
})
if sentBytes != receivedBytes {
b.Fatal("sent != received", sentBytes, receivedBytes)
}
b.SetBytes(int64(receivedBytes))
}
coverage:
  range: "50...100"
comment: off
// Copied from the go standard library.
//
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE-BSD file.
package multiplex
import (
"sync"
"time"
)
// pipeDeadline is an abstraction for handling timeouts.
type pipeDeadline struct {
mu sync.Mutex // Guards timer and cancel
timer *time.Timer
cancel chan struct{} // Must be non-nil
}
func makePipeDeadline() pipeDeadline {
return pipeDeadline{cancel: make(chan struct{})}
}
// set sets the point in time when the deadline will time out.
// A timeout event is signaled by closing the channel returned by waiter.
// Once a timeout has occurred, the deadline can be refreshed by specifying a
// t value in the future.
//
// A zero value for t prevents timeout.
func (d *pipeDeadline) set(t time.Time) {
d.mu.Lock()
defer d.mu.Unlock()
// deadline closed
if d.cancel == nil {
return
}
if d.timer != nil && !d.timer.Stop() {
<-d.cancel // Wait for the timer callback to finish and close cancel
}
d.timer = nil
// Time is zero, then there is no deadline.
closed := isClosedChan(d.cancel)
if t.IsZero() {
if closed {
d.cancel = make(chan struct{})
}
return
}
// Time in the future, setup a timer to cancel in the future.
if dur := time.Until(t); dur > 0 {
if closed {
d.cancel = make(chan struct{})
}
d.timer = time.AfterFunc(dur, func() {
close(d.cancel)
})
return
}
// Time in the past, so close immediately.
if !closed {
close(d.cancel)
}
}
// wait returns a channel that is closed when the deadline is exceeded.
func (d *pipeDeadline) wait() chan struct{} {
d.mu.Lock()
defer d.mu.Unlock()
return d.cancel
}
// close closes the deadline. Any future calls to `set` will do nothing.
func (d *pipeDeadline) close() {
d.mu.Lock()
defer d.mu.Unlock()
if d.timer != nil && !d.timer.Stop() {
<-d.cancel // Wait for the timer callback to finish and close cancel
}
d.timer = nil
d.cancel = nil
}
func isClosedChan(c <-chan struct{}) bool {
select {
case <-c:
return true
default:
return false
}
}
module github.com/libp2p/go-mplex
require (
github.com/ipfs/go-log v1.0.4
github.com/ipfs/go-log/v2 v2.1.1 // indirect
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p-testing v0.1.2-0.20200422005655-8775583591d8
github.com/multiformats/go-varint v0.0.6
github.com/opentracing/opentracing-go v1.2.0 // indirect
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.15.0 // indirect
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 // indirect
google.golang.org/grpc v1.28.1
)
go 1.13
# Js/Go interop test
This directory contains a basic js/go interop test. To run it, just run
`./test.sh` in this directory. It depends on `npm` and `go`.
package main
import (
"fmt"
"io"
"io/ioutil"
"net"
"sync"
mplex "github.com/libp2p/go-mplex"
)
var jsTestData = "test data from js %d"
var goTestData = "test data from go %d"
func main() {
conn, err := net.Dial("tcp4", "127.0.0.1:9991")
if err != nil {
panic(err)
}
sess := mplex.NewMultiplex(conn, true)
defer sess.Close()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
s, err := sess.NewStream()
if err != nil {
panic(err)
}
readWrite(s)
}()
}
for i := 0; i < 100; i++ {
s, err := sess.Accept()
if err != nil {
panic(err)
}
wg.Add(1)
go func() {
defer wg.Done()
readWrite(s)
}()
}
wg.Wait()
}
func readWrite(s *mplex.Stream) {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
_, err := fmt.Fprintf(s, goTestData, i)
if err != nil {
panic(err)
}
}
s.Close()
}()
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
expected := fmt.Sprintf(jsTestData, i)
actual := make([]byte, len(expected))
_, err := io.ReadFull(s, actual)
if err != nil {
panic(err)
}
if expected != string(actual) {
panic("bad bytes")
}
}
buf, err := ioutil.ReadAll(s)
if err != nil {
panic(err)
}
if len(buf) > 0 {
panic("expected EOF")
}
}()
wg.Wait()
}
{
"name": "mplex-test",
"version": "1.0.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
"async": {
"version": "2.6.0",
"resolved": "https://registry.npmjs.org/async/-/async-2.6.0.tgz",
"integrity": "sha512-xAfGg1/NTLBBKlHFmnd7PlmUW9KhVQIUuSrYem9xzFUZy13ScvtyGGejaae9iAVRiRq9+Cx7DPFaAAhCpyxyPw==",
"requires": {
"lodash": "^4.14.0"
}
},
"buffer-from": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.0.0.tgz",
"integrity": "sha512-83apNb8KK0Se60UE1+4Ukbe3HbfELJ6UlI4ldtOGs7So4KD26orJM8hIY9lxdzP+UpItH1Yh/Y8GUvNFWFFRxA=="
},
"chunky": {
"version": "0.0.0",
"resolved": "https://registry.npmjs.org/chunky/-/chunky-0.0.0.tgz",
"integrity": "sha1-HnWAojwIOJfSrWYkWefv2EZfYIo="
},
"concat-stream": {
"version": "1.6.2",
"resolved": "https://registry.npmjs.org/concat-stream/-/concat-stream-1.6.2.tgz",
"integrity": "sha512-27HBghJxjiZtIk3Ycvn/4kbJk/1uZuJFfuPEns6LaEvpvG1f0hTea8lilrouyo9mVc2GWdcEZ8OLoGmSADlrCw==",
"requires": {
"buffer-from": "^1.0.0",
"inherits": "^2.0.3",
"readable-stream": "^2.2.2",
"typedarray": "^0.0.6"
}
},
"core-util-is": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz",
"integrity": "sha1-tf1UIgqivFq1eqtxQMlAdUUDwac="
},
"debug": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz",
"integrity": "sha512-OX8XqP7/1a9cqkxYw2yXss15f26NKWBpDXQd0/uK/KPqdQhxbPa994hnzjcE2VqQpDslf55723cKPUOGSmMY3g==",
"requires": {
"ms": "2.0.0"
}
},
"duplexify": {
"version": "3.5.4",
"resolved": "https://registry.npmjs.org/duplexify/-/duplexify-3.5.4.tgz",
"integrity": "sha512-JzYSLYMhoVVBe8+mbHQ4KgpvHpm0DZpJuL8PY93Vyv1fW7jYJ90LoXa1di/CVbJM+TgMs91rbDapE/RNIfnJsA==",
"requires": {
"end-of-stream": "^1.0.0",
"inherits": "^2.0.1",
"readable-stream": "^2.0.0",
"stream-shift": "^1.0.0"
}
},
"end-of-stream": {
"version": "1.4.1",
"resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.1.tgz",
"integrity": "sha512-1MkrZNvWTKCaigbn+W15elq2BB/L22nqrSY5DKlo3X6+vclJm8Bb5djXJBmEX6fS3+zCh/F4VBK5Z2KxJt4s2Q==",
"requires": {
"once": "^1.4.0"
}
},
"inherits": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.3.tgz",
"integrity": "sha1-Yzwsg+PaQqUC9SRmAiSA9CCCYd4="
},
"interface-connection": {
"version": "0.3.2",
"resolved": "https://registry.npmjs.org/interface-connection/-/interface-connection-0.3.2.tgz",
"integrity": "sha1-5JSYg/bqeft+3QHuP0/KR6Kf0sQ=",
"requires": {
"pull-defer": "~0.2.2",
"timed-tape": "~0.1.1"
}
},
"isarray": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/isarray/-/isarray-1.0.0.tgz",
"integrity": "sha1-u5NdSFgsuhaMBoNJV6VKPgcSTxE="
},
"libp2p-mplex": {
"version": "0.7.0",
"resolved": "https://registry.npmjs.org/libp2p-mplex/-/libp2p-mplex-0.7.0.tgz",
"integrity": "sha512-Vvk6ShXrNQBM/fkbrnEMtWuWiKEBXmSMFONtmfu33BKgi6dDzOWOR33LuIv1xEPkLWagRAeckVTg0kTta7J3ZA==",
"requires": {
"async": "^2.6.0",
"chunky": "0.0.0",
"concat-stream": "^1.6.2",
"debug": "^3.1.0",
"duplexify": "^3.5.4",
"pull-catch": "^1.0.0",
"pull-stream": "^3.6.7",
"pull-stream-to-stream": "^1.3.4",
"pump": "^3.0.0",
"readable-stream": "^2.3.6",
"stream-to-pull-stream": "^1.7.2",
"through2": "^2.0.3",
"varint": "^5.0.0"
}
},
"lodash": {
"version": "4.17.19",
"resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.19.tgz",
"integrity": "sha512-JNvd8XER9GQX0v2qJgsaN/mzFCNA5BRe/j8JN9d+tWyGLSodKQHKFicdwNYzWwI3wjRnaKPsGj1XkBjx/F96DQ=="
},
"looper": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/looper/-/looper-3.0.0.tgz",
"integrity": "sha1-LvpUw7HLq6m5Su4uWRSwvlf7t0k="
},
"ms": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz",
"integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g="
},
"once": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz",
"integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=",
"requires": {
"wrappy": "1"
}
},
"process-nextick-args": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-2.0.0.tgz",
"integrity": "sha512-MtEC1TqN0EU5nephaJ4rAtThHtC86dNN9qCuEhtshvpVBkAW5ZO7BASN9REnF9eoXGcRub+pFuKEpOHE+HbEMw=="
},
"pull-catch": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/pull-catch/-/pull-catch-1.0.0.tgz",
"integrity": "sha1-9YA361woLMtQavn3awAn0zkx5Is="
},
"pull-defer": {
"version": "0.2.2",
"resolved": "https://registry.npmjs.org/pull-defer/-/pull-defer-0.2.2.tgz",
"integrity": "sha1-CIew/7MK8ypW2+z6csFnInHwexM="
},
"pull-stream": {
"version": "3.6.7",
"resolved": "https://registry.npmjs.org/pull-stream/-/pull-stream-3.6.7.tgz",
"integrity": "sha512-XdE2/o1I2lK7A+sbbA/HjYnd5Xk7wL5CwAKzqHIgcBsluDb0LiKHNTl1K0it3/RKPshQljLf4kl1aJ12YsCCGQ=="
},
"pull-stream-to-stream": {
"version": "1.3.4",
"resolved": "https://registry.npmjs.org/pull-stream-to-stream/-/pull-stream-to-stream-1.3.4.tgz",
"integrity": "sha1-P4HYIWvRjSv9GhmBkEcRgOJzg5k="
},
"pump": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/pump/-/pump-3.0.0.tgz",
"integrity": "sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==",
"requires": {
"end-of-stream": "^1.1.0",
"once": "^1.3.1"
}
},
"readable-stream": {
"version": "2.3.6",
"resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-2.3.6.tgz",
"integrity": "sha512-tQtKA9WIAhBF3+VLAseyMqZeBjW0AHJoxOtYqSUZNJxauErmLbVm2FW1y+J/YA9dUrAC39ITejlZWhVIwawkKw==",
"requires": {
"core-util-is": "~1.0.0",
"inherits": "~2.0.3",
"isarray": "~1.0.0",
"process-nextick-args": "~2.0.0",
"safe-buffer": "~5.1.1",
"string_decoder": "~1.1.1",
"util-deprecate": "~1.0.1"
}
},
"safe-buffer": {
"version": "5.1.1",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.1.tgz",
"integrity": "sha512-kKvNJn6Mm93gAczWVJg7wH+wGYWNrDHdWvpUmHyEsgCtIwwo3bqPtV4tR5tuPaUhTOo/kvhVwd8XwwOllGYkbg=="
},
"stream-shift": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.0.tgz",
"integrity": "sha1-1cdSgl5TZ+eG944Y5EXqIjoVWVI="
},
"stream-to-pull-stream": {
"version": "1.7.2",
"resolved": "https://registry.npmjs.org/stream-to-pull-stream/-/stream-to-pull-stream-1.7.2.tgz",
"integrity": "sha1-dXYJrhzr0zx0MtSvvjH/eGULnd4=",
"requires": {
"looper": "^3.0.0",
"pull-stream": "^3.2.3"
}
},
"string_decoder": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz",
"integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==",
"requires": {
"safe-buffer": "~5.1.0"
}
},
"tcp": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/tcp/-/tcp-1.0.0.tgz",
"integrity": "sha1-/HZmWltMFMo5EyVwQ8BwdvqGqn4="
},
"through2": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/through2/-/through2-2.0.3.tgz",
"integrity": "sha1-AARWmzfHx0ujnEPzzteNGtlBQL4=",
"requires": {
"readable-stream": "^2.1.5",
"xtend": "~4.0.1"
}
},
"timed-tape": {
"version": "0.1.1",
"resolved": "https://registry.npmjs.org/timed-tape/-/timed-tape-0.1.1.tgz",
"integrity": "sha1-m25WnxfmbHnx7tLSX/eWL8dBjkk="
},
"typedarray": {
"version": "0.0.6",
"resolved": "https://registry.npmjs.org/typedarray/-/typedarray-0.0.6.tgz",
"integrity": "sha1-hnrHTjhkGHsdPUfZlqeOxciDB3c="
},
"util-deprecate": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz",
"integrity": "sha1-RQ1Nyfpw3nMnYvvS1KKJgUGaDM8="
},
"varint": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/varint/-/varint-5.0.0.tgz",
"integrity": "sha1-2Ca4n3SQcy+rwMDtaT7Uddyynr8="
},
"wrappy": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz",
"integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8="
},
"xtend": {
"version": "4.0.1",
"resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.1.tgz",
"integrity": "sha1-pcbVMr5lbiPbgg77lDofBJmNY68="
}
}
}
{
"name": "mplex-test",
"version": "1.0.0",
"description": "mplex test program",
"author": "",
"license": "ISC",
"dependencies": {
"interface-connection": "^0.3.2",
"libp2p-mplex": "^0.7.0",
"pull-stream": "^3.6.7",
"stream-to-pull-stream": "^1.7.2",
"tcp": "^1.0.0"
}
}
const assert = require('assert')
const mplex = require('libp2p-mplex')
const toPull = require('stream-to-pull-stream')
const pull = require('pull-stream')
const tcp = require('tcp')
const jsTestData = 'test data from js'
const goTestData = 'test data from go'
function readWrite (stream) {
pull(
stream,
pull.concat((err, data) => {
if (err) {
throw err
}
let offset = 0
for (let i = 0; i < 100; i++) {
let expected = goTestData + ' ' + i
assert.equal(expected, data.slice(offset, offset + expected.length))
offset += expected.length
}
})
)
pull(
pull.count(99),
pull.map((i) => jsTestData + ' ' + i),
stream
)
}
const listener = tcp.createServer((socket) => {
let muxer = mplex.listener(toPull.duplex(socket))
muxer.on('stream', (stream) => {
readWrite(stream)
})
for (let i = 0; i < 100; i++) {
muxer.newStream((err, stream) => {
if (err) {
throw err
}
readWrite(stream)
})
}
socket.on('close', () => {
listener.close()
})
})
listener.listen(9991)
#!/bin/sh
(
cd "js"
npm install
)
(
cd "js" && npm start
) &
sleep 1
(
cd "go" && go run main.go
) &
wait
package multiplex
import (
"bufio"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"sync"
"time"
logging "github.com/ipfs/go-log"
pool "github.com/libp2p/go-buffer-pool"
"github.com/multiformats/go-varint"
)
var log = logging.Logger("mplex")
var MaxMessageSize = 1 << 20
// Max time to block waiting for a slow reader to read from a stream before
// resetting it. Preferably, we'd have some form of back-pressure mechanism but
// we don't have that in this protocol.
var ReceiveTimeout = 5 * time.Second
// ErrShutdown is returned when operating on a shutdown session
var ErrShutdown = errors.New("session shut down")
// ErrTwoInitiators is returned when both sides think they're the initiator
var ErrTwoInitiators = errors.New("two initiators")
// ErrInvalidState is returned when the other side does something it shouldn't.
// In this case, we close the connection to be safe.
var ErrInvalidState = errors.New("received an unexpected message from the peer")
var errTimeout = timeout{}
var errStreamClosed = errors.New("stream closed")
var (
ResetStreamTimeout = 2 * time.Minute
WriteCoalesceDelay = 100 * time.Microsecond
)
type timeout struct{}
func (_ timeout) Error() string {
return "i/o deadline exceeded"
}
func (_ timeout) Temporary() bool {
return true
}
func (_ timeout) Timeout() bool {
return true
}
// +1 for initiator
const (
newStreamTag = 0
messageTag = 2
closeTag = 4
resetTag = 6
)
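// Note: these constants are the "initiator" variants of each message tag; the
// matching "receiver" variants are one less. streamID.header subtracts 1 when
// the local side did not open the stream, and handleIncoming uses the low bit
// of a received tag to work out which side opened the stream before rounding
// the tag back up to the even constant above.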
// Multiplex is a mplex session.
type Multiplex struct {
con net.Conn
buf *bufio.Reader
nextID uint64
initiator bool
closed chan struct{}
shutdown chan struct{}
shutdownErr error
shutdownLock sync.Mutex
writeCh chan []byte
writeTimer *time.Timer
writeTimerFired bool
nstreams chan *Stream
channels map[streamID]*Stream
chLock sync.Mutex
}
// NewMultiplex creates a new multiplexer session.
func NewMultiplex(con net.Conn, initiator bool) *Multiplex {
mp := &Multiplex{
con: con,
initiator: initiator,
buf: bufio.NewReader(con),
channels: make(map[streamID]*Stream),
closed: make(chan struct{}),
shutdown: make(chan struct{}),
writeCh: make(chan []byte, 16),
writeTimer: time.NewTimer(0),
nstreams: make(chan *Stream, 16),
}
go mp.handleIncoming()
go mp.handleOutgoing()
return mp
}
func (mp *Multiplex) newStream(id streamID, name string) (s *Stream) {
s = &Stream{
id: id,
name: name,
dataIn: make(chan []byte, 8),
rDeadline: makePipeDeadline(),
wDeadline: makePipeDeadline(),
mp: mp,
writeCancel: make(chan struct{}),
readCancel: make(chan struct{}),
}
return
}
// Accept accepts the next stream from the connection.
func (m *Multiplex) Accept() (*Stream, error) {
select {
case s, ok := <-m.nstreams:
if !ok {
return nil, errors.New("multiplex closed")
}
return s, nil
case <-m.closed:
return nil, m.shutdownErr
}
}
// Close closes the session.
func (mp *Multiplex) Close() error {
mp.closeNoWait()
// Wait for the receive loop to finish.
<-mp.closed
return nil
}
func (mp *Multiplex) closeNoWait() {
mp.shutdownLock.Lock()
select {
case <-mp.shutdown:
default:
mp.con.Close()
close(mp.shutdown)
}
mp.shutdownLock.Unlock()
}
// IsClosed returns true if the session is closed.
func (mp *Multiplex) IsClosed() bool {
select {
case <-mp.closed:
return true
default:
return false
}
}
func (mp *Multiplex) sendMsg(timeout, cancel <-chan struct{}, header uint64, data []byte) error {
buf := pool.Get(len(data) + 20)
n := 0
n += binary.PutUvarint(buf[n:], header)
n += binary.PutUvarint(buf[n:], uint64(len(data)))
n += copy(buf[n:], data)
select {
case mp.writeCh <- buf[:n]:
return nil
case <-mp.shutdown:
return ErrShutdown
case <-timeout:
return errTimeout
case <-cancel:
return ErrStreamClosed
}
}
func (mp *Multiplex) handleOutgoing() {
for {
select {
case <-mp.shutdown:
return
case data := <-mp.writeCh:
// FIXME: https://github.com/libp2p/go-libp2p/issues/644
// write coalescing disabled until this can be fixed.
//err := mp.writeMsg(data)
err := mp.doWriteMsg(data)
pool.Put(data)
if err != nil {
// the connection is closed by this time
log.Warnf("error writing data: %s", err.Error())
return
}
}
}
}
func (mp *Multiplex) writeMsg(data []byte) error {
if len(data) >= 512 {
err := mp.doWriteMsg(data)
pool.Put(data)
return err
}
buf := pool.Get(4096)
defer pool.Put(buf)
n := copy(buf, data)
pool.Put(data)
if !mp.writeTimerFired {
if !mp.writeTimer.Stop() {
<-mp.writeTimer.C
}
}
mp.writeTimer.Reset(WriteCoalesceDelay)
mp.writeTimerFired = false
for {
select {
case data = <-mp.writeCh:
wr := copy(buf[n:], data)
if wr < len(data) {
// we filled the buffer, send it
err := mp.doWriteMsg(buf)
if err != nil {
pool.Put(data)
return err
}
if len(data)-wr >= 512 {
// the remaining data is not a small write, send it
err := mp.doWriteMsg(data[wr:])
pool.Put(data)
return err
}
n = copy(buf, data[wr:])
// we've written some, reset the timer to coalesce the rest
if !mp.writeTimer.Stop() {
<-mp.writeTimer.C
}
mp.writeTimer.Reset(WriteCoalesceDelay)
} else {
n += wr
}
pool.Put(data)
case <-mp.writeTimer.C:
mp.writeTimerFired = true
return mp.doWriteMsg(buf[:n])
case <-mp.shutdown:
return ErrShutdown
}
}
}
func (mp *Multiplex) doWriteMsg(data []byte) error {
if mp.isShutdown() {
return ErrShutdown
}
_, err := mp.con.Write(data)
if err != nil {
mp.closeNoWait()
}
return err
}
func (mp *Multiplex) nextChanID() uint64 {
out := mp.nextID
mp.nextID++
return out
}
// NewStream creates a new stream.
func (mp *Multiplex) NewStream(ctx context.Context) (*Stream, error) {
return mp.NewNamedStream(ctx, "")
}
// NewNamedStream creates a new named stream.
func (mp *Multiplex) NewNamedStream(ctx context.Context, name string) (*Stream, error) {
mp.chLock.Lock()
// We could call IsClosed but this is faster (given that we already have
// the lock).
if mp.channels == nil {
mp.chLock.Unlock()
return nil, ErrShutdown
}
sid := mp.nextChanID()
header := (sid << 3) | newStreamTag
if name == "" {
name = fmt.Sprint(sid)
}
s := mp.newStream(streamID{
id: sid,
initiator: true,
}, name)
mp.channels[s.id] = s
mp.chLock.Unlock()
err := mp.sendMsg(ctx.Done(), nil, header, []byte(name))
if err != nil {
if err == errTimeout {
return nil, ctx.Err()
}
return nil, err
}
return s, nil
}
func (mp *Multiplex) cleanup() {
mp.closeNoWait()
// Take the channels.
mp.chLock.Lock()
channels := mp.channels
mp.channels = nil
mp.chLock.Unlock()
// Cancel any reads/writes
for _, msch := range channels {
msch.cancelRead(ErrStreamReset)
msch.cancelWrite(ErrStreamReset)
}
// And... shutdown!
if mp.shutdownErr == nil {
mp.shutdownErr = ErrShutdown
}
close(mp.closed)
}
func (mp *Multiplex) handleIncoming() {
defer mp.cleanup()
recvTimeout := time.NewTimer(0)
defer recvTimeout.Stop()
if !recvTimeout.Stop() {
<-recvTimeout.C
}
for {
chID, tag, err := mp.readNextHeader()
if err != nil {
mp.shutdownErr = err
return
}
remoteIsInitiator := tag&1 == 0
ch := streamID{
// true if *I'm* the initiator.
initiator: !remoteIsInitiator,
id: chID,
}
// Rounds up the tag:
// 0 -> 0
// 1 -> 2
// 2 -> 2
// 3 -> 4
// etc...
tag += (tag & 1)
b, err := mp.readNext()
if err != nil {
mp.shutdownErr = err
return
}
mp.chLock.Lock()
msch, ok := mp.channels[ch]
mp.chLock.Unlock()
switch tag {
case newStreamTag:
if ok {
log.Debugf("received NewStream message for existing stream: %d", ch)
mp.shutdownErr = ErrInvalidState
return
}
name := string(b)
pool.Put(b)
msch = mp.newStream(ch, name)
mp.chLock.Lock()
mp.channels[ch] = msch
mp.chLock.Unlock()
select {
case mp.nstreams <- msch:
case <-mp.shutdown:
return
}
case resetTag:
if !ok {
// This is *ok*. We forget the stream on reset.
continue
}
// Cancel any ongoing reads/writes.
msch.cancelRead(ErrStreamReset)
msch.cancelWrite(ErrStreamReset)
case closeTag:
if !ok {
// may have canceled our reads already.
continue
}
// unregister and throw away future data.
mp.chLock.Lock()
delete(mp.channels, ch)
mp.chLock.Unlock()
// close data channel, there will be no more data.
close(msch.dataIn)
// We intentionally don't cancel any deadlines, cancel reads, cancel
// writes, etc. We just deliver the EOF by closing the
// data channel, and unregister the channel so we don't
// receive any more data. The user still needs to call
// `Close()` or `Reset()`.
case messageTag:
if !ok {
// We're not accepting data on this stream, for
// some reason. It's likely that we reset it, or
// simply canceled reads (e.g., called Close).
pool.Put(b)
continue
}
recvTimeout.Reset(ReceiveTimeout)
select {
case msch.dataIn <- b:
case <-msch.readCancel:
// the user has canceled reading. walk away.
pool.Put(b)
case <-recvTimeout.C:
pool.Put(b)
log.Warnf("timed out receiving message into stream queue.")
// Do not do this asynchronously. Otherwise, we
// could drop a message, then receive a message,
// then reset.
msch.Reset()
continue
case <-mp.shutdown:
pool.Put(b)
return
}
if !recvTimeout.Stop() {
<-recvTimeout.C
}
default:
log.Debugf("message with unknown header on stream %s", ch)
if ok {
msch.Reset()
}
}
}
}
func (mp *Multiplex) isShutdown() bool {
select {
case <-mp.shutdown:
return true
default:
return false
}
}
func (mp *Multiplex) sendResetMsg(header uint64, hard bool) {
ctx, cancel := context.WithTimeout(context.Background(), ResetStreamTimeout)
defer cancel()
err := mp.sendMsg(ctx.Done(), nil, header, nil)
if err != nil && !mp.isShutdown() {
if hard {
log.Warnf("error sending reset message: %s; killing connection", err.Error())
mp.Close()
} else {
log.Debugf("error sending reset message: %s", err.Error())
}
}
}
func (mp *Multiplex) readNextHeader() (uint64, uint64, error) {
h, err := varint.ReadUvarint(mp.buf)
if err != nil {
return 0, 0, err
}
// get channel ID
ch := h >> 3
rem := h & 7
return ch, rem, nil
}
func (mp *Multiplex) readNext() ([]byte, error) {
// get length
l, err := varint.ReadUvarint(mp.buf)
if err != nil {
return nil, err
}
if l > uint64(MaxMessageSize) {
return nil, fmt.Errorf("message size too large!")
}
if l == 0 {
return nil, nil
}
buf := pool.Get(int(l))
n, err := io.ReadFull(mp.buf, buf)
if err != nil {
return nil, err
}
return buf[:n], nil
}
func isFatalNetworkError(err error) bool {
nerr, ok := err.(net.Error)
if ok {
return !(nerr.Timeout() || nerr.Temporary())
}
return false
}
package multiplex
import (
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
"os"
"sync"
"testing"
"time"
)
func init() {
// Let's not slow down the tests too much...
ReceiveTimeout = 100 * time.Millisecond
}
func TestSlowReader(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
defer mpa.Close()
defer mpb.Close()
mes := []byte("Hello world")
sa, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
sb, err := mpb.Accept()
if err != nil {
t.Fatal(err)
}
// 10000 writes are more than enough to fill up the buffer of the underlying
// connection.
for i := 0; i < 10000; i++ {
_, err = sa.Write(mes)
if err != nil {
break
}
}
if err == nil {
t.Fatal("wrote too many messages")
}
// There's a race between reading this stream and processing the reset
// so we have to read enough off to drain the queue.
for i := 0; i < 8; i++ {
_, err = sb.Read(mes)
if err != nil {
return
}
}
t.Fatal("stream should have been reset")
}
func TestBasicStreams(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
mes := []byte("Hello world")
go func() {
s, err := mpb.Accept()
if err != nil {
t.Fatal(err)
}
_, err = s.Write(mes)
if err != nil {
t.Fatal(err)
}
err = s.Close()
if err != nil {
t.Fatal(err)
}
}()
s, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
buf, err := ioutil.ReadAll(s)
if err != nil {
t.Fatal(err)
}
if string(buf) != string(mes) {
t.Fatal("got bad data")
}
s.Close()
mpa.Close()
mpb.Close()
}
func TestOpenStreamDeadline(t *testing.T) {
a, _ := net.Pipe()
mp := NewMultiplex(a, false)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
var counter int
var deadlineExceeded bool
for i := 0; i < 1000; i++ {
if _, err := mp.NewStream(ctx); err != nil {
if err != context.DeadlineExceeded {
t.Fatalf("expected the error to be a deadline error, got %s", err.Error())
}
deadlineExceeded = true
break
}
counter++
}
if counter == 0 {
t.Fatal("expected at least some streams to open successfully")
}
if !deadlineExceeded {
t.Fatal("expected a deadline error to occur at some point")
}
}
func TestWriteAfterClose(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
done := make(chan struct{})
mes := []byte("Hello world")
go func() {
s, err := mpb.Accept()
if err != nil {
t.Fatal(err)
}
_, err = s.Write(mes)
if err != nil {
return
}
_, err = s.Write(mes)
if err != nil {
return
}
s.Close()
close(done)
}()
s, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
defer s.Close()
// wait for writes to complete and close to happen (and be noticed)
<-done
time.Sleep(time.Millisecond * 50)
buf := make([]byte, len(mes)*10)
n, _ := io.ReadFull(s, buf)
if n != len(mes)*2 {
t.Fatal("read incorrect amount of data: ", n)
}
// read after close should fail with EOF
_, err = s.Read(buf)
if err == nil {
t.Fatal("read on closed stream should fail")
}
mpa.Close()
mpb.Close()
}
func TestEcho(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
mes := make([]byte, 40960)
rand.Read(mes)
go func() {
s, err := mpb.Accept()
if err != nil {
t.Fatal(err)
}
defer s.Close()
io.Copy(s, s)
}()
s, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
_, err = s.Write(mes)
if err != nil {
t.Fatal(err)
}
buf := make([]byte, len(mes))
n, err := io.ReadFull(s, buf)
if err != nil {
t.Fatal(err)
}
if n != len(mes) {
t.Fatal("read wrong amount")
}
if err := arrComp(buf, mes); err != nil {
t.Fatal(err)
}
s.Close()
mpa.Close()
mpb.Close()
}
func TestFullClose(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
mes := make([]byte, 40960)
rand.Read(mes)
s, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
{
s2, err := mpb.Accept()
if err != nil {
t.Error(err)
}
_, err = s.Write(mes)
if err != nil {
t.Error(err)
}
s2.Close()
}
err = s.Close()
if err != nil {
t.Fatal(err)
}
if n, err := s.Write([]byte("foo")); err != ErrStreamClosed {
t.Fatal("expected stream closed error on write to closed stream, got", err)
} else if n != 0 {
t.Fatal("should not have written any bytes to closed stream")
}
// We closed for reading, this should fail.
if n, err := s.Read([]byte{0}); err != ErrStreamClosed {
t.Fatal("expected stream closed error on read from closed stream, got", err)
} else if n != 0 {
t.Fatal("should not have read any bytes from closed stream, got", n)
}
mpa.Close()
mpb.Close()
}
func TestHalfClose(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
wait := make(chan struct{})
mes := make([]byte, 40960)
rand.Read(mes)
go func() {
s, err := mpb.Accept()
if err != nil {
t.Error(err)
}
defer s.Close()
if err := s.CloseRead(); err != nil {
t.Error(err)
}
<-wait
_, err = s.Write(mes)
if err != nil {
t.Error(err)
}
}()
s, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
defer s.Close()
err = s.CloseWrite()
if err != nil {
t.Fatal(err)
}
bn, err := s.Write([]byte("foo"))
if err == nil {
t.Fatal("expected error on write to closed stream")
}
if bn != 0 {
t.Fatal("should not have written any bytes to closed stream")
}
close(wait)
buf, err := ioutil.ReadAll(s)
if err != nil {
t.Fatal(err)
}
if len(buf) != len(mes) {
t.Fatal("read wrong amount", len(buf), len(mes))
}
if err := arrComp(buf, mes); err != nil {
t.Fatal(err)
}
mpa.Close()
mpb.Close()
}
func TestFuzzCloseConnection(t *testing.T) {
a, b := net.Pipe()
for i := 0; i < 1000; i++ {
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
go mpa.Close()
go mpa.Close()
mpb.Close()
}
}
func TestClosing(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
_, err := mpb.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
_, err = mpa.Accept()
if err != nil {
t.Fatal(err)
}
err = mpa.Close()
if err != nil {
t.Fatal(err)
}
err = mpb.Close()
if err != nil {
// not an error, the other side is closing the pipe/socket
t.Log(err)
}
if len(mpa.channels) > 0 || len(mpb.channels) > 0 {
t.Fatal("leaked closed streams")
}
}
func TestReset(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
defer mpa.Close()
defer mpb.Close()
sa, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
sb, err := mpb.Accept()
buf := make([]byte, 10)
sa.Reset()
n, err := sa.Read(buf)
if n != 0 {
t.Fatalf("read %d bytes on reset stream", n)
}
if err == nil {
t.Fatalf("successfully read from reset stream")
}
n, err = sa.Write([]byte("test"))
if n != 0 {
t.Fatalf("wrote %d bytes on reset stream", n)
}
if err == nil {
t.Fatalf("successfully wrote to reset stream")
}
time.Sleep(200 * time.Millisecond)
n, err = sb.Write([]byte("test"))
if n != 0 {
t.Fatalf("wrote %d bytes on reset stream", n)
}
if err == nil {
t.Fatalf("successfully wrote to reset stream")
}
n, err = sb.Read(buf)
if n != 0 {
t.Fatalf("read %d bytes on reset stream", n)
}
if err == nil {
t.Fatalf("successfully read from reset stream")
}
}
func TestCancelRead(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
defer mpa.Close()
defer mpb.Close()
sa, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
defer sa.Reset()
sb, err := mpb.Accept()
if err != nil {
t.Fatal(err)
}
defer sb.Reset()
// spin off a read
done := make(chan struct{})
go func() {
defer close(done)
_, err := sa.Read([]byte{0})
if err != ErrStreamClosed {
t.Error(err)
}
}()
// give it a chance to start.
time.Sleep(time.Millisecond)
// cancel it.
err = sa.CloseRead()
if err != nil {
t.Fatal(err)
}
// It should be canceled.
<-done
// Writing should still succeed.
_, err = sa.Write([]byte("foo"))
if err != nil {
t.Fatal(err)
}
err = sa.Close()
if err != nil {
t.Fatal(err)
}
// Data should still be sent.
buf, err := ioutil.ReadAll(sb)
if err != nil {
t.Fatal(err)
}
if string(buf) != "foo" {
t.Fatalf("expected foo, got %#v", err)
}
}
func TestCancelWrite(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
defer mpa.Close()
defer mpb.Close()
sa, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
defer sa.Reset()
sb, err := mpb.Accept()
if err != nil {
t.Fatal(err)
}
defer sb.Reset()
// spin off a writer
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for {
_, err := sa.Write([]byte("foo"))
if err != nil {
if err != ErrStreamClosed {
t.Error("unexpected error", err)
}
return
}
}
}()
// give it a chance to fill up.
time.Sleep(time.Millisecond)
go func() {
defer wg.Done()
// close it.
err := sa.CloseWrite()
if err != nil {
t.Error(err)
}
}()
_, err = ioutil.ReadAll(sb)
if err != nil {
t.Fatalf("expected stream to be closed correctly")
}
// It should be canceled.
wg.Wait()
// Writing from the other side should still succeed.
_, err = sb.Write([]byte("bar"))
if err != nil {
t.Fatal(err)
}
err = sb.Close()
if err != nil {
t.Fatal(err)
}
// Data should still be sent.
buf, err := ioutil.ReadAll(sa)
if err != nil {
t.Fatal(err)
}
if string(buf) != "bar" {
t.Fatalf("expected foo, got %#v", err)
}
}
func TestCancelReadAfterWrite(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
defer mpa.Close()
defer mpb.Close()
sa, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
defer sa.Reset()
sb, err := mpb.Accept()
if err != nil {
t.Fatal(err)
}
defer sb.Reset()
// Write small messages till we would block.
sa.SetWriteDeadline(time.Now().Add(time.Millisecond))
for {
_, err = sa.Write([]byte("foo"))
if err != nil {
if os.IsTimeout(err) {
break
} else {
t.Fatal(err)
}
}
}
// Cancel inbound reads.
sb.CloseRead()
// We shouldn't read anything.
n, err := sb.Read([]byte{0})
if n != 0 || err != ErrStreamClosed {
t.Fatal("got data", err)
}
}
func TestResetAfterEOF(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
defer mpa.Close()
defer mpb.Close()
sa, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
sb, err := mpb.Accept()
if err := sa.CloseWrite(); err != nil {
t.Fatal(err)
}
n, err := sb.Read([]byte{0})
if n != 0 || err != io.EOF {
t.Fatal(err)
}
sb.Reset()
n, err = sa.Read([]byte{0})
if n != 0 || err != ErrStreamReset {
t.Fatal(err)
}
}
func TestOpenAfterClose(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
sa, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
sb, err := mpb.Accept()
if err != nil {
t.Fatal(err)
}
sa.Close()
sb.Close()
mpa.Close()
s, err := mpa.NewStream(context.Background())
if err == nil || s != nil {
t.Fatal("opened a stream on a closed connection")
}
s, err = mpa.NewStream(context.Background())
if err == nil || s != nil {
t.Fatal("opened a stream on a closed connection")
}
mpb.Close()
}
func TestDeadline(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
defer mpa.Close()
defer mpb.Close()
sa, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
_, err = mpb.Accept()
if err != nil {
t.Fatal(err)
}
sa.SetDeadline(time.Now().Add(time.Second))
_, err = sa.Read(make([]byte, 1024))
if err != errTimeout {
t.Fatal("expected timeout")
}
}
func TestReadAfterClose(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
defer mpa.Close()
defer mpb.Close()
sa, err := mpa.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
sb, err := mpb.Accept()
if err != nil {
t.Fatal(err)
}
sa.Close()
_, err = sb.Read(make([]byte, 1024))
if err != io.EOF {
t.Fatal("expected EOF")
}
}
func TestFuzzCloseStream(t *testing.T) {
timer := time.AfterFunc(10*time.Second, func() {
// This is really the *only* reliable way to set a timeout on
// this test...
// Don't add *anything* to this function. The go scheduler acts
// a bit funny when it encounters a deadlock...
panic("timeout")
})
defer timer.Stop()
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
defer mpa.Close()
defer mpb.Close()
done := make(chan struct{})
go func() {
streams := make([]*Stream, 100)
for i := range streams {
var err error
streams[i], err = mpb.NewStream(context.Background())
if err != nil {
t.Fatal(err)
}
go streams[i].Close()
go streams[i].Close()
}
// Make sure they're closed before we move on.
for _, s := range streams {
if s == nil {
continue
}
s.Close()
}
close(done)
}()
streams := make([]*Stream, 100)
for i := range streams {
var err error
streams[i], err = mpa.Accept()
if err != nil {
t.Fatal(err)
}
}
<-done
for _, s := range streams {
if s == nil {
continue
}
err := s.Close()
if err != nil {
t.Fatal(err)
}
}
time.Sleep(10 * time.Millisecond)
nchannels := 0
mpa.chLock.Lock()
nchannels += len(mpa.channels)
mpa.chLock.Unlock()
mpb.chLock.Lock()
nchannels += len(mpb.channels)
mpb.chLock.Unlock()
if nchannels > 0 {
t.Fatal("leaked closed streams")
}
}
func arrComp(a, b []byte) error {
msg := ""
if len(a) != len(b) {
msg += fmt.Sprintf("arrays differ in length: %d %d\n", len(a), len(b))
}
for i := 0; i < len(a) && i < len(b); i++ {
if a[i] != b[i] {
msg += fmt.Sprintf("content differs at index %d [%d != %d]", i, a[i], b[i])
return fmt.Errorf(msg)
}
}
if len(msg) > 0 {
return fmt.Errorf(msg)
}
return nil
}
//+build !race
package multiplex
var raceEnabled = false
//+build race
package multiplex
var raceEnabled = true
package multiplex
import (
"context"
"errors"
"io"
"sync"
"time"
pool "github.com/libp2p/go-buffer-pool"
"go.uber.org/multierr"
)
var (
ErrStreamReset = errors.New("stream reset")
ErrStreamClosed = errors.New("closed stream")
)
// streamID is a convenience type for operating on stream IDs
type streamID struct {
id uint64
initiator bool
}
// header computes the header for the given tag
func (id *streamID) header(tag uint64) uint64 {
header := id.id<<3 | tag
if !id.initiator {
header--
}
return header
}
type Stream struct {
id streamID
name string
dataIn chan []byte
mp *Multiplex
extra []byte
// exbuf is for holding the reference to the beginning of the extra slice
// for later memory pool freeing
exbuf []byte
rDeadline, wDeadline pipeDeadline
clLock sync.Mutex
writeCancelErr, readCancelErr error
writeCancel, readCancel chan struct{}
}
func (s *Stream) Name() string {
return s.name
}
// tries to preload pending data
func (s *Stream) preloadData() {
select {
case read, ok := <-s.dataIn:
if !ok {
return
}
s.extra = read
s.exbuf = read
default:
}
}
func (s *Stream) waitForData() error {
select {
case read, ok := <-s.dataIn:
if !ok {
return io.EOF
}
s.extra = read
s.exbuf = read
return nil
case <-s.readCancel:
// This is the only place where it's safe to return these.
s.returnBuffers()
return s.readCancelErr
case <-s.rDeadline.wait():
return errTimeout
}
}
func (s *Stream) returnBuffers() {
if s.exbuf != nil {
pool.Put(s.exbuf)
s.exbuf = nil
s.extra = nil
}
for {
select {
case read, ok := <-s.dataIn:
if !ok {
return
}
if read == nil {
continue
}
pool.Put(read)
default:
return
}
}
}
func (s *Stream) Read(b []byte) (int, error) {
select {
case <-s.readCancel:
return 0, s.readCancelErr
default:
}
if s.extra == nil {
err := s.waitForData()
if err != nil {
return 0, err
}
}
n := 0
for s.extra != nil && n < len(b) {
read := copy(b[n:], s.extra)
n += read
if read < len(s.extra) {
s.extra = s.extra[read:]
} else {
if s.exbuf != nil {
pool.Put(s.exbuf)
}
s.extra = nil
s.exbuf = nil
s.preloadData()
}
}
return n, nil
}
func (s *Stream) Write(b []byte) (int, error) {
var written int
for written < len(b) {
wl := len(b) - written
if wl > MaxMessageSize {
wl = MaxMessageSize
}
n, err := s.write(b[written : written+wl])
if err != nil {
return written, err
}
written += n
}
return written, nil
}
func (s *Stream) write(b []byte) (int, error) {
select {
case <-s.writeCancel:
return 0, s.writeCancelErr
default:
}
err := s.mp.sendMsg(s.wDeadline.wait(), s.writeCancel, s.id.header(messageTag), b)
if err != nil {
return 0, err
}
return len(b), nil
}
func (s *Stream) cancelWrite(err error) bool {
s.wDeadline.close()
s.clLock.Lock()
defer s.clLock.Unlock()
select {
case <-s.writeCancel:
return false
default:
s.writeCancelErr = err
close(s.writeCancel)
return true
}
}
func (s *Stream) cancelRead(err error) bool {
// Always unregister for reading first, even if we're already closed (or
// already closing). When handleIncoming calls this, it expects the
// stream to be unregistered by the time it returns.
s.mp.chLock.Lock()
delete(s.mp.channels, s.id)
s.mp.chLock.Unlock()
s.rDeadline.close()
s.clLock.Lock()
defer s.clLock.Unlock()
select {
case <-s.readCancel:
return false
default:
s.readCancelErr = err
close(s.readCancel)
return true
}
}
func (s *Stream) CloseWrite() error {
if !s.cancelWrite(ErrStreamClosed) {
// Check if we closed the stream _nicely_. If so, we don't need
// to report an error to the user.
if s.writeCancelErr == ErrStreamClosed {
return nil
}
// Closed for some other reason. Report it.
return s.writeCancelErr
}
ctx, cancel := context.WithTimeout(context.Background(), ResetStreamTimeout)
defer cancel()
err := s.mp.sendMsg(ctx.Done(), nil, s.id.header(closeTag), nil)
// We failed to close the stream after 2 minutes, something is probably wrong.
if err != nil && !s.mp.isShutdown() {
log.Warnf("Error closing stream: %s; killing connection", err.Error())
s.mp.Close()
}
return err
}
func (s *Stream) CloseRead() error {
s.cancelRead(ErrStreamClosed)
return nil
}
func (s *Stream) Close() error {
return multierr.Combine(s.CloseRead(), s.CloseWrite())
}
func (s *Stream) Reset() error {
s.cancelRead(ErrStreamReset)
if s.cancelWrite(ErrStreamReset) {
// Send a reset in the background.
go s.mp.sendResetMsg(s.id.header(resetTag), true)
}
return nil
}
func (s *Stream) SetDeadline(t time.Time) error {
s.rDeadline.set(t)
s.wDeadline.set(t)
return nil
}
func (s *Stream) SetReadDeadline(t time.Time) error {
s.rDeadline.set(t)
return nil
}
func (s *Stream) SetWriteDeadline(t time.Time) error {
s.wDeadline.set(t)
return nil
}