Commit 8849193d authored by Łukasz Magiera's avatar Łukasz Magiera

p2p: more locks

License: MIT
Signed-off-by: default avatarŁukasz Magiera <magik6k@gmail.com>
parent 3c6a168d
......@@ -231,6 +231,7 @@ var p2pLsCmd = &cmds.Command{
output := &P2PLsOutput{}
n.P2P.Listeners.Lock()
for _, listener := range n.P2P.Listeners.Listeners {
output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
Protocol: string(listener.Protocol()),
......@@ -238,6 +239,7 @@ var p2pLsCmd = &cmds.Command{
TargetAddress: listener.TargetAddress().String(),
})
}
n.P2P.Listeners.Unlock()
res.SetOutput(output)
},
......@@ -402,6 +404,7 @@ var p2pStreamLsCmd = &cmds.Command{
output := &P2PStreamsOutput{}
n.P2P.Streams.Lock()
for id, s := range n.P2P.Streams.Streams {
output.Streams = append(output.Streams, P2PStreamInfoOutput{
HandlerID: strconv.FormatUint(id, 10),
......@@ -412,6 +415,7 @@ var p2pStreamLsCmd = &cmds.Command{
TargetAddress: s.TargetAddr.String(),
})
}
n.P2P.Streams.Unlock()
res.SetOutput(output)
},
......@@ -476,15 +480,22 @@ var p2pStreamCloseCmd = &cmds.Command{
}
}
toClose := make([]*p2p.Stream, 0, 1)
n.P2P.Streams.Lock()
for id, stream := range n.P2P.Streams.Streams {
if !closeAll && handlerID != id {
continue
}
stream.Reset()
toClose = append(toClose, stream)
if !closeAll {
break
}
}
n.P2P.Streams.Unlock()
for _, s := range toClose {
s.Reset()
}
},
}
......
......@@ -63,16 +63,16 @@ func (s *Stream) startStreaming() {
// StreamRegistry is a collection of active incoming and outgoing proto app streams.
type StreamRegistry struct {
Streams map[uint64]*Stream
lk sync.Mutex
sync.Mutex
nextID uint64
Streams map[uint64]*Stream
nextID uint64
}
// Register registers a stream to the registry
func (r *StreamRegistry) Register(streamInfo *Stream) {
r.lk.Lock()
defer r.lk.Unlock()
r.Lock()
defer r.Unlock()
streamInfo.id = r.nextID
r.Streams[r.nextID] = streamInfo
......@@ -81,8 +81,8 @@ func (r *StreamRegistry) Register(streamInfo *Stream) {
// Deregister deregisters stream from the registry
func (r *StreamRegistry) Deregister(streamID uint64) {
r.lk.Lock()
defer r.lk.Unlock()
r.Lock()
defer r.Unlock()
delete(r.Streams, streamID)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment