remote.go 2.21 KB
Newer Older
1 2 3 4
package p2p

import (
	"context"
Łukasz Magiera's avatar
Łukasz Magiera committed
5
	"errors"
6

Łukasz Magiera's avatar
Łukasz Magiera committed
7 8 9
	manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
	net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
10 11 12
	protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
)

Łukasz Magiera's avatar
Łukasz Magiera committed
13 14
var maPrefix = "/" + ma.ProtocolWithCode(ma.P_IPFS).Name + "/"

Łukasz Magiera's avatar
Łukasz Magiera committed
15 16
// remoteListener accepts libp2p streams and proxies them to a manet host
type remoteListener struct {
17 18 19
	p2p *P2P

	// Application proto identifier.
Łukasz Magiera's avatar
Łukasz Magiera committed
20
	proto protocol.ID
21

22
	// Address to proxy the incoming connections to
23
	addr ma.Multiaddr
Łukasz Magiera's avatar
Łukasz Magiera committed
24 25

	initialized bool
26 27
}

Łukasz Magiera's avatar
Łukasz Magiera committed
28
// ForwardRemote creates new p2p listener
Łukasz Magiera's avatar
Łukasz Magiera committed
29
func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (Listener, error) {
30
	listener := &remoteListener{
31 32
		p2p: p2p,

Łukasz Magiera's avatar
Łukasz Magiera committed
33
		proto: proto,
34
		addr:  addr,
35 36
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
37
	if err := p2p.Listeners.Register(listener); err != nil {
38 39
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
40

Łukasz Magiera's avatar
Łukasz Magiera committed
41 42 43 44 45 46 47
	return listener, nil
}

func (l *remoteListener) start() error {
	// TODO: handle errors when https://github.com/libp2p/go-libp2p-host/issues/16 will be done
	l.p2p.peerHost.SetStreamHandler(l.proto, func(remote net.Stream) {
		local, err := manet.Dial(l.addr)
48 49 50 51 52
		if err != nil {
			remote.Reset()
			return
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
53
		peerMa, err := ma.NewMultiaddr(maPrefix + remote.Conn().RemotePeer().Pretty())
54 55 56 57 58
		if err != nil {
			remote.Reset()
			return
		}

59
		stream := &Stream{
Łukasz Magiera's avatar
Łukasz Magiera committed
60
			Protocol: l.proto,
61

62
			OriginAddr: peerMa,
Łukasz Magiera's avatar
Łukasz Magiera committed
63
			TargetAddr: l.addr,
64 65 66 67

			Local:  local,
			Remote: remote,

Łukasz Magiera's avatar
Łukasz Magiera committed
68
			Registry: l.p2p.Streams,
69 70
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
71
		l.p2p.Streams.Register(stream)
72 73 74
		stream.startStreaming()
	})

Łukasz Magiera's avatar
Łukasz Magiera committed
75 76
	l.initialized = true
	return nil
77 78
}

Łukasz Magiera's avatar
Łukasz Magiera committed
79 80
func (l *remoteListener) Protocol() protocol.ID {
	return l.proto
81 82
}

Łukasz Magiera's avatar
Łukasz Magiera committed
83 84 85 86 87 88
func (l *remoteListener) ListenAddress() ma.Multiaddr {
	addr, err := ma.NewMultiaddr(maPrefix + l.p2p.identity.Pretty())
	if err != nil {
		panic(err)
	}
	return addr
Łukasz Magiera's avatar
Łukasz Magiera committed
89 90
}

Łukasz Magiera's avatar
Łukasz Magiera committed
91 92
func (l *remoteListener) TargetAddress() ma.Multiaddr {
	return l.addr
93 94
}

Łukasz Magiera's avatar
Łukasz Magiera committed
95
func (l *remoteListener) Close() error {
Łukasz Magiera's avatar
Łukasz Magiera committed
96 97 98 99 100 101 102 103
	if !l.initialized {
		return errors.New("uninitialized")
	}

	if l.p2p.Listeners.Deregister(getListenerKey(l)) {
		l.p2p.peerHost.RemoveStreamHandler(l.proto)
		l.initialized = false
	}
104
	return nil
105
}