listener.go 2.28 KB
Newer Older
1 2
package p2p

3
import (
Łukasz Magiera's avatar
Łukasz Magiera committed
4
	"errors"
5
	"sync"
Łukasz Magiera's avatar
Łukasz Magiera committed
6

Łukasz Magiera's avatar
Łukasz Magiera committed
7 8
	net "gx/ipfs/QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM/go-libp2p-net"
	peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
Łukasz Magiera's avatar
Łukasz Magiera committed
9 10
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
	"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
Łukasz Magiera's avatar
Łukasz Magiera committed
11
	p2phost "gx/ipfs/QmfH9FKYv3Jp1xiyL8sPchGBUBg6JA6XviwajAo3qgnT3B/go-libp2p-host"
12 13
)

14
// Listener listens for connections and proxies them to a target
15
type Listener interface {
Łukasz Magiera's avatar
Łukasz Magiera committed
16 17 18 19 20
	Protocol() protocol.ID
	ListenAddress() ma.Multiaddr
	TargetAddress() ma.Multiaddr

	start() error
21
	key() string
22 23 24 25 26

	// Close closes the listener. Does not affect child streams
	Close() error
}

Łukasz Magiera's avatar
Łukasz Magiera committed
27 28
// Listeners manages a group of Listener implementations,
// checking for conflicts and optionally dispatching connections
29
type Listeners struct {
30
	sync.RWMutex
Łukasz Magiera's avatar
Łukasz Magiera committed
31

32 33
	Listeners map[string]Listener
	starting  map[string]struct{}
34 35
}

36 37 38 39 40 41 42 43 44 45 46
// newListenersLocal returns an empty registry with both of its maps
// initialised. No stream handler is installed. The id argument is not used.
func newListenersLocal(id peer.ID) *Listeners {
	reg := new(Listeners)
	reg.Listeners = make(map[string]Listener)
	reg.starting = make(map[string]struct{})
	return reg
}

func newListenersP2P(id peer.ID, host p2phost.Host) *Listeners {
	reg := &Listeners{
		Listeners: map[string]Listener{},
		starting:  map[string]struct{}{},
47 48 49 50 51
	}

	host.SetStreamHandlerMatch("/x/", func(p string) bool {
		reg.RLock()
		defer reg.RUnlock()
52

53 54
		_, ok := reg.Listeners[p]
		return ok
55
	}, func(stream net.Stream) {
56 57 58
		reg.RLock()
		defer reg.RUnlock()

59 60 61
		l := reg.Listeners[string(stream.Protocol())]
		if l != nil {
			go l.(*remoteListener).handleStream(stream)
62 63 64 65 66 67
		}
	})

	return reg
}

Łukasz Magiera's avatar
Łukasz Magiera committed
68
// Register registers listenerInfo into this registry and starts it
69
func (r *Listeners) Register(l Listener) error {
Łukasz Magiera's avatar
Łukasz Magiera committed
70
	r.Lock()
71
	k := l.key()
72

73
	if _, ok := r.Listeners[k]; ok {
Łukasz Magiera's avatar
Łukasz Magiera committed
74
		r.Unlock()
75 76 77
		return errors.New("listener already registered")
	}

78 79
	r.Listeners[k] = l
	r.starting[k] = struct{}{}
80

Łukasz Magiera's avatar
Łukasz Magiera committed
81
	r.Unlock()
82

83
	err := l.start()
Łukasz Magiera's avatar
Łukasz Magiera committed
84

85 86 87 88 89 90 91
	r.Lock()
	defer r.Unlock()

	delete(r.starting, k)

	if err != nil {
		delete(r.Listeners, k)
Łukasz Magiera's avatar
Łukasz Magiera committed
92 93 94 95
		return err
	}

	return nil
96 97 98
}

// Deregister removes p2p listener from this registry
99
func (r *Listeners) Deregister(k string) (bool, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
100 101
	r.Lock()
	defer r.Unlock()
102

103 104 105 106
	if _, ok := r.starting[k]; ok {
		return false, errors.New("listener didn't start yet")
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
107
	_, ok := r.Listeners[k]
108
	delete(r.Listeners, k)
109
	return ok, nil
110
}