remote.go 2.14 KB
Newer Older
1 2 3 4 5
package p2p

import (
	"context"

Łukasz Magiera's avatar
Łukasz Magiera committed
6
	net "gx/ipfs/QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM/go-libp2p-net"
Łukasz Magiera's avatar
Łukasz Magiera committed
7 8
	manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
9 10 11
	protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
)

Łukasz Magiera's avatar
Łukasz Magiera committed
12 13
var maPrefix = "/" + ma.ProtocolWithCode(ma.P_IPFS).Name + "/"

Łukasz Magiera's avatar
Łukasz Magiera committed
14 15
// remoteListener accepts libp2p streams and proxies them to a manet host
type remoteListener struct {
16 17 18
	p2p *P2P

	// Application proto identifier.
Łukasz Magiera's avatar
Łukasz Magiera committed
19
	proto protocol.ID
20

21
	// Address to proxy the incoming connections to
22 23 24
	addr ma.Multiaddr
}

Łukasz Magiera's avatar
Łukasz Magiera committed
25
// ForwardRemote creates new p2p listener
26
func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (ListenerP2P, error) {
27
	listener := &remoteListener{
28 29
		p2p: p2p,

Łukasz Magiera's avatar
Łukasz Magiera committed
30
		proto: proto,
31
		addr:  addr,
32 33
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
34
	if err := p2p.ListenersP2P.Register(listener); err != nil {
35 36
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
37

Łukasz Magiera's avatar
Łukasz Magiera committed
38 39 40 41
	return listener, nil
}

func (l *remoteListener) start() error {
42 43
	return nil
}
44

45 46 47 48 49 50
func (l *remoteListener) handleStream(remote net.Stream) {
	local, err := manet.Dial(l.addr)
	if err != nil {
		remote.Reset()
		return
	}
51

52
	peer := remote.Conn().RemotePeer()
53

54 55 56 57 58
	peerMa, err := ma.NewMultiaddr(maPrefix + peer.Pretty())
	if err != nil {
		remote.Reset()
		return
	}
59

60 61
	cmgr := l.p2p.peerHost.ConnManager()
	cmgr.TagPeer(peer, CMGR_TAG, 20)
62

63 64
	stream := &Stream{
		Protocol: l.proto,
65

66 67
		OriginAddr: peerMa,
		TargetAddr: l.addr,
68

69 70
		Local:  local,
		Remote: remote,
71

72
		Registry: l.p2p.Streams,
73

74 75 76 77
		cleanup: func() {
			cmgr.UntagPeer(peer, CMGR_TAG)
		},
	}
78

79 80
	l.p2p.Streams.Register(stream)
	stream.startStreaming()
81 82
}

Łukasz Magiera's avatar
Łukasz Magiera committed
83 84
func (l *remoteListener) Protocol() protocol.ID {
	return l.proto
85 86
}

Łukasz Magiera's avatar
Łukasz Magiera committed
87 88 89 90 91 92
func (l *remoteListener) ListenAddress() ma.Multiaddr {
	addr, err := ma.NewMultiaddr(maPrefix + l.p2p.identity.Pretty())
	if err != nil {
		panic(err)
	}
	return addr
Łukasz Magiera's avatar
Łukasz Magiera committed
93 94
}

Łukasz Magiera's avatar
Łukasz Magiera committed
95 96
func (l *remoteListener) TargetAddress() ma.Multiaddr {
	return l.addr
97 98
}

Łukasz Magiera's avatar
Łukasz Magiera committed
99
func (l *remoteListener) Close() error {
Łukasz Magiera's avatar
Łukasz Magiera committed
100
	ok, err := l.p2p.ListenersP2P.Deregister(l.proto)
101 102
	if err != nil {
		return err
Łukasz Magiera's avatar
Łukasz Magiera committed
103
	}
104
	if ok {
Łukasz Magiera's avatar
Łukasz Magiera committed
105 106
		l.p2p.peerHost.RemoveStreamHandler(l.proto)
	}
107
	return nil
108
}