Unverified Commit ca6f70cc authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #22 from libp2p/feat/consolidate-abstractions

Consolidate abstractions and core types into go-libp2p-core (#28)
parents dba55de7 6e278912
...@@ -3,15 +3,15 @@ package stream ...@@ -3,15 +3,15 @@ package stream
import ( import (
"fmt" "fmt"
inet "github.com/libp2p/go-libp2p-net" mux "github.com/libp2p/go-libp2p-core/mux"
transport "github.com/libp2p/go-libp2p-transport" network "github.com/libp2p/go-libp2p-core/network"
smux "github.com/libp2p/go-stream-muxer" transport "github.com/libp2p/go-libp2p-core/transport"
) )
type transportConn struct { type transportConn struct {
smux.Conn mux.MuxedConn
inet.ConnMultiaddrs network.ConnMultiaddrs
inet.ConnSecurity network.ConnSecurity
transport transport.Transport transport transport.Transport
} }
......
This diff is collapsed.
...@@ -7,24 +7,19 @@ import ( ...@@ -7,24 +7,19 @@ import (
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
tec "github.com/jbenet/go-temp-err-catcher" tec "github.com/jbenet/go-temp-err-catcher"
transport "github.com/libp2p/go-libp2p-transport" transport "github.com/libp2p/go-libp2p-core/transport"
manet "github.com/multiformats/go-multiaddr-net" manet "github.com/multiformats/go-multiaddr-net"
) )
var log = logging.Logger("stream-upgrader") var log = logging.Logger("stream-upgrader")
type connErr struct {
conn transport.Conn
err error
}
type listener struct { type listener struct {
manet.Listener manet.Listener
transport transport.Transport transport transport.Transport
upgrader *Upgrader upgrader *Upgrader
incoming chan transport.Conn incoming chan transport.CapableConn
err error err error
// Used for backpressure // Used for backpressure
...@@ -139,7 +134,7 @@ func (l *listener) handleIncoming() { ...@@ -139,7 +134,7 @@ func (l *listener) handleIncoming() {
} }
// Accept accepts a connection. // Accept accepts a connection.
func (l *listener) Accept() (transport.Conn, error) { func (l *listener) Accept() (transport.CapableConn, error) {
for c := range l.incoming { for c := range l.incoming {
// Could have been sitting there for a while. // Could have been sitting there for a while.
if !c.IsClosed() { if !c.IsClosed() {
......
...@@ -7,14 +7,14 @@ import ( ...@@ -7,14 +7,14 @@ import (
"sync" "sync"
"time" "time"
insecure "github.com/libp2p/go-conn-security/insecure" core "github.com/libp2p/go-libp2p-core"
peer "github.com/libp2p/go-libp2p-peer" mux "github.com/libp2p/go-libp2p-core/mux"
tpt "github.com/libp2p/go-libp2p-transport" insecure "github.com/libp2p/go-libp2p-core/sec/insecure"
tpt "github.com/libp2p/go-libp2p-core/transport"
mplex "github.com/libp2p/go-libp2p-mplex"
st "github.com/libp2p/go-libp2p-transport-upgrader" st "github.com/libp2p/go-libp2p-transport-upgrader"
smux "github.com/libp2p/go-stream-muxer"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net" manet "github.com/multiformats/go-multiaddr-net"
mplex "github.com/libp2p/go-libp2p-mplex"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
...@@ -24,7 +24,7 @@ import ( ...@@ -24,7 +24,7 @@ import (
// It makes sure that this happens at the same time for client and server. // It makes sure that this happens at the same time for client and server.
type negotiatingMuxer struct{} type negotiatingMuxer struct{}
func (m *negotiatingMuxer) NewConn(c net.Conn, isServer bool) (smux.Conn, error) { func (m *negotiatingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
var err error var err error
// run a fake muxer negotiation // run a fake muxer negotiation
if isServer { if isServer {
...@@ -43,10 +43,10 @@ type blockingMuxer struct { ...@@ -43,10 +43,10 @@ type blockingMuxer struct {
unblock chan struct{} unblock chan struct{}
} }
var _ smux.Transport = &blockingMuxer{} var _ mux.Multiplexer = &blockingMuxer{}
func newBlockingMuxer() *blockingMuxer { return &blockingMuxer{unblock: make(chan struct{})} } func newBlockingMuxer() *blockingMuxer { return &blockingMuxer{unblock: make(chan struct{})} }
func (m *blockingMuxer) NewConn(c net.Conn, isServer bool) (smux.Conn, error) { func (m *blockingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
<-m.unblock <-m.unblock
return (&negotiatingMuxer{}).NewConn(c, isServer) return (&negotiatingMuxer{}).NewConn(c, isServer)
} }
...@@ -55,21 +55,21 @@ func (m *blockingMuxer) Unblock() { close(m.unblock) } ...@@ -55,21 +55,21 @@ func (m *blockingMuxer) Unblock() { close(m.unblock) }
// errorMuxer is a muxer that errors while setting up // errorMuxer is a muxer that errors while setting up
type errorMuxer struct{} type errorMuxer struct{}
var _ smux.Transport = &errorMuxer{} var _ mux.Multiplexer = &errorMuxer{}
func (m *errorMuxer) NewConn(c net.Conn, isServer bool) (smux.Conn, error) { func (m *errorMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
return nil, errors.New("mux error") return nil, errors.New("mux error")
} }
var _ = Describe("Listener", func() { var _ = Describe("Listener", func() {
var ( var (
defaultUpgrader = &st.Upgrader{ defaultUpgrader = &st.Upgrader{
Secure: insecure.New(peer.ID(1)), Secure: insecure.New(core.PeerID(1)),
Muxer: &negotiatingMuxer{}, Muxer: &negotiatingMuxer{},
} }
) )
testConn := func(clientConn, serverConn tpt.Conn) { testConn := func(clientConn, serverConn tpt.CapableConn) {
cstr, err := clientConn.OpenStream() cstr, err := clientConn.OpenStream()
ExpectWithOffset(0, err).ToNot(HaveOccurred()) ExpectWithOffset(0, err).ToNot(HaveOccurred())
_, err = cstr.Write([]byte("foobar")) _, err = cstr.Write([]byte("foobar"))
...@@ -90,7 +90,7 @@ var _ = Describe("Listener", func() { ...@@ -90,7 +90,7 @@ var _ = Describe("Listener", func() {
return upgrader.UpgradeListener(nil, ln) return upgrader.UpgradeListener(nil, ln)
} }
dial := func(upgrader *st.Upgrader, raddr ma.Multiaddr, p peer.ID) (tpt.Conn, error) { dial := func(upgrader *st.Upgrader, raddr ma.Multiaddr, p core.PeerID) (tpt.CapableConn, error) {
macon, err := manet.Dial(raddr) macon, err := manet.Dial(raddr)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -105,7 +105,7 @@ var _ = Describe("Listener", func() { ...@@ -105,7 +105,7 @@ var _ = Describe("Listener", func() {
It("accepts a single connection", func() { It("accepts a single connection", func() {
ln := createListener(defaultUpgrader) ln := createListener(defaultUpgrader)
defer ln.Close() defer ln.Close()
cconn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(1)) cconn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(1))
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
sconn, err := ln.Accept() sconn, err := ln.Accept()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
...@@ -117,7 +117,7 @@ var _ = Describe("Listener", func() { ...@@ -117,7 +117,7 @@ var _ = Describe("Listener", func() {
defer ln.Close() defer ln.Close()
const num = 10 const num = 10
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
cconn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(1)) cconn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(1))
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
sconn, err := ln.Accept() sconn, err := ln.Accept()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
...@@ -130,7 +130,7 @@ var _ = Describe("Listener", func() { ...@@ -130,7 +130,7 @@ var _ = Describe("Listener", func() {
tpt.AcceptTimeout = timeout tpt.AcceptTimeout = timeout
ln := createListener(defaultUpgrader) ln := createListener(defaultUpgrader)
defer ln.Close() defer ln.Close()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2)) conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(2))
if !Expect(err).ToNot(HaveOccurred()) { if !Expect(err).ToNot(HaveOccurred()) {
return return
} }
...@@ -150,7 +150,7 @@ var _ = Describe("Listener", func() { ...@@ -150,7 +150,7 @@ var _ = Describe("Listener", func() {
It("doesn't accept connections that fail to setup", func() { It("doesn't accept connections that fail to setup", func() {
upgrader := &st.Upgrader{ upgrader := &st.Upgrader{
Secure: insecure.New(peer.ID(1)), Secure: insecure.New(core.PeerID(1)),
Muxer: &errorMuxer{}, Muxer: &errorMuxer{},
} }
ln := createListener(upgrader) ln := createListener(upgrader)
...@@ -163,7 +163,7 @@ var _ = Describe("Listener", func() { ...@@ -163,7 +163,7 @@ var _ = Describe("Listener", func() {
} }
close(done) close(done)
}() }()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2)) conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(2))
if !Expect(err).To(HaveOccurred()) { if !Expect(err).To(HaveOccurred()) {
conn.Close() conn.Close()
} }
...@@ -178,11 +178,11 @@ var _ = Describe("Listener", func() { ...@@ -178,11 +178,11 @@ var _ = Describe("Listener", func() {
num := 3 * st.AcceptQueueLength num := 3 * st.AcceptQueueLength
bm := newBlockingMuxer() bm := newBlockingMuxer()
upgrader := &st.Upgrader{ upgrader := &st.Upgrader{
Secure: insecure.New(peer.ID(1)), Secure: insecure.New(core.PeerID(1)),
Muxer: bm, Muxer: bm,
} }
ln := createListener(upgrader) ln := createListener(upgrader)
accepted := make(chan tpt.Conn, num) accepted := make(chan tpt.CapableConn, num)
go func() { go func() {
defer GinkgoRecover() defer GinkgoRecover()
for { for {
...@@ -200,7 +200,7 @@ var _ = Describe("Listener", func() { ...@@ -200,7 +200,7 @@ var _ = Describe("Listener", func() {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer GinkgoRecover() defer GinkgoRecover()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2)) conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(2))
if Expect(err).ToNot(HaveOccurred()) { if Expect(err).ToNot(HaveOccurred()) {
stream, err := conn.AcceptStream() // wait for conn to be accepted. stream, err := conn.AcceptStream() // wait for conn to be accepted.
if !Expect(err).To(HaveOccurred()) { if !Expect(err).To(HaveOccurred()) {
...@@ -223,11 +223,11 @@ var _ = Describe("Listener", func() { ...@@ -223,11 +223,11 @@ var _ = Describe("Listener", func() {
defer ln.Close() defer ln.Close()
// setup AcceptQueueLength connections, but don't accept any of them // setup AcceptQueueLength connections, but don't accept any of them
dialed := make(chan tpt.Conn, 10*st.AcceptQueueLength) // used as a thread-safe counter dialed := make(chan tpt.CapableConn, 10*st.AcceptQueueLength) // used as a thread-safe counter
for i := 0; i < st.AcceptQueueLength; i++ { for i := 0; i < st.AcceptQueueLength; i++ {
go func() { go func() {
defer GinkgoRecover() defer GinkgoRecover()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2)) conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(2))
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
dialed <- conn dialed <- conn
}() }()
...@@ -236,7 +236,7 @@ var _ = Describe("Listener", func() { ...@@ -236,7 +236,7 @@ var _ = Describe("Listener", func() {
// dial a new connection. This connection should not complete setup, since the queue is full // dial a new connection. This connection should not complete setup, since the queue is full
go func() { go func() {
defer GinkgoRecover() defer GinkgoRecover()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2)) conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(2))
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
dialed <- conn dialed <- conn
}() }()
...@@ -279,7 +279,7 @@ var _ = Describe("Listener", func() { ...@@ -279,7 +279,7 @@ var _ = Describe("Listener", func() {
It("doesn't accept new connections when it is closed", func() { It("doesn't accept new connections when it is closed", func() {
ln := createListener(defaultUpgrader) ln := createListener(defaultUpgrader)
Expect(ln.Close()).To(Succeed()) Expect(ln.Close()).To(Succeed())
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(1)) conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(1))
if !Expect(err).To(HaveOccurred()) { if !Expect(err).To(HaveOccurred()) {
conn.Close() conn.Close()
} }
...@@ -287,7 +287,7 @@ var _ = Describe("Listener", func() { ...@@ -287,7 +287,7 @@ var _ = Describe("Listener", func() {
It("closes incoming connections that have not yet been accepted", func() { It("closes incoming connections that have not yet been accepted", func() {
ln := createListener(defaultUpgrader) ln := createListener(defaultUpgrader)
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2)) conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(2))
if !Expect(err).ToNot(HaveOccurred()) { if !Expect(err).ToNot(HaveOccurred()) {
ln.Close() ln.Close()
return return
......
...@@ -6,12 +6,12 @@ import ( ...@@ -6,12 +6,12 @@ import (
"fmt" "fmt"
"net" "net"
ss "github.com/libp2p/go-conn-security" core "github.com/libp2p/go-libp2p-core"
pnet "github.com/libp2p/go-libp2p-interface-pnet" mux "github.com/libp2p/go-libp2p-core/mux"
peer "github.com/libp2p/go-libp2p-peer" pnet "github.com/libp2p/go-libp2p-core/pnet"
transport "github.com/libp2p/go-libp2p-transport" sec "github.com/libp2p/go-libp2p-core/sec"
transport "github.com/libp2p/go-libp2p-core/transport"
filter "github.com/libp2p/go-maddr-filter" filter "github.com/libp2p/go-maddr-filter"
smux "github.com/libp2p/go-stream-muxer"
manet "github.com/multiformats/go-multiaddr-net" manet "github.com/multiformats/go-multiaddr-net"
) )
...@@ -26,8 +26,8 @@ var AcceptQueueLength = 16 ...@@ -26,8 +26,8 @@ var AcceptQueueLength = 16
// to a full transport connection (secure and multiplexed). // to a full transport connection (secure and multiplexed).
type Upgrader struct { type Upgrader struct {
Protector pnet.Protector Protector pnet.Protector
Secure ss.Transport Secure sec.SecureTransport
Muxer smux.Transport Muxer mux.Multiplexer
Filters *filter.Filters Filters *filter.Filters
} }
...@@ -39,7 +39,7 @@ func (u *Upgrader) UpgradeListener(t transport.Transport, list manet.Listener) t ...@@ -39,7 +39,7 @@ func (u *Upgrader) UpgradeListener(t transport.Transport, list manet.Listener) t
upgrader: u, upgrader: u,
transport: t, transport: t,
threshold: newThreshold(AcceptQueueLength), threshold: newThreshold(AcceptQueueLength),
incoming: make(chan transport.Conn), incoming: make(chan transport.CapableConn),
cancel: cancel, cancel: cancel,
ctx: ctx, ctx: ctx,
} }
...@@ -49,7 +49,7 @@ func (u *Upgrader) UpgradeListener(t transport.Transport, list manet.Listener) t ...@@ -49,7 +49,7 @@ func (u *Upgrader) UpgradeListener(t transport.Transport, list manet.Listener) t
// UpgradeOutbound upgrades the given outbound multiaddr-net connection into a // UpgradeOutbound upgrades the given outbound multiaddr-net connection into a
// full libp2p-transport connection. // full libp2p-transport connection.
func (u *Upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID) (transport.Conn, error) { func (u *Upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, maconn manet.Conn, p core.PeerID) (transport.CapableConn, error) {
if p == "" { if p == "" {
return nil, ErrNilPeer return nil, ErrNilPeer
} }
...@@ -58,11 +58,11 @@ func (u *Upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, m ...@@ -58,11 +58,11 @@ func (u *Upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, m
// UpgradeInbound upgrades the given inbound multiaddr-net connection into a // UpgradeInbound upgrades the given inbound multiaddr-net connection into a
// full libp2p-transport connection. // full libp2p-transport connection.
func (u *Upgrader) UpgradeInbound(ctx context.Context, t transport.Transport, maconn manet.Conn) (transport.Conn, error) { func (u *Upgrader) UpgradeInbound(ctx context.Context, t transport.Transport, maconn manet.Conn) (transport.CapableConn, error) {
return u.upgrade(ctx, t, maconn, "") return u.upgrade(ctx, t, maconn, "")
} }
func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID) (transport.Conn, error) { func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, p core.PeerID) (transport.CapableConn, error) {
if u.Filters != nil && u.Filters.AddrBlocked(maconn.RemoteMultiaddr()) { if u.Filters != nil && u.Filters.AddrBlocked(maconn.RemoteMultiaddr()) {
log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr()) log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr())
maconn.Close() maconn.Close()
...@@ -78,6 +78,7 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma ...@@ -78,6 +78,7 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
} }
conn = pconn conn = pconn
} else if pnet.ForcePrivateNetwork { } else if pnet.ForcePrivateNetwork {
conn.Close()
log.Error("tried to dial with no Private Network Protector but usage" + log.Error("tried to dial with no Private Network Protector but usage" +
" of Private Networks is forced by the enviroment") " of Private Networks is forced by the enviroment")
return nil, pnet.ErrNotInPrivateNetwork return nil, pnet.ErrNotInPrivateNetwork
...@@ -93,25 +94,25 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma ...@@ -93,25 +94,25 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
return nil, fmt.Errorf("failed to negotiate security stream multiplexer: %s", err) return nil, fmt.Errorf("failed to negotiate security stream multiplexer: %s", err)
} }
return &transportConn{ return &transportConn{
Conn: smconn, MuxedConn: smconn,
ConnMultiaddrs: maconn, ConnMultiaddrs: maconn,
ConnSecurity: sconn, ConnSecurity: sconn,
transport: t, transport: t,
}, nil }, nil
} }
func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID) (ss.Conn, error) { func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p core.PeerID) (sec.SecureConn, error) {
if p == "" { if p == "" {
return u.Secure.SecureInbound(ctx, conn) return u.Secure.SecureInbound(ctx, conn)
} }
return u.Secure.SecureOutbound(ctx, conn, p) return u.Secure.SecureOutbound(ctx, conn, p)
} }
func (u *Upgrader) setupMuxer(ctx context.Context, conn net.Conn, p peer.ID) (smux.Conn, error) { func (u *Upgrader) setupMuxer(ctx context.Context, conn net.Conn, p core.PeerID) (mux.MuxedConn, error) {
// TODO: The muxer should take a context. // TODO: The muxer should take a context.
done := make(chan struct{}) done := make(chan struct{})
var smconn smux.Conn var smconn mux.MuxedConn
var err error var err error
go func() { go func() {
defer close(done) defer close(done)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment