local.go 2.63 KB
Newer Older
1 2 3 4 5 6
package p2p

import (
	"context"
	"time"

Łukasz Magiera's avatar
Łukasz Magiera committed
7 8 9 10 11 12
	"gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
	"gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
	tec "gx/ipfs/QmWHgLqrghM9zw77nF6gdvT9ExQ2RB9pLxkd8sDHZf1rWb/go-temp-err-catcher"
	"gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
	"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
13 14
)

Łukasz Magiera's avatar
Łukasz Magiera committed
15 16
// localListener manet streams and proxies them to libp2p services
type localListener struct {
17
	ctx context.Context
18 19 20 21

	p2p *P2P
	id  peer.ID

Łukasz Magiera's avatar
Łukasz Magiera committed
22
	proto protocol.ID
23
	laddr ma.Multiaddr
24 25 26 27 28
	peer  peer.ID

	listener manet.Listener
}

Łukasz Magiera's avatar
Łukasz Magiera committed
29
// ForwardLocal creates new P2P stream to a remote listener
Łukasz Magiera's avatar
Łukasz Magiera committed
30
func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.ID, bindAddr ma.Multiaddr) (Listener, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
31
	listener := &localListener{
32 33
		ctx: ctx,

34 35
		p2p: p2p,
		id:  p2p.identity,
36

Łukasz Magiera's avatar
Łukasz Magiera committed
37
		proto: proto,
38
		laddr: bindAddr,
39
		peer:  peer,
40
	}
41

Łukasz Magiera's avatar
Łukasz Magiera committed
42
	if err := p2p.Listeners.Register(listener); err != nil {
43
		return nil, err
44
	}
45

46
	go listener.acceptConns()
47

48
	return listener, nil
49 50
}

Łukasz Magiera's avatar
Łukasz Magiera committed
51 52
func (l *localListener) dial(ctx context.Context) (net.Stream, error) {
	cctx, cancel := context.WithTimeout(ctx, time.Second*30) //TODO: configurable?
53 54
	defer cancel()

Łukasz Magiera's avatar
Łukasz Magiera committed
55
	return l.p2p.peerHost.NewStream(cctx, l.peer, l.proto)
56 57
}

Łukasz Magiera's avatar
Łukasz Magiera committed
58
func (l *localListener) acceptConns() {
59 60 61
	for {
		local, err := l.listener.Accept()
		if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
62 63 64
			if tec.ErrIsTemporary(err) {
				continue
			}
65 66 67
			return
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
68 69 70
		go l.setupStream(local)
	}
}
71

Łukasz Magiera's avatar
Łukasz Magiera committed
72 73 74 75 76 77 78
func (l *localListener) setupStream(local manet.Conn) {
	remote, err := l.dial(l.ctx)
	if err != nil {
		local.Close()
		log.Warningf("failed to dial to remote %s/%s", l.peer.Pretty(), l.proto)
		return
	}
79

Łukasz Magiera's avatar
Łukasz Magiera committed
80 81
	stream := &Stream{
		Protocol: l.proto,
82

Łukasz Magiera's avatar
Łukasz Magiera committed
83 84
		OriginAddr: local.RemoteMultiaddr(),
		TargetAddr: l.TargetAddress(),
85

Łukasz Magiera's avatar
Łukasz Magiera committed
86 87
		Local:  local,
		Remote: remote,
88

Łukasz Magiera's avatar
Łukasz Magiera committed
89 90 91 92 93 94
		Registry: l.p2p.Streams,
	}

	l.p2p.Streams.Register(stream)
	stream.startStreaming()
}
95

Łukasz Magiera's avatar
Łukasz Magiera committed
96 97 98 99
func (l *localListener) start() error {
	maListener, err := manet.Listen(l.laddr)
	if err != nil {
		return err
100
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
101 102 103

	l.listener = maListener
	return nil
104 105
}

Łukasz Magiera's avatar
Łukasz Magiera committed
106
func (l *localListener) Close() error {
107 108 109
	ok, err := l.p2p.Listeners.Deregister(getListenerKey(l))
	if err != nil {
		return err
Łukasz Magiera's avatar
Łukasz Magiera committed
110
	}
111
	if ok {
112
		return l.listener.Close()
Łukasz Magiera's avatar
Łukasz Magiera committed
113
	}
114
	return nil
115 116
}

Łukasz Magiera's avatar
Łukasz Magiera committed
117 118
func (l *localListener) Protocol() protocol.ID {
	return l.proto
119 120
}

Łukasz Magiera's avatar
Łukasz Magiera committed
121 122
func (l *localListener) ListenAddress() ma.Multiaddr {
	return l.laddr
Łukasz Magiera's avatar
Łukasz Magiera committed
123 124
}

Łukasz Magiera's avatar
Łukasz Magiera committed
125 126 127 128 129 130
// TargetAddress returns the multiaddr of the remote peer, built by appending
// the peer ID's Pretty form to maPrefix. It panics if the resulting string
// cannot be parsed as a multiaddr.
func (l *localListener) TargetAddress() ma.Multiaddr {
	raw := maPrefix + l.peer.Pretty()

	target, err := ma.NewMultiaddr(raw)
	if err != nil {
		panic(err)
	}
	return target
}