local.go 2.76 KB
Newer Older
1 2 3 4 5 6
package p2p

import (
	"context"
	"time"

7
	"gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
Łukasz Magiera's avatar
Łukasz Magiera committed
8 9
	"gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
	tec "gx/ipfs/QmWHgLqrghM9zw77nF6gdvT9ExQ2RB9pLxkd8sDHZf1rWb/go-temp-err-catcher"
10
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
Łukasz Magiera's avatar
Łukasz Magiera committed
11
	"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
12
	"gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
13 14
)

Łukasz Magiera's avatar
Łukasz Magiera committed
15 16
// localListener manet streams and proxies them to libp2p services
type localListener struct {
17
	ctx context.Context
18 19 20 21

	p2p *P2P
	id  peer.ID

Łukasz Magiera's avatar
Łukasz Magiera committed
22
	proto protocol.ID
23
	laddr ma.Multiaddr
24 25 26 27 28
	peer  peer.ID

	listener manet.Listener
}

Łukasz Magiera's avatar
Łukasz Magiera committed
29
// ForwardLocal creates new P2P stream to a remote listener
Łukasz Magiera's avatar
Łukasz Magiera committed
30
func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.ID, bindAddr ma.Multiaddr) (Listener, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
31
	listener := &localListener{
32 33
		ctx: ctx,

34 35
		p2p: p2p,
		id:  p2p.identity,
36

Łukasz Magiera's avatar
Łukasz Magiera committed
37
		proto: proto,
38
		laddr: bindAddr,
39
		peer:  peer,
40
	}
41

Łukasz Magiera's avatar
Łukasz Magiera committed
42
	if err := p2p.Listeners.Register(listener); err != nil {
43
		return nil, err
44
	}
45

46
	go listener.acceptConns()
47

48
	return listener, nil
49 50
}

Łukasz Magiera's avatar
Łukasz Magiera committed
51 52
func (l *localListener) dial(ctx context.Context) (net.Stream, error) {
	cctx, cancel := context.WithTimeout(ctx, time.Second*30) //TODO: configurable?
53 54
	defer cancel()

Łukasz Magiera's avatar
Łukasz Magiera committed
55
	return l.p2p.peerHost.NewStream(cctx, l.peer, l.proto)
56 57
}

Łukasz Magiera's avatar
Łukasz Magiera committed
58
func (l *localListener) acceptConns() {
59 60 61
	for {
		local, err := l.listener.Accept()
		if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
62 63 64
			if tec.ErrIsTemporary(err) {
				continue
			}
65 66 67
			return
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
68 69 70
		go l.setupStream(local)
	}
}
71

Łukasz Magiera's avatar
Łukasz Magiera committed
72 73 74 75 76 77 78
func (l *localListener) setupStream(local manet.Conn) {
	remote, err := l.dial(l.ctx)
	if err != nil {
		local.Close()
		log.Warningf("failed to dial to remote %s/%s", l.peer.Pretty(), l.proto)
		return
	}
79

80 81 82
	cmgr := l.p2p.peerHost.ConnManager()
	cmgr.TagPeer(l.peer, CMGR_TAG, 20)

Łukasz Magiera's avatar
Łukasz Magiera committed
83 84
	stream := &Stream{
		Protocol: l.proto,
85

Łukasz Magiera's avatar
Łukasz Magiera committed
86 87
		OriginAddr: local.RemoteMultiaddr(),
		TargetAddr: l.TargetAddress(),
88

Łukasz Magiera's avatar
Łukasz Magiera committed
89 90
		Local:  local,
		Remote: remote,
91

Łukasz Magiera's avatar
Łukasz Magiera committed
92
		Registry: l.p2p.Streams,
93 94 95 96

		cleanup: func() {
			cmgr.UntagPeer(l.peer, CMGR_TAG)
		},
Łukasz Magiera's avatar
Łukasz Magiera committed
97 98 99 100 101
	}

	l.p2p.Streams.Register(stream)
	stream.startStreaming()
}
102

Łukasz Magiera's avatar
Łukasz Magiera committed
103 104 105 106
func (l *localListener) start() error {
	maListener, err := manet.Listen(l.laddr)
	if err != nil {
		return err
107
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
108 109 110

	l.listener = maListener
	return nil
111 112
}

Łukasz Magiera's avatar
Łukasz Magiera committed
113
func (l *localListener) Close() error {
114 115 116
	ok, err := l.p2p.Listeners.Deregister(getListenerKey(l))
	if err != nil {
		return err
Łukasz Magiera's avatar
Łukasz Magiera committed
117
	}
118
	if ok {
119
		return l.listener.Close()
Łukasz Magiera's avatar
Łukasz Magiera committed
120
	}
121
	return nil
122 123
}

Łukasz Magiera's avatar
Łukasz Magiera committed
124 125
func (l *localListener) Protocol() protocol.ID {
	return l.proto
126 127
}

Łukasz Magiera's avatar
Łukasz Magiera committed
128 129
func (l *localListener) ListenAddress() ma.Multiaddr {
	return l.laddr
Łukasz Magiera's avatar
Łukasz Magiera committed
130 131
}

Łukasz Magiera's avatar
Łukasz Magiera committed
132 133 134 135 136 137
// TargetAddress returns the multiaddr of the remote peer, built by appending
// the peer's pretty-printed ID to maPrefix. It panics if that string does
// not parse as a multiaddr.
func (l *localListener) TargetAddress() ma.Multiaddr {
	target, err := ma.NewMultiaddr(maPrefix + l.peer.Pretty())
	if err == nil {
		return target
	}
	panic(err)
}