local.go 2.33 KB
Newer Older
1 2 3 4 5 6
package p2p

import (
	"context"
	"time"

Jakub Sztandera's avatar
Jakub Sztandera committed
7
	tec "github.com/jbenet/go-temp-err-catcher"
tavit ohanian's avatar
tavit ohanian committed
8 9 10 11 12
	ma "gitlab.dms3.io/mf/go-multiaddr"
	manet "gitlab.dms3.io/mf/go-multiaddr/net"
	net "gitlab.dms3.io/p2p/go-p2p-core/network"
	"gitlab.dms3.io/p2p/go-p2p-core/peer"
	"gitlab.dms3.io/p2p/go-p2p-core/protocol"
13 14
)

tavit ohanian's avatar
tavit ohanian committed
15
// localListener manet streams and proxies them to p2p services
Łukasz Magiera's avatar
Łukasz Magiera committed
16
type localListener struct {
17
	ctx context.Context
18 19 20

	p2p *P2P

Łukasz Magiera's avatar
Łukasz Magiera committed
21
	proto protocol.ID
22
	laddr ma.Multiaddr
23 24 25 26 27
	peer  peer.ID

	listener manet.Listener
}

Łukasz Magiera's avatar
Łukasz Magiera committed
28
// ForwardLocal creates new P2P stream to a remote listener
29
func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.ID, bindAddr ma.Multiaddr) (Listener, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
30
	listener := &localListener{
Overbool's avatar
Overbool committed
31 32
		ctx:   ctx,
		p2p:   p2p,
Łukasz Magiera's avatar
Łukasz Magiera committed
33
		proto: proto,
34
		peer:  peer,
35
	}
36

Overbool's avatar
Overbool committed
37
	maListener, err := manet.Listen(bindAddr)
38 39 40 41 42
	if err != nil {
		return nil, err
	}

	listener.listener = maListener
Overbool's avatar
Overbool committed
43
	listener.laddr = maListener.Multiaddr()
44

Łukasz Magiera's avatar
Łukasz Magiera committed
45
	if err := p2p.ListenersLocal.Register(listener); err != nil {
46
		return nil, err
47
	}
48

49
	go listener.acceptConns()
50

51
	return listener, nil
52 53
}

Łukasz Magiera's avatar
Łukasz Magiera committed
54 55
func (l *localListener) dial(ctx context.Context) (net.Stream, error) {
	cctx, cancel := context.WithTimeout(ctx, time.Second*30) //TODO: configurable?
56 57
	defer cancel()

Łukasz Magiera's avatar
Łukasz Magiera committed
58
	return l.p2p.peerHost.NewStream(cctx, l.peer, l.proto)
59 60
}

Łukasz Magiera's avatar
Łukasz Magiera committed
61
func (l *localListener) acceptConns() {
62 63 64
	for {
		local, err := l.listener.Accept()
		if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
65 66 67
			if tec.ErrIsTemporary(err) {
				continue
			}
68 69 70
			return
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
71 72 73
		go l.setupStream(local)
	}
}
74

Łukasz Magiera's avatar
Łukasz Magiera committed
75 76 77 78
func (l *localListener) setupStream(local manet.Conn) {
	remote, err := l.dial(l.ctx)
	if err != nil {
		local.Close()
79
		log.Warnf("failed to dial to remote %s/%s", l.peer.Pretty(), l.proto)
Łukasz Magiera's avatar
Łukasz Magiera committed
80 81
		return
	}
82

Łukasz Magiera's avatar
Łukasz Magiera committed
83 84
	stream := &Stream{
		Protocol: l.proto,
85

Łukasz Magiera's avatar
Łukasz Magiera committed
86 87
		OriginAddr: local.RemoteMultiaddr(),
		TargetAddr: l.TargetAddress(),
Łukasz Magiera's avatar
Łukasz Magiera committed
88
		peer:       l.peer,
89

Łukasz Magiera's avatar
Łukasz Magiera committed
90 91
		Local:  local,
		Remote: remote,
92

Łukasz Magiera's avatar
Łukasz Magiera committed
93 94 95 96
		Registry: l.p2p.Streams,
	}

	l.p2p.Streams.Register(stream)
97 98
}

Łukasz Magiera's avatar
Łukasz Magiera committed
99 100
func (l *localListener) close() {
	l.listener.Close()
101 102
}

Łukasz Magiera's avatar
Łukasz Magiera committed
103 104
func (l *localListener) Protocol() protocol.ID {
	return l.proto
105 106
}

Łukasz Magiera's avatar
Łukasz Magiera committed
107 108
func (l *localListener) ListenAddress() ma.Multiaddr {
	return l.laddr
Łukasz Magiera's avatar
Łukasz Magiera committed
109 110
}

Łukasz Magiera's avatar
Łukasz Magiera committed
111 112 113 114 115 116
func (l *localListener) TargetAddress() ma.Multiaddr {
	addr, err := ma.NewMultiaddr(maPrefix + l.peer.Pretty())
	if err != nil {
		panic(err)
	}
	return addr
117
}
118 119 120 121

func (l *localListener) key() string {
	return l.ListenAddress().String()
}