listener.go 2.16 KB
Newer Older
1 2
package p2p

3
import (
Łukasz Magiera's avatar
Łukasz Magiera committed
4
	"errors"
5
	"sync"
Łukasz Magiera's avatar
Łukasz Magiera committed
6

Łukasz Magiera's avatar
Łukasz Magiera committed
7 8
	net "gx/ipfs/QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM/go-libp2p-net"
	peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
Łukasz Magiera's avatar
Łukasz Magiera committed
9 10
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
	"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
Łukasz Magiera's avatar
Łukasz Magiera committed
11
	p2phost "gx/ipfs/QmfH9FKYv3Jp1xiyL8sPchGBUBg6JA6XviwajAo3qgnT3B/go-libp2p-host"
12 13
)

14
// Listener listens for connections and proxies them to a target
15
type Listener interface {
Łukasz Magiera's avatar
Łukasz Magiera committed
16 17 18 19 20
	Protocol() protocol.ID
	ListenAddress() ma.Multiaddr
	TargetAddress() ma.Multiaddr

	start() error
21
	key() string
22 23 24 25 26

	// Close closes the listener. Does not affect child streams
	Close() error
}

27
type Listeners struct {
28
	sync.RWMutex
Łukasz Magiera's avatar
Łukasz Magiera committed
29

30 31
	Listeners map[string]Listener
	starting  map[string]struct{}
32 33
}

34 35 36 37 38 39 40 41 42 43 44
// newListenersLocal returns an empty registry with both of its maps
// allocated. The id argument is accepted but not used by this constructor.
func newListenersLocal(id peer.ID) *Listeners {
	reg := new(Listeners)
	reg.Listeners = make(map[string]Listener)
	reg.starting = make(map[string]struct{})
	return reg
}

func newListenersP2P(id peer.ID, host p2phost.Host) *Listeners {
	reg := &Listeners{
		Listeners: map[string]Listener{},
		starting:  map[string]struct{}{},
45 46 47 48 49
	}

	host.SetStreamHandlerMatch("/x/", func(p string) bool {
		reg.RLock()
		defer reg.RUnlock()
50

51 52
		_, ok := reg.Listeners[p]
		return ok
53
	}, func(stream net.Stream) {
54 55 56
		reg.RLock()
		defer reg.RUnlock()

57 58 59
		l := reg.Listeners[string(stream.Protocol())]
		if l != nil {
			go l.(*remoteListener).handleStream(stream)
60 61 62 63 64 65
		}
	})

	return reg
}

Łukasz Magiera's avatar
Łukasz Magiera committed
66
// Register registers listenerInfo into this registry and starts it
67
func (r *Listeners) Register(l Listener) error {
Łukasz Magiera's avatar
Łukasz Magiera committed
68
	r.Lock()
69
	k := l.key()
70

71
	if _, ok := r.Listeners[k]; ok {
Łukasz Magiera's avatar
Łukasz Magiera committed
72
		r.Unlock()
73 74 75
		return errors.New("listener already registered")
	}

76 77
	r.Listeners[k] = l
	r.starting[k] = struct{}{}
78

Łukasz Magiera's avatar
Łukasz Magiera committed
79
	r.Unlock()
80

81
	err := l.start()
Łukasz Magiera's avatar
Łukasz Magiera committed
82

83 84 85 86 87 88 89
	r.Lock()
	defer r.Unlock()

	delete(r.starting, k)

	if err != nil {
		delete(r.Listeners, k)
Łukasz Magiera's avatar
Łukasz Magiera committed
90 91 92 93
		return err
	}

	return nil
94 95 96
}

// Deregister removes p2p listener from this registry
97
func (r *Listeners) Deregister(k string) (bool, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
98 99
	r.Lock()
	defer r.Unlock()
100

101 102 103 104
	if _, ok := r.starting[k]; ok {
		return false, errors.New("listener didn't start yet")
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
105
	_, ok := r.Listeners[k]
106
	delete(r.Listeners, k)
107
	return ok, nil
108
}