From 2309266f73a993a709705d2a1e41cea04885d30c Mon Sep 17 00:00:00 2001 From: Jeromy <jeromyj@gmail.com> Date: Sun, 15 Feb 2015 01:50:19 +0000 Subject: [PATCH] allow removal of stream handlers --- p2p/host/basic/basic_host.go | 4 ++++ p2p/host/host.go | 4 ++++ p2p/host/routed/routed.go | 10 ++++++++++ p2p/protocol/mux.go | 9 +++++++++ 4 files changed, 27 insertions(+) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index f6a05ee0c..9ff5a5581 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -118,6 +118,10 @@ func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler h.Mux().SetHandler(pid, handler) } +func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) { + h.Mux().RemoveHandler(pid) +} + // NewStream opens a new stream to given peer p, and writes a p2p/protocol // header with given protocol.ID. If there is no connection to p, attempts // to create one. If ProtocolID is "", writes no header. diff --git a/p2p/host/host.go b/p2p/host/host.go index 6424c9c67..e75e99836 100644 --- a/p2p/host/host.go +++ b/p2p/host/host.go @@ -46,6 +46,10 @@ type Host interface { // (Threadsafe) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler) + // RemoveStreamHandler removes a handler on the mux that was set by + // SetStreamHandler + RemoveStreamHandler(pid protocol.ID) + // NewStream opens a new stream to given peer p, and writes a p2p/protocol // header with given protocol.ID. If there is no connection to p, attempts // to create one. If ProtocolID is "", writes no header. diff --git a/p2p/host/routed/routed.go b/p2p/host/routed/routed.go index 68594f836..9f18869fe 100644 --- a/p2p/host/routed/routed.go +++ b/p2p/host/routed/routed.go @@ -84,21 +84,31 @@ func logRoutingErrDifferentPeers(ctx context.Context, wanted, got peer.ID, err e func (rh *RoutedHost) ID() peer.ID { return rh.host.ID() } + func (rh *RoutedHost) Peerstore() peer.Peerstore { return rh.host.Peerstore() } + func (rh *RoutedHost) Addrs() []ma.Multiaddr { return rh.host.Addrs() } + func (rh *RoutedHost) Network() inet.Network { return rh.host.Network() } + func (rh *RoutedHost) Mux() *protocol.Mux { return rh.host.Mux() } + func (rh *RoutedHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler) { rh.host.SetStreamHandler(pid, handler) } + +func (rh *RoutedHost) RemoveStreamHandler(pid protocol.ID) { + rh.host.RemoveStreamHandler(pid) +} + func (rh *RoutedHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) { return rh.host.NewStream(pid, p) } diff --git a/p2p/protocol/mux.go b/p2p/protocol/mux.go index 078952d39..ddf27a586 100644 --- a/p2p/protocol/mux.go +++ b/p2p/protocol/mux.go @@ -90,6 +90,15 @@ func (m *Mux) SetHandler(p ID, h inet.StreamHandler) { m.lock.Unlock() } +// RemoveHandler removes the protocol handler on the Network's Muxer. +// This operation is threadsafe. +func (m *Mux) RemoveHandler(p ID) { + log.Debugf("%s removing handler for protocol: %s (%d)", m, p, len(p)) + m.lock.Lock() + delete(m.handlers, p) + m.lock.Unlock() +} + // Handle reads the next name off the Stream, and calls a handler function // This is done in its own goroutine, to avoid blocking the caller. func (m *Mux) Handle(s inet.Stream) { -- GitLab