remote.go 2.12 KB
Newer Older
1 2 3 4 5
package p2p

import (
	"context"

Łukasz Magiera's avatar
Łukasz Magiera committed
6 7 8
	manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
	net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
9 10 11
	protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
)

Łukasz Magiera's avatar
Łukasz Magiera committed
12 13
// maPrefix is "/" + the name of the multiaddr protocol with code ma.P_IPFS
// + "/". A peer ID's Pretty() string is appended to it to build a peer
// multiaddr.
var maPrefix = "/" + ma.ProtocolWithCode(ma.P_IPFS).Name + "/"

Łukasz Magiera's avatar
Łukasz Magiera committed
14 15
// remoteListener accepts libp2p streams and proxies them to a manet host
type remoteListener struct {
16 17 18
	p2p *P2P

	// Application proto identifier.
Łukasz Magiera's avatar
Łukasz Magiera committed
19
	proto protocol.ID
20

21
	// Address to proxy the incoming connections to
22 23 24
	addr ma.Multiaddr
}

Łukasz Magiera's avatar
Łukasz Magiera committed
25
// ForwardRemote creates new p2p listener
Łukasz Magiera's avatar
Łukasz Magiera committed
26
func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (Listener, error) {
27
	listener := &remoteListener{
28 29
		p2p: p2p,

Łukasz Magiera's avatar
Łukasz Magiera committed
30
		proto: proto,
31
		addr:  addr,
32 33
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
34
	if err := p2p.Listeners.Register(listener); err != nil {
35 36
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
37

Łukasz Magiera's avatar
Łukasz Magiera committed
38 39 40 41 42 43 44
	return listener, nil
}

func (l *remoteListener) start() error {
	// TODO: handle errors when https://github.com/libp2p/go-libp2p-host/issues/16 will be done
	l.p2p.peerHost.SetStreamHandler(l.proto, func(remote net.Stream) {
		local, err := manet.Dial(l.addr)
45 46 47 48 49
		if err != nil {
			remote.Reset()
			return
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
50
		peerMa, err := ma.NewMultiaddr(maPrefix + remote.Conn().RemotePeer().Pretty())
51 52 53 54 55
		if err != nil {
			remote.Reset()
			return
		}

56
		stream := &Stream{
Łukasz Magiera's avatar
Łukasz Magiera committed
57
			Protocol: l.proto,
58

59
			OriginAddr: peerMa,
Łukasz Magiera's avatar
Łukasz Magiera committed
60
			TargetAddr: l.addr,
61 62 63 64

			Local:  local,
			Remote: remote,

Łukasz Magiera's avatar
Łukasz Magiera committed
65
			Registry: l.p2p.Streams,
66 67
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
68
		l.p2p.Streams.Register(stream)
69 70 71
		stream.startStreaming()
	})

Łukasz Magiera's avatar
Łukasz Magiera committed
72
	return nil
73 74
}

Łukasz Magiera's avatar
Łukasz Magiera committed
75 76
func (l *remoteListener) Protocol() protocol.ID {
	return l.proto
77 78
}

Łukasz Magiera's avatar
Łukasz Magiera committed
79 80 81 82 83 84
func (l *remoteListener) ListenAddress() ma.Multiaddr {
	addr, err := ma.NewMultiaddr(maPrefix + l.p2p.identity.Pretty())
	if err != nil {
		panic(err)
	}
	return addr
Łukasz Magiera's avatar
Łukasz Magiera committed
85 86
}

Łukasz Magiera's avatar
Łukasz Magiera committed
87 88
func (l *remoteListener) TargetAddress() ma.Multiaddr {
	return l.addr
89 90
}

Łukasz Magiera's avatar
Łukasz Magiera committed
91
func (l *remoteListener) Close() error {
92 93 94
	ok, err := l.p2p.Listeners.Deregister(getListenerKey(l))
	if err != nil {
		return err
Łukasz Magiera's avatar
Łukasz Magiera committed
95
	}
96
	if ok {
Łukasz Magiera's avatar
Łukasz Magiera committed
97 98
		l.p2p.peerHost.RemoveStreamHandler(l.proto)
	}
99
	return nil
100
}