Commit 4615983a authored by tavit ohanian's avatar tavit ohanian

reference basis

parents aedb0a80 e4941fa6
Pipeline #507 failed with stages
in 0 seconds
/tools/bin/
os:
- linux
# Don't use the go language tag. Install manually to fix wasm.
env:
global:
- GOTFLAGS="-race"
matrix:
- BUILD_DEPTYPE=gomod
# disable travis install
install:
# Manually download and install Go 1.13 instead of using gimme.
# It looks like gimme Go causes some errors on go-test for Wasm.
- wget -O go.tar.gz https://dl.google.com/go/go1.15.3.linux-amd64.tar.gz
- tar -C ~ -xzf go.tar.gz
- rm go.tar.gz
- export GOROOT=~/go
- export PATH=$GOROOT/bin:$PATH
- go version
- go env
script:
- export GOROOT=~/go
- export PATH=$GOROOT/bin:$PATH
- bash <(curl -s https://raw.githubusercontent.com/ipfs/ci-helpers/master/travis-ci/run-standard-tests.sh)
cache:
directories:
- $GOPATH/pkg/mod
- $HOME/.cache/go-build
notifications:
email: false
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
The MIT License (MIT)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
# go-ws-transport
dms3 p2p go-ws-transport
\ No newline at end of file
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](https://libp2p.io/)
[![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
[![GoDoc](https://godoc.org/github.com/libp2p/go-ws-transport?status.svg)](https://godoc.org/github.com/libp2p/go-ws-transport)
[![Coverage Status](https://coveralls.io/repos/github/libp2p/go-ws-transport/badge.svg?branch=master)](https://coveralls.io/github/libp2p/go-ws-transport?branch=master)
[![Build Status](https://travis-ci.org/libp2p/go-ws-transport.svg?branch=master)](https://travis-ci.org/libp2p/go-ws-transport)
> A libp2p transport implementation using WebSockets
`go-ws-transport` is an implementation of the [libp2p transport
interface][concept-transport] that streams data over
[WebSockets][spec-websockets], which are themselves layered over TCP/IP. It is
included by default in the main [`go-libp2p`][go-libp2p] "entry point" module.
## Table of Contents
- [go-ws-transport](#go-ws-transport)
- [Table of Contents](#table-of-contents)
- [Install](#install)
- [Usage](#usage)
- [Addresses](#addresses)
- [Security and Multiplexing](#security-and-multiplexing)
- [Contribute](#contribute)
- [Want to hack on IPFS?](#want-to-hack-on-ipfs)
- [License](#license)
## Install
`go-ws-transport` is included as a dependency of `go-libp2p`, which is the most
common libp2p entry point. If you depend on `go-libp2p`, there is generally no
need to explicitly depend on this module.
`go-ws-transport` is a standard Go module which can be installed with:
```sh
> go get github.com/libp2p/go-ws-transport
```
This repo is [gomod](https://github.com/golang/go/wiki/Modules)-compatible, and users of
go 1.11 and later with modules enabled will automatically pull the latest tagged release
by referencing this package. Upgrades to future releases can be managed using `go get`,
or by editing your `go.mod` file as [described by the gomod documentation](https://github.com/golang/go/wiki/Modules#how-to-upgrade-and-downgrade-dependencies).
## Usage
WebSockets are one of the default transports enabled when constructing a standard libp2p
Host, along with [TCP](https://github.com/libp2p/go-tcp-transport).
Calling [`libp2p.New`][godoc-libp2p-new] to construct a libp2p Host will enable
the WebSocket transport, unless you override the default transports by passing in
`Options` to `libp2p.New`.
To explicitly enable the WebSocket transport while constructing a host, use the
`libp2p.Transport` option, passing in the `ws.New` constructor function:
``` go
import (
"context"
libp2p "github.com/libp2p/go-libp2p"
ws "github.com/libp2p/go-ws-transport"
)
ctx := context.Background()
// WebSockets only:
h, err := libp2p.New(ctx,
libp2p.Transport(ws.New)
)
```
The example above will replace the default transports with a single WebSocket
transport. To add multiple tranports, use `ChainOptions`:
``` go
// WebSockets and QUIC:
h, err := libp2p.New(ctx,
libp2p.ChainOptions(
libp2p.Transport(ws.New),
libp2p.Transport(quic.NewTransport)) // see https://github.com/libp2p/go-libp2p-quic-transport
)
```
## Addresses
The WebSocket transport supports [multiaddrs][multiaddr] that contain a `ws`
component, which is encapsulated within (or layered onto) another valid TCP
multiaddr.
Examples:
| addr | description |
|-------------------------------|----------------------------------------------------|
| `/ip4/1.2.3.4/tcp/1234/ws` | IPv4: 1.2.3.4, TCP port 1234 |
| `/ip6/::1/tcp/1234/ws` | IPv6 loopback, TCP port 1234 |
| `/dns4/example.com/tcp/80/ws` | DNS over IPv4, hostname `example.com`, TCP port 80 |
Notice that the `/ws` multiaddr component contextualizes an existing TCP/IP
multiaddr and does not require any additional addressing information.
## Security and Multiplexing
While the WebSocket spec defines a `wss` URI scheme for encrypted WebSocket
connections, support for `wss` URIs relies on TLS, which wraps the WebSocket
connection in a similar manner to TLS-protected HTTP traffic.
As libp2p does not integrate with the TLS Certificate Authority infrastructure
by design, security for WebSockets is provided by a [transport
upgrader][transport-upgrader]. The transport upgrader negotiates transport
security for each connection according to the protocols supported by each party.
The transport upgrader also negotiates a stream multiplexing protocol to allow
many bidirectional streams to coexist on a single WebSocket connection.
## Contribute
Feel free to join in. All welcome. Open an [issue](https://github.com/libp2p/go-ws-transport/issues)!
This repository falls under the libp2p [Code of Conduct](https://github.com/libp2p/community/blob/master/code-of-conduct.md).
### Want to hack on libp2p?
[![](https://cdn.rawgit.com/libp2p/community/master/img/contribute.gif)](https://github.com/libp2p/community/blob/master/CONTRIBUTE.md)
## License
MIT
---
The last gx published version of this module was: 2.0.27: QmaSWc4ox6SZQF6DHZvDuM9sP1syNajkKuPXmKR1t5BAz5
<!-- reference links -->
[go-libp2p]: https://github.com/libp2p/go-libp2p
[concept-transport]: https://docs.libp2p.io/concepts/transport/
[interface-host]: https://github.com/libp2p/go-libp2p-core/blob/master/host/host.go
[godoc-libp2p-new]: https://godoc.org/github.com/libp2p/go-libp2p#New
[transport-upgrader]: https://github.com/libp2p/go-libp2p-transport-upgrader
[multiaddr]: https://github.com/multiformats/multiaddr
[spec-websockets]: https://tools.ietf.org/html/rfc6455
package websocket
import (
"fmt"
"net"
"net/url"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
// Addr is an implementation of net.Addr for WebSocket.
type Addr struct {
*url.URL
}
var _ net.Addr = (*Addr)(nil)
// Network returns the network type for a WebSocket, "websocket".
func (addr *Addr) Network() string {
return "websocket"
}
// NewAddr creates a new Addr using the given host string
func NewAddr(host string) *Addr {
return &Addr{
URL: &url.URL{
Host: host,
},
}
}
func ConvertWebsocketMultiaddrToNetAddr(maddr ma.Multiaddr) (net.Addr, error) {
_, host, err := manet.DialArgs(maddr)
if err != nil {
return nil, err
}
return NewAddr(host), nil
}
func ParseWebsocketNetAddr(a net.Addr) (ma.Multiaddr, error) {
wsa, ok := a.(*Addr)
if !ok {
return nil, fmt.Errorf("not a websocket address")
}
tcpaddr, err := net.ResolveTCPAddr("tcp", wsa.Host)
if err != nil {
return nil, err
}
tcpma, err := manet.FromNetAddr(tcpaddr)
if err != nil {
return nil, err
}
wsma, err := ma.NewMultiaddr("/ws")
if err != nil {
return nil, err
}
return tcpma.Encapsulate(wsma), nil
}
func parseMultiaddr(a ma.Multiaddr) (string, error) {
_, host, err := manet.DialArgs(a)
if err != nil {
return "", err
}
return "ws://" + host, nil
}
package websocket
import (
"net/url"
"testing"
ma "github.com/multiformats/go-multiaddr"
)
func TestMultiaddrParsing(t *testing.T) {
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5555/ws")
if err != nil {
t.Fatal(err)
}
wsaddr, err := parseMultiaddr(addr)
if err != nil {
t.Fatal(err)
}
if wsaddr != "ws://127.0.0.1:5555" {
t.Fatalf("expected ws://127.0.0.1:5555, got %s", wsaddr)
}
}
type httpAddr struct {
*url.URL
}
func (addr *httpAddr) Network() string {
return "http"
}
func TestParseWebsocketNetAddr(t *testing.T) {
notWs := &httpAddr{&url.URL{Host: "http://127.0.0.1:1234"}}
_, err := ParseWebsocketNetAddr(notWs)
if err.Error() != "not a websocket address" {
t.Fatalf("expect \"not a websocket address\", got \"%s\"", err)
}
wsAddr := NewAddr("127.0.0.1:5555")
parsed, err := ParseWebsocketNetAddr(wsAddr)
if err != nil {
t.Fatal(err)
}
if parsed.String() != "/ip4/127.0.0.1/tcp/5555/ws" {
t.Fatalf("expected \"/ip4/127.0.0.1/tcp/5555/ws\", got \"%s\"", parsed.String())
}
}
func TestConvertWebsocketMultiaddrToNetAddr(t *testing.T) {
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5555/ws")
if err != nil {
t.Fatal(err)
}
wsaddr, err := ConvertWebsocketMultiaddrToNetAddr(addr)
if err != nil {
t.Fatal(err)
}
if wsaddr.String() != "//127.0.0.1:5555" {
t.Fatalf("expected //127.0.0.1:5555, got %s", wsaddr)
}
if wsaddr.Network() != "websocket" {
t.Fatalf("expected network: \"websocket\", got \"%s\"", wsaddr.Network())
}
}
// +build js,wasm
package websocket
import (
"bufio"
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/sec/insecure"
mplex "github.com/libp2p/go-libp2p-mplex"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
ma "github.com/multiformats/go-multiaddr"
)
func TestInBrowser(t *testing.T) {
tpt := New(&tptu.Upgrader{
Secure: insecure.New("browserPeer"),
Muxer: new(mplex.Transport),
})
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5555/ws")
if err != nil {
t.Fatal("could not parse multiaddress:" + err.Error())
}
conn, err := tpt.Dial(context.Background(), addr, "serverPeer")
if err != nil {
t.Fatal("could not dial server:" + err.Error())
}
defer conn.Close()
stream, err := conn.AcceptStream()
if err != nil {
t.Fatal("could not accept stream:" + err.Error())
}
defer stream.Close()
buf := bufio.NewReader(stream)
msg, err := buf.ReadString('\n')
if err != nil {
t.Fatal("could not read ping message:" + err.Error())
}
expected := "ping\n"
if msg != expected {
t.Fatalf("Received wrong message. Expected %q but got %q", expected, msg)
}
_, err = stream.Write([]byte("pong\n"))
if err != nil {
t.Fatal("could not write pong message:" + err.Error())
}
// TODO(albrow): This hack is necessary in order to give the reader time to
// finish. As soon as this test function returns, the browser window is
// closed, which means there is no time for the other end of the connection to
// read the "pong" message. We should find some way to remove this hack if
// possible.
time.Sleep(1 * time.Second)
}
// +build !js
package websocket
import (
"bufio"
"context"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"github.com/libp2p/go-libp2p-core/sec/insecure"
mplex "github.com/libp2p/go-libp2p-mplex"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
ma "github.com/multiformats/go-multiaddr"
)
var (
wasmBrowserTestBin = "wasmbrowsertest"
wasmBrowserTestDir = filepath.Join("tools", "bin")
wasmBrowserTestPackage = "github.com/agnivade/wasmbrowsertest"
)
// TestInBrowser is a harness that allows us to use `go test` in order to run
// WebAssembly tests in a headless browser.
func TestInBrowser(t *testing.T) {
// ensure we have the right tools.
err := os.MkdirAll(wasmBrowserTestDir, 0755)
t.Logf("building %s", wasmBrowserTestPackage)
if err != nil && !os.IsExist(err) {
t.Fatal(err)
}
cmd := exec.Command(
"go", "build",
"-o", wasmBrowserTestBin,
"github.com/agnivade/wasmbrowsertest",
)
cmd.Dir = wasmBrowserTestDir
err = cmd.Run()
if err != nil {
t.Fatal(err)
}
t.Log("starting server")
// Start a transport which the browser peer will dial.
serverDoneSignal := make(chan struct{})
go func() {
defer func() {
close(serverDoneSignal)
}()
tpt := New(&tptu.Upgrader{
Secure: insecure.New("serverPeer"),
Muxer: new(mplex.Transport),
})
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5555/ws")
if err != nil {
t.Fatal("SERVER:", err)
}
listener, err := tpt.Listen(addr)
if err != nil {
t.Fatal("SERVER:", err)
}
conn, err := listener.Accept()
if err != nil {
t.Fatal("SERVER:", err)
}
defer conn.Close()
stream, err := conn.OpenStream(context.Background())
if err != nil {
t.Fatal("SERVER: could not open stream:", err)
}
defer stream.Close()
buf := bufio.NewReader(stream)
if _, err := stream.Write([]byte("ping\n")); err != nil {
t.Fatal("SERVER:", err)
}
msg, err := buf.ReadString('\n')
if err != nil {
t.Fatal("SERVER: could not read pong message:" + err.Error())
}
expected := "pong\n"
if msg != expected {
t.Fatalf("SERVER: Received wrong message. Expected %q but got %q", expected, msg)
}
}()
t.Log("starting browser")
cmd = exec.Command(
"go", "test", "-v",
"-exec", filepath.Join(wasmBrowserTestDir, wasmBrowserTestBin),
"-run", "TestInBrowser",
".",
)
cmd.Env = append(os.Environ(), []string{"GOOS=js", "GOARCH=wasm"}...)
output, err := cmd.CombinedOutput()
if err != nil {
formattedOutput := "\t" + strings.Join(strings.Split(string(output), "\n"), "\n\t")
t.Log("BROWSER OUTPUT:\n", formattedOutput)
t.Fatal("BROWSER:", err)
}
<-serverDoneSignal
}
coverage:
range: "50...100"
comment: off
ignore:
- "*_browser.go" # Can't get coverage reports.
package websocket
import (
"net"
"time"
)
// GracefulCloseTimeout is the time to wait trying to gracefully close a
// connection before simply cutting it.
var GracefulCloseTimeout = 100 * time.Millisecond
var _ net.Conn = (*Conn)(nil)
// +build js,wasm
package websocket
import (
"bytes"
"errors"
"fmt"
"net"
"strings"
"sync"
"syscall/js"
"time"
)
const (
webSocketStateConnecting = 0
webSocketStateOpen = 1
webSocketStateClosing = 2
webSocketStateClosed = 3
)
var errConnectionClosed = errors.New("connection is closed")
// Conn implements net.Conn interface for WebSockets in js/wasm.
type Conn struct {
js.Value
messageHandler *js.Func
closeHandler *js.Func
errorHandler *js.Func
mut sync.Mutex
currDataMut sync.RWMutex
currData bytes.Buffer
closeOnce sync.Once
closeSignalOnce sync.Once
closeSignal chan struct{}
dataSignal chan struct{}
localAddr net.Addr
remoteAddr net.Addr
firstErr error // only read this _after_ observing that closeSignal has been closed.
}
// NewConn creates a Conn given a regular js/wasm WebSocket Conn.
func NewConn(raw js.Value) *Conn {
conn := &Conn{
Value: raw,
closeSignal: make(chan struct{}),
dataSignal: make(chan struct{}, 1),
localAddr: NewAddr("0.0.0.0:0"),
remoteAddr: getRemoteAddr(raw),
}
// Force the JavaScript WebSockets API to use the ArrayBuffer type for
// incoming messages instead of the Blob type. This is better for us because
// ArrayBuffer can be converted to []byte synchronously but Blob cannot.
conn.Set("binaryType", "arraybuffer")
conn.setUpHandlers()
return conn
}
func (c *Conn) Read(b []byte) (int, error) {
select {
case <-c.closeSignal:
c.readAfterErr(b)
default:
}
for {
c.currDataMut.RLock()
n, _ := c.currData.Read(b)
c.currDataMut.RUnlock()
if n != 0 {
// Data was ready. Return the number of bytes read.
return n, nil
}
// There is no data ready to be read. Wait for more data or for the
// connection to be closed.
select {
case <-c.dataSignal:
case <-c.closeSignal:
return c.readAfterErr(b)
}
}
}
// readAfterError reads from c.currData. If there is no more data left it
// returns c.firstErr if non-nil and otherwise returns io.EOF.
func (c *Conn) readAfterErr(b []byte) (int, error) {
if c.firstErr != nil {
return 0, c.firstErr
}
c.currDataMut.RLock()
n, err := c.currData.Read(b)
c.currDataMut.RUnlock()
return n, err
}
// checkOpen returns an error if the connection is not open. Otherwise, it
// returns nil.
func (c *Conn) checkOpen() error {
state := c.Get("readyState").Int()
switch state {
case webSocketStateClosed, webSocketStateClosing:
return errConnectionClosed
}
return nil
}
func (c *Conn) Write(b []byte) (n int, err error) {
defer func() {
if e := recover(); e != nil {
err = recoveredValueToError(e)
}
}()
if err := c.checkOpen(); err != nil {
return 0, err
}
uint8Array := js.Global().Get("Uint8Array").New(len(b))
if js.CopyBytesToJS(uint8Array, b) != len(b) {
panic("expected to copy all bytes")
}
c.Call("send", uint8Array.Get("buffer"))
return len(b), nil
}
// Close closes the connection. Only the first call to Close will receive the
// close error, subsequent and concurrent calls will return nil.
// This method is thread-safe.
func (c *Conn) Close() error {
c.closeOnce.Do(func() {
c.Call("close")
c.signalClose(nil)
c.releaseHandlers()
})
return nil
}
func (c *Conn) signalClose(err error) {
c.closeSignalOnce.Do(func() {
c.firstErr = err
close(c.closeSignal)
})
}
func (c *Conn) releaseHandlers() {
c.mut.Lock()
defer c.mut.Unlock()
if c.messageHandler != nil {
c.Call("removeEventListener", "message", *c.messageHandler)
c.messageHandler.Release()
c.messageHandler = nil
}
if c.closeHandler != nil {
c.Call("removeEventListener", "close", *c.closeHandler)
c.closeHandler.Release()
c.closeHandler = nil
}
if c.errorHandler != nil {
c.Call("removeEventListener", "error", *c.errorHandler)
c.errorHandler.Release()
c.errorHandler = nil
}
}
func (c *Conn) LocalAddr() net.Addr {
return c.localAddr
}
func getRemoteAddr(val js.Value) net.Addr {
rawURL := val.Get("url").String()
withoutPrefix := strings.TrimPrefix(rawURL, "ws://")
withoutSuffix := strings.TrimSuffix(withoutPrefix, "/")
return NewAddr(withoutSuffix)
}
func (c *Conn) RemoteAddr() net.Addr {
return c.remoteAddr
}
// TODO: Return os.ErrNoDeadline. For now we return nil because multiplexers
// don't handle the error correctly.
func (c *Conn) SetDeadline(t time.Time) error {
return nil
}
func (c *Conn) SetReadDeadline(t time.Time) error {
return nil
}
func (c *Conn) SetWriteDeadline(t time.Time) error {
return nil
}
func (c *Conn) setUpHandlers() {
c.mut.Lock()
defer c.mut.Unlock()
if c.messageHandler != nil {
// Message handlers already created. Nothing to do.
return
}
messageHandler := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
arrayBuffer := args[0].Get("data")
data := arrayBufferToBytes(arrayBuffer)
c.currDataMut.Lock()
if _, err := c.currData.Write(data); err != nil {
c.currDataMut.Unlock()
return err
}
c.currDataMut.Unlock()
// Non-blocking signal
select {
case c.dataSignal <- struct{}{}:
default:
}
return nil
})
c.messageHandler = &messageHandler
c.Call("addEventListener", "message", messageHandler)
closeHandler := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
go func() {
c.signalClose(errorEventToError(args[0]))
c.releaseHandlers()
}()
return nil
})
c.closeHandler = &closeHandler
c.Call("addEventListener", "close", closeHandler)
errorHandler := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
// Unfortunately, the "error" event doesn't appear to give us any useful
// information. All we can do is close the connection.
c.Close()
return nil
})
c.errorHandler = &errorHandler
c.Call("addEventListener", "error", errorHandler)
}
func (c *Conn) waitForOpen() error {
openSignal := make(chan struct{})
handler := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
close(openSignal)
return nil
})
defer c.Call("removeEventListener", "open", handler)
defer handler.Release()
c.Call("addEventListener", "open", handler)
select {
case <-openSignal:
return nil
case <-c.closeSignal:
// c.closeSignal means there was an error when trying to open the
// connection.
return c.firstErr
}
}
// arrayBufferToBytes converts a JavaScript ArrayBuffer to a slice of bytes.
func arrayBufferToBytes(buffer js.Value) []byte {
view := js.Global().Get("Uint8Array").New(buffer)
dataLen := view.Length()
data := make([]byte, dataLen)
if js.CopyBytesToGo(data, view) != dataLen {
panic("expected to copy all bytes")
}
return data
}
func errorEventToError(val js.Value) error {
var typ string
if gotType := val.Get("type"); !gotType.Equal(js.Undefined()) {
typ = gotType.String()
} else {
typ = val.Type().String()
}
var reason string
if gotReason := val.Get("reason"); !gotReason.Equal(js.Undefined()) && gotReason.String() != "" {
reason = gotReason.String()
} else {
code := val.Get("code")
if !code.Equal(js.Undefined()) {
switch code := code.Int(); code {
case 1006:
reason = "code 1006: connection unexpectedly closed"
default:
reason = fmt.Sprintf("unexpected code: %d", code)
}
}
}
return fmt.Errorf("JavaScript error: (%s) %s", typ, reason)
}
func recoveredValueToError(e interface{}) error {
switch e := e.(type) {
case error:
return e
default:
return fmt.Errorf("recovered from unexpected panic: %T %s", e, e)
}
}
// +build !js
package websocket
import (
"io"
"net"
"sync"
"time"
ws "github.com/gorilla/websocket"
)
// Conn implements net.Conn interface for gorilla/websocket.
type Conn struct {
*ws.Conn
DefaultMessageType int
reader io.Reader
closeOnce sync.Once
readLock, writeLock sync.Mutex
}
func (c *Conn) Read(b []byte) (int, error) {
c.readLock.Lock()
defer c.readLock.Unlock()
if c.reader == nil {
if err := c.prepNextReader(); err != nil {
return 0, err
}
}
for {
n, err := c.reader.Read(b)
switch err {
case io.EOF:
c.reader = nil
if n > 0 {
return n, nil
}
if err := c.prepNextReader(); err != nil {
return 0, err
}
// explicitly looping
default:
return n, err
}
}
}
func (c *Conn) prepNextReader() error {
t, r, err := c.Conn.NextReader()
if err != nil {
if wserr, ok := err.(*ws.CloseError); ok {
if wserr.Code == 1000 || wserr.Code == 1005 {
return io.EOF
}
}
return err
}
if t == ws.CloseMessage {
return io.EOF
}
c.reader = r
return nil
}
func (c *Conn) Write(b []byte) (n int, err error) {
c.writeLock.Lock()
defer c.writeLock.Unlock()
if err := c.Conn.WriteMessage(c.DefaultMessageType, b); err != nil {
return 0, err
}
return len(b), nil
}
// Close closes the connection. Only the first call to Close will receive the
// close error, subsequent and concurrent calls will return nil.
// This method is thread-safe.
func (c *Conn) Close() error {
var err error
c.closeOnce.Do(func() {
err1 := c.Conn.WriteControl(
ws.CloseMessage,
ws.FormatCloseMessage(ws.CloseNormalClosure, "closed"),
time.Now().Add(GracefulCloseTimeout),
)
err2 := c.Conn.Close()
switch {
case err1 != nil:
err = err1
case err2 != nil:
err = err2
}
})
return err
}
func (c *Conn) LocalAddr() net.Addr {
return NewAddr(c.Conn.LocalAddr().String())
}
func (c *Conn) RemoteAddr() net.Addr {
return NewAddr(c.Conn.RemoteAddr().String())
}
func (c *Conn) SetDeadline(t time.Time) error {
if err := c.SetReadDeadline(t); err != nil {
return err
}
return c.SetWriteDeadline(t)
}
func (c *Conn) SetReadDeadline(t time.Time) error {
// Don't lock when setting the read deadline. That would prevent us from
// interrupting an in-progress read.
return c.Conn.SetReadDeadline(t)
}
func (c *Conn) SetWriteDeadline(t time.Time) error {
// Unlike the read deadline, we need to lock when setting the write
// deadline.
c.writeLock.Lock()
defer c.writeLock.Unlock()
return c.Conn.SetWriteDeadline(t)
}
// NewConn creates a Conn given a regular gorilla/websocket Conn.
func NewConn(raw *ws.Conn) *Conn {
return &Conn{
Conn: raw,
DefaultMessageType: ws.BinaryMessage,
}
}
This diff is collapsed.
// +build !js
package websocket
import (
"fmt"
"net"
"net/http"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
type listener struct {
net.Listener
laddr ma.Multiaddr
closed chan struct{}
incoming chan *Conn
}
func (l *listener) serve() {
defer close(l.closed)
_ = http.Serve(l.Listener, l)
}
func (l *listener) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
// The upgrader writes a response for us.
return
}
select {
case l.incoming <- NewConn(c):
case <-l.closed:
c.Close()
}
// The connection has been hijacked, it's safe to return.
}
func (l *listener) Accept() (manet.Conn, error) {
select {
case c, ok := <-l.incoming:
if !ok {
return nil, fmt.Errorf("listener is closed")
}
mnc, err := manet.WrapNetConn(c)
if err != nil {
c.Close()
return nil, err
}
return mnc, nil
case <-l.closed:
return nil, fmt.Errorf("listener is closed")
}
}
func (l *listener) Multiaddr() ma.Multiaddr {
return l.laddr
}
module github.com/libp2p/go-ws-transport/tools
go 1.13
require github.com/agnivade/wasmbrowsertest v0.3.1
github.com/agnivade/wasmbrowsertest v0.3.1 h1:bA9aA+bcp7KuqGvmCuBdnMqy6PXxFjYP7FxsaT+JSqc=
github.com/agnivade/wasmbrowsertest v0.3.1/go.mod h1:zQt6ZTdl338xxRaMW395qccVE2eQm0SjC/SDz0mPWQI=
github.com/chromedp/cdproto v0.0.0-20190614062957-d6d2f92b486d/go.mod h1:S8mB5wY3vV+vRIzf39xDXsw3XKYewW9X6rW2aEmkrSw=
github.com/chromedp/cdproto v0.0.0-20190621002710-8cbd498dd7a0 h1:4Wocv9f+KWF4GtZudyrn8JSBTgHQbGp86mcsoH7j1iQ=
github.com/chromedp/cdproto v0.0.0-20190621002710-8cbd498dd7a0/go.mod h1:S8mB5wY3vV+vRIzf39xDXsw3XKYewW9X6rW2aEmkrSw=
github.com/chromedp/chromedp v0.3.1-0.20190619195644-fd957a4d2901 h1:tg66ykM8VYqP9k4DFQwSMnYv84HNTruF+GR6kefFNg4=
github.com/chromedp/chromedp v0.3.1-0.20190619195644-fd957a4d2901/go.mod h1:mJdvfrVn594N9tfiPecUidF6W5jPRKHymqHfzbobPsM=
github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/go-interpreter/wagon v0.5.1-0.20190713202023-55a163980b6c h1:DLLAPVFrk9iNzljMKF512CUmrFImQ6WU3sDiUS4IRqk=
github.com/go-interpreter/wagon v0.5.1-0.20190713202023-55a163980b6c/go.mod h1:5+b/MBYkclRZngKF5s6qrgWxSLgE9F5dFdO1hAueZLc=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f h1:Jnx61latede7zDD3DiiP4gmNz33uK0U5HDUaF0a/HVQ=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/knq/sysutil v0.0.0-20181215143952-f05b59f0f307 h1:vl4eIlySbjertFaNwiMjXsGrFVK25aOWLq7n+3gh2ls=
github.com/knq/sysutil v0.0.0-20181215143952-f05b59f0f307/go.mod h1:BjPj+aVjl9FW/cCGiF3nGh5v+9Gd3VCgBQbod/GlMaQ=
github.com/mailru/easyjson v0.0.0-20190403194419-1ea4449da983/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190620125010-da37f6c1e481 h1:IaSjLMT6WvkoZZjspGxy3rdaTEmWLoRm49WbtVUi9sA=
github.com/mailru/easyjson v0.0.0-20190620125010-da37f6c1e481/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/twitchyliquid64/golang-asm v0.0.0-20190126203739-365674df15fc h1:RTUQlKzoZZVG3umWNzOYeFecQLIh+dbxXvJp1zPQJTI=
github.com/twitchyliquid64/golang-asm v0.0.0-20190126203739-365674df15fc/go.mod h1:NoCfSFWosfqMqmmD7hApkirIK9ozpHjxRnRxs1l413A=
golang.org/x/sys v0.0.0-20190306220234-b354f8bf4d9e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190618155005-516e3c20635f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
// +build tools
package tools
import _ "github.com/agnivade/wasmbrowsertest"
// Package websocket implements a websocket based transport for go-libp2p.
package websocket
import (
"context"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
manet "github.com/multiformats/go-multiaddr/net"
)
// WsProtocol is the multiaddr protocol definition for this transport.
//
// Deprecated: use `ma.ProtocolWithCode(ma.P_WS)
var WsProtocol = ma.ProtocolWithCode(ma.P_WS)
// WsFmt is multiaddr formatter for WsProtocol
var WsFmt = mafmt.And(mafmt.TCP, mafmt.Base(ma.P_WS))
// WsCodec is the multiaddr-net codec definition for the websocket transport
var WsCodec = &manet.NetCodec{
NetAddrNetworks: []string{"websocket"},
ProtocolName: "ws",
ConvertMultiaddr: ConvertWebsocketMultiaddrToNetAddr,
ParseNetAddr: ParseWebsocketNetAddr,
}
// This is _not_ WsFmt because we want the transport to stick to dialing fully
// resolved addresses.
var dialMatcher = mafmt.And(mafmt.IP, mafmt.Base(ma.P_TCP), mafmt.Base(ma.P_WS))
func init() {
manet.RegisterNetCodec(WsCodec)
}
var _ transport.Transport = (*WebsocketTransport)(nil)
// WebsocketTransport is the actual go-libp2p transport
type WebsocketTransport struct {
Upgrader *tptu.Upgrader
}
func New(u *tptu.Upgrader) *WebsocketTransport {
return &WebsocketTransport{u}
}
func (t *WebsocketTransport) CanDial(a ma.Multiaddr) bool {
return dialMatcher.Matches(a)
}
func (t *WebsocketTransport) Protocols() []int {
return []int{WsProtocol.Code}
}
func (t *WebsocketTransport) Proxy() bool {
return false
}
func (t *WebsocketTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
macon, err := t.maDial(ctx, raddr)
if err != nil {
return nil, err
}
return t.Upgrader.UpgradeOutbound(ctx, t, macon, p)
}
// +build js,wasm
package websocket
import (
"context"
"errors"
"syscall/js"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) {
wsurl, err := parseMultiaddr(raddr)
if err != nil {
return nil, err
}
rawConn := js.Global().Get("WebSocket").New(wsurl)
conn := NewConn(rawConn)
if err := conn.waitForOpen(); err != nil {
conn.Close()
return nil, err
}
mnc, err := manet.WrapNetConn(conn)
if err != nil {
conn.Close()
return nil, err
}
return mnc, nil
}
func (t *WebsocketTransport) Listen(a ma.Multiaddr) (transport.Listener, error) {
return nil, errors.New("Listen not implemented on js/wasm")
}
// +build !js
package websocket
import (
"context"
"net"
"net/http"
"net/url"
ws "github.com/gorilla/websocket"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
// Default gorilla upgrader
var upgrader = ws.Upgrader{
// Allow requests from *all* origins.
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) {
wsurl, err := parseMultiaddr(raddr)
if err != nil {
return nil, err
}
wscon, _, err := ws.DefaultDialer.Dial(wsurl, nil)
if err != nil {
return nil, err
}
mnc, err := manet.WrapNetConn(NewConn(wscon))
if err != nil {
wscon.Close()
return nil, err
}
return mnc, nil
}
func (t *WebsocketTransport) maListen(a ma.Multiaddr) (manet.Listener, error) {
lnet, lnaddr, err := manet.DialArgs(a)
if err != nil {
return nil, err
}
nl, err := net.Listen(lnet, lnaddr)
if err != nil {
return nil, err
}
u, err := url.Parse("http://" + nl.Addr().String())
if err != nil {
nl.Close()
return nil, err
}
malist, err := t.wrapListener(nl, u)
if err != nil {
nl.Close()
return nil, err
}
go malist.serve()
return malist, nil
}
func (t *WebsocketTransport) Listen(a ma.Multiaddr) (transport.Listener, error) {
malist, err := t.maListen(a)
if err != nil {
return nil, err
}
return t.Upgrader.UpgradeListener(t, malist), nil
}
func (t *WebsocketTransport) wrapListener(l net.Listener, origin *url.URL) (*listener, error) {
laddr, err := manet.FromNetAddr(l.Addr())
if err != nil {
return nil, err
}
wsma, err := ma.NewMultiaddr("/ws")
if err != nil {
return nil, err
}
laddr = laddr.Encapsulate(wsma)
return &listener{
laddr: laddr,
Listener: l,
incoming: make(chan *Conn),
closed: make(chan struct{}),
}, nil
}
// +build !js
package websocket
import (
"bytes"
"context"
"io"
"io/ioutil"
"testing"
"testing/iotest"
"github.com/libp2p/go-libp2p-core/sec/insecure"
mplex "github.com/libp2p/go-libp2p-mplex"
ttransport "github.com/libp2p/go-libp2p-testing/suites/transport"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
ma "github.com/multiformats/go-multiaddr"
)
func TestCanDial(t *testing.T) {
addrWs, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5555/ws")
if err != nil {
t.Fatal(err)
}
addrTCP, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5555")
if err != nil {
t.Fatal(err)
}
d := &WebsocketTransport{}
matchTrue := d.CanDial(addrWs)
matchFalse := d.CanDial(addrTCP)
if !matchTrue {
t.Fatal("expected to match websocket maddr, but did not")
}
if matchFalse {
t.Fatal("expected to not match tcp maddr, but did")
}
}
func TestWebsocketTransport(t *testing.T) {
t.Skip("This test is failing, see https://github.com/libp2p/go-ws-transport/issues/99")
ta := New(&tptu.Upgrader{
Secure: insecure.New("peerA"),
Muxer: new(mplex.Transport),
})
tb := New(&tptu.Upgrader{
Secure: insecure.New("peerB"),
Muxer: new(mplex.Transport),
})
zero := "/ip4/127.0.0.1/tcp/0/ws"
ttransport.SubtestTransport(t, ta, tb, zero, "peerA")
}
func TestWebsocketListen(t *testing.T) {
zero, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0/ws")
if err != nil {
t.Fatal(err)
}
tpt := &WebsocketTransport{}
l, err := tpt.maListen(zero)
if err != nil {
t.Fatal(err)
}
defer l.Close()
msg := []byte("HELLO WORLD")
go func() {
c, err := tpt.maDial(context.Background(), l.Multiaddr())
if err != nil {
t.Error(err)
return
}
_, err = c.Write(msg)
if err != nil {
t.Error(err)
}
err = c.Close()
if err != nil {
t.Error(err)
}
}()
c, err := l.Accept()
if err != nil {
t.Fatal(err)
}
defer c.Close()
obr := iotest.OneByteReader(c)
out, err := ioutil.ReadAll(obr)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(out, msg) {
t.Fatal("got wrong message", out, msg)
}
}
func TestConcurrentClose(t *testing.T) {
zero, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0/ws")
if err != nil {
t.Fatal(err)
}
tpt := &WebsocketTransport{}
l, err := tpt.maListen(zero)
if err != nil {
t.Fatal(err)
}
defer l.Close()
msg := []byte("HELLO WORLD")
go func() {
for i := 0; i < 100; i++ {
c, err := tpt.maDial(context.Background(), l.Multiaddr())
if err != nil {
t.Error(err)
return
}
go func() {
_, _ = c.Write(msg)
}()
go func() {
_ = c.Close()
}()
}
}()
for i := 0; i < 100; i++ {
c, err := l.Accept()
if err != nil {
t.Fatal(err)
}
c.Close()
}
}
func TestWriteZero(t *testing.T) {
zero, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0/ws")
if err != nil {
t.Fatal(err)
}
tpt := &WebsocketTransport{}
l, err := tpt.maListen(zero)
if err != nil {
t.Fatal(err)
}
defer l.Close()
msg := []byte(nil)
go func() {
c, err := tpt.maDial(context.Background(), l.Multiaddr())
defer c.Close()
if err != nil {
t.Error(err)
return
}
for i := 0; i < 100; i++ {
n, err := c.Write(msg)
if n != 0 {
t.Errorf("expected to write 0 bytes, wrote %d", n)
}
if err != nil {
t.Error(err)
return
}
}
}()
c, err := l.Accept()
defer c.Close()
if err != nil {
t.Fatal(err)
}
buf := make([]byte, 100)
n, err := c.Read(buf)
if n != 0 {
t.Errorf("read %d bytes, expected 0", n)
}
if err != io.EOF {
t.Errorf("expected EOF, got err: %s", err)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment