Unverified Commit fe35a3cf authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #81 from libp2p/feat/rw-close

feat: use new stream interfaces from go-libp2p-core 0.7.0
parents ac8fa95f 519ffb34
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"errors" "errors"
"time" "time"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
...@@ -38,7 +37,7 @@ func newFetchProtocol(ctx context.Context, host host.Host, getData getValue) *fe ...@@ -38,7 +37,7 @@ func newFetchProtocol(ctx context.Context, host host.Host, getData getValue) *fe
} }
func (p *fetchProtocol) receive(s network.Stream, getData getValue) { func (p *fetchProtocol) receive(s network.Stream, getData getValue) {
defer helpers.FullClose(s) defer s.Close()
msg := &pb.FetchRequest{} msg := &pb.FetchRequest{}
if err := readMsg(p.ctx, s, msg); err != nil { if err := readMsg(p.ctx, s, msg); err != nil {
...@@ -57,6 +56,7 @@ func (p *fetchProtocol) receive(s network.Stream, getData getValue) { ...@@ -57,6 +56,7 @@ func (p *fetchProtocol) receive(s network.Stream, getData getValue) {
} }
if err := writeMsg(p.ctx, s, &respProto); err != nil { if err := writeMsg(p.ctx, s, &respProto); err != nil {
s.Reset()
return return
} }
} }
...@@ -69,17 +69,23 @@ func (p *fetchProtocol) Fetch(ctx context.Context, pid peer.ID, key string) ([]b ...@@ -69,17 +69,23 @@ func (p *fetchProtocol) Fetch(ctx context.Context, pid peer.ID, key string) ([]b
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer helpers.FullClose(s) defer s.Close()
msg := &pb.FetchRequest{Identifier: key} msg := &pb.FetchRequest{Identifier: key}
if err := writeMsg(ctx, s, msg); err != nil { if err := writeMsg(ctx, s, msg); err != nil {
_ = s.Reset()
return nil, err
}
if err := s.CloseWrite(); err != nil {
_ = s.Reset()
return nil, err return nil, err
} }
s.Close()
response := &pb.FetchResponse{} response := &pb.FetchResponse{}
if err := readMsg(ctx, s, response); err != nil { if err := readMsg(ctx, s, response); err != nil {
_ = s.Reset()
return nil, err return nil, err
} }
...@@ -114,7 +120,6 @@ func writeMsg(ctx context.Context, s network.Stream, msg proto.Message) error { ...@@ -114,7 +120,6 @@ func writeMsg(ctx context.Context, s network.Stream, msg proto.Message) error {
} }
if retErr != nil { if retErr != nil {
s.Reset()
log.Infof("error writing response to %s: %s", s.Conn().RemotePeer(), retErr) log.Infof("error writing response to %s: %s", s.Conn().RemotePeer(), retErr)
} }
return retErr return retErr
......
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment