Commit 6cd6f553 authored by Steven Allen's avatar Steven Allen

use stream.Reset where appropriate

License: MIT
Signed-off-by: default avatarSteven Allen <steven@stebalien.com>
parent 557bef8c
......@@ -40,6 +40,7 @@ type BitSwapNetwork interface {
type MessageSender interface {
SendMsg(context.Context, bsmsg.BitSwapMessage) error
Close() error
Reset() error
}
// Implement Receiver to receive messages from the BitSwapNetwork
......
......@@ -56,6 +56,10 @@ func (s *streamMessageSender) Close() error {
return s.s.Close()
}
func (s *streamMessageSender) Reset() error {
return s.s.Reset()
}
func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
return msgToStream(ctx, s.s, msg)
}
......@@ -121,9 +125,14 @@ func (bsnet *impl) SendMessage(
if err != nil {
return err
}
defer s.Close()
return msgToStream(ctx, s, outgoing)
err = msgToStream(ctx, s, outgoing)
if err != nil {
s.Reset()
} else {
s.Close()
}
return err
}
func (bsnet *impl) SetDelegate(r Receiver) {
......@@ -180,6 +189,7 @@ func (bsnet *impl) handleNewStream(s inet.Stream) {
defer s.Close()
if bsnet.receiver == nil {
s.Reset()
return
}
......@@ -188,6 +198,7 @@ func (bsnet *impl) handleNewStream(s inet.Stream) {
received, err := bsmsg.FromPBReader(reader)
if err != nil {
if err != io.EOF {
s.Reset()
go bsnet.receiver.ReceiveError(err)
log.Debugf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
}
......
......@@ -133,6 +133,10 @@ func (mp *messagePasser) Close() error {
return nil
}
func (mp *messagePasser) Reset() error {
return nil
}
func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) {
return &messagePasser{
net: n.network,
......
......@@ -172,18 +172,19 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) {
}
func (mq *msgQueue) runQueue(ctx context.Context) {
defer func() {
if mq.sender != nil {
mq.sender.Close()
}
}()
for {
select {
case <-mq.work: // there is work to be done
mq.doWork(ctx)
case <-mq.done:
if mq.sender != nil {
mq.sender.Close()
}
return
case <-ctx.Done():
if mq.sender != nil {
mq.sender.Reset()
}
return
}
}
......@@ -218,7 +219,7 @@ func (mq *msgQueue) doWork(ctx context.Context) {
}
log.Infof("bitswap send error: %s", err)
mq.sender.Close()
mq.sender.Reset()
mq.sender = nil
select {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment