listener.go 2.41 KB
Newer Older
1 2
package p2p

3
import (
Łukasz Magiera's avatar
Łukasz Magiera committed
4
	"errors"
5
	"sync"
Łukasz Magiera's avatar
Łukasz Magiera committed
6

Łukasz Magiera's avatar
Łukasz Magiera committed
7 8
	net "gx/ipfs/QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM/go-libp2p-net"
	peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
Łukasz Magiera's avatar
Łukasz Magiera committed
9 10
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
	"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
Łukasz Magiera's avatar
Łukasz Magiera committed
11
	p2phost "gx/ipfs/QmfH9FKYv3Jp1xiyL8sPchGBUBg6JA6XviwajAo3qgnT3B/go-libp2p-host"
12 13
)

14
// Listener listens for connections and proxies them to a target
15
type Listener interface {
Łukasz Magiera's avatar
Łukasz Magiera committed
16 17 18 19 20
	Protocol() protocol.ID
	ListenAddress() ma.Multiaddr
	TargetAddress() ma.Multiaddr

	start() error
21
	key() string
22 23 24 25 26

	// Close closes the listener. Does not affect child streams
	Close() error
}

27
type Listeners struct {
28
	sync.RWMutex
Łukasz Magiera's avatar
Łukasz Magiera committed
29

30 31
	Listeners map[string]Listener
	starting  map[string]struct{}
32 33
}

34 35 36 37 38 39 40 41 42 43 44
// newListenersLocal returns an empty registry with both maps allocated.
// Unlike newListenersP2P it installs no stream handler. The id argument is
// accepted but not used.
func newListenersLocal(id peer.ID) *Listeners {
	reg := new(Listeners)
	reg.Listeners = make(map[string]Listener)
	reg.starting = make(map[string]struct{})
	return reg
}

func newListenersP2P(id peer.ID, host p2phost.Host) *Listeners {
	reg := &Listeners{
		Listeners: map[string]Listener{},
		starting:  map[string]struct{}{},
45 46 47 48 49 50 51 52 53 54
	}

	addr, err := ma.NewMultiaddr(maPrefix + id.Pretty())
	if err != nil {
		panic(err)
	}

	host.SetStreamHandlerMatch("/x/", func(p string) bool {
		reg.RLock()
		defer reg.RUnlock()
55

56 57 58 59 60 61 62 63
		for _, l := range reg.Listeners {
			if l.ListenAddress().Equal(addr) && string(l.Protocol()) == p {
				return true
			}
		}

		return false
	}, func(stream net.Stream) {
64 65 66
		reg.RLock()
		defer reg.RUnlock()

67 68
		for _, l := range reg.Listeners {
			if l.ListenAddress().Equal(addr) && l.Protocol() == stream.Protocol() {
69
				go l.(*remoteListener).handleStream(stream)
70
				return
71 72 73 74 75 76 77
			}
		}
	})

	return reg
}

Łukasz Magiera's avatar
Łukasz Magiera committed
78
// Register registers listenerInfo into this registry and starts it
79
func (r *Listeners) Register(l Listener) error {
Łukasz Magiera's avatar
Łukasz Magiera committed
80
	r.Lock()
81
	k := l.key()
82

83
	if _, ok := r.Listeners[k]; ok {
Łukasz Magiera's avatar
Łukasz Magiera committed
84
		r.Unlock()
85 86 87
		return errors.New("listener already registered")
	}

88 89
	r.Listeners[k] = l
	r.starting[k] = struct{}{}
90

Łukasz Magiera's avatar
Łukasz Magiera committed
91
	r.Unlock()
92

93
	err := l.start()
Łukasz Magiera's avatar
Łukasz Magiera committed
94

95 96 97 98 99 100 101
	r.Lock()
	defer r.Unlock()

	delete(r.starting, k)

	if err != nil {
		delete(r.Listeners, k)
Łukasz Magiera's avatar
Łukasz Magiera committed
102 103 104 105
		return err
	}

	return nil
106 107 108
}

// Deregister removes p2p listener from this registry
109
func (r *Listeners) Deregister(k string) (bool, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
110 111
	r.Lock()
	defer r.Unlock()
112

113 114 115 116
	if _, ok := r.starting[k]; ok {
		return false, errors.New("listener didn't start yet")
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
117
	_, ok := r.Listeners[k]
118
	delete(r.Listeners, k)
119
	return ok, nil
120
}