listener.go 1.83 KB
Newer Older
1 2
package p2p

3
import (
Łukasz Magiera's avatar
Łukasz Magiera committed
4
	"errors"
5
	"sync"
Łukasz Magiera's avatar
Łukasz Magiera committed
6

tavit ohanian's avatar
tavit ohanian committed
7 8 9 10
	ma "gitlab.dms3.io/mf/go-multiaddr"
	p2phost "gitlab.dms3.io/p2p/go-p2p-core/host"
	net "gitlab.dms3.io/p2p/go-p2p-core/network"
	"gitlab.dms3.io/p2p/go-p2p-core/protocol"
11 12
)

13
// Listener listens for connections and proxies them to a target
14
type Listener interface {
Łukasz Magiera's avatar
Łukasz Magiera committed
15 16 17 18
	Protocol() protocol.ID
	ListenAddress() ma.Multiaddr
	TargetAddress() ma.Multiaddr

19
	key() string
20

Łukasz Magiera's avatar
Łukasz Magiera committed
21 22
	// close closes the listener. Does not affect child streams
	close()
23 24
}

Łukasz Magiera's avatar
Łukasz Magiera committed
25 26
// Listeners manages a group of Listener implementations,
// checking for conflicts and optionally dispatching connections
27
type Listeners struct {
28
	sync.RWMutex
Łukasz Magiera's avatar
Łukasz Magiera committed
29

30
	Listeners map[string]Listener
31 32
}

33
func newListenersLocal() *Listeners {
34 35 36 37 38
	return &Listeners{
		Listeners: map[string]Listener{},
	}
}

39
func newListenersP2P(host p2phost.Host) *Listeners {
40 41
	reg := &Listeners{
		Listeners: map[string]Listener{},
42 43 44 45 46
	}

	host.SetStreamHandlerMatch("/x/", func(p string) bool {
		reg.RLock()
		defer reg.RUnlock()
47

48 49
		_, ok := reg.Listeners[p]
		return ok
50
	}, func(stream net.Stream) {
51 52 53
		reg.RLock()
		defer reg.RUnlock()

54 55 56
		l := reg.Listeners[string(stream.Protocol())]
		if l != nil {
			go l.(*remoteListener).handleStream(stream)
57 58 59 60 61 62
		}
	})

	return reg
}

Łukasz Magiera's avatar
Łukasz Magiera committed
63
// Register registers listenerInfo into this registry and starts it
64
func (r *Listeners) Register(l Listener) error {
65 66 67
	r.Lock()
	defer r.Unlock()

68 69
	if _, ok := r.Listeners[l.key()]; ok {
		return errors.New("listener already registered")
Łukasz Magiera's avatar
Łukasz Magiera committed
70 71
	}

72
	r.Listeners[l.key()] = l
Łukasz Magiera's avatar
Łukasz Magiera committed
73
	return nil
74 75
}

Łukasz Magiera's avatar
Łukasz Magiera committed
76 77
func (r *Listeners) Close(matchFunc func(listener Listener) bool) int {
	todo := make([]Listener, 0)
Łukasz Magiera's avatar
Łukasz Magiera committed
78
	r.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
	for _, l := range r.Listeners {
		if !matchFunc(l) {
			continue
		}

		if _, ok := r.Listeners[l.key()]; ok {
			delete(r.Listeners, l.key())
			todo = append(todo, l)
		}
	}
	r.Unlock()

	for _, l := range todo {
		l.close()
	}
94

Łukasz Magiera's avatar
Łukasz Magiera committed
95
	return len(todo)
96
}