Commit c12f9777 authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub

Merge pull request #3445 from ipfs/feat/bitswap-sendmsg-deadline

bitswap: add a deadline to sendmsg calls
parents f8eca5e1 02975bde
...@@ -38,7 +38,7 @@ type BitSwapNetwork interface { ...@@ -38,7 +38,7 @@ type BitSwapNetwork interface {
} }
type MessageSender interface { type MessageSender interface {
SendMsg(bsmsg.BitSwapMessage) error SendMsg(context.Context, bsmsg.BitSwapMessage) error
Close() error Close() error
} }
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"time"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
...@@ -20,6 +21,8 @@ import ( ...@@ -20,6 +21,8 @@ import (
var log = logging.Logger("bitswap_network") var log = logging.Logger("bitswap_network")
var sendMessageTimeout = time.Minute * 10
// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host // NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host
func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork { func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork {
bitswapNetwork := impl{ bitswapNetwork := impl{
...@@ -53,11 +56,20 @@ func (s *streamMessageSender) Close() error { ...@@ -53,11 +56,20 @@ func (s *streamMessageSender) Close() error {
return s.s.Close() return s.s.Close()
} }
func (s *streamMessageSender) SendMsg(msg bsmsg.BitSwapMessage) error { func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
return msgToStream(s.s, msg) return msgToStream(ctx, s.s, msg)
} }
func msgToStream(s inet.Stream, msg bsmsg.BitSwapMessage) error { func msgToStream(ctx context.Context, s inet.Stream, msg bsmsg.BitSwapMessage) error {
deadline := time.Now().Add(sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
}
if err := s.SetWriteDeadline(deadline); err != nil {
log.Warningf("error setting deadline: %s", err)
}
switch s.Protocol() { switch s.Protocol() {
case ProtocolBitswap: case ProtocolBitswap:
if err := msg.ToNetV1(s); err != nil { if err := msg.ToNetV1(s); err != nil {
...@@ -72,6 +84,10 @@ func msgToStream(s inet.Stream, msg bsmsg.BitSwapMessage) error { ...@@ -72,6 +84,10 @@ func msgToStream(s inet.Stream, msg bsmsg.BitSwapMessage) error {
default: default:
return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol()) return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
} }
if err := s.SetWriteDeadline(time.Time{}); err != nil {
log.Warningf("error resetting deadline: %s", err)
}
return nil return nil
} }
...@@ -107,7 +123,7 @@ func (bsnet *impl) SendMessage( ...@@ -107,7 +123,7 @@ func (bsnet *impl) SendMessage(
} }
defer s.Close() defer s.Close()
return msgToStream(s, outgoing) return msgToStream(ctx, s, outgoing)
} }
func (bsnet *impl) SetDelegate(r Receiver) { func (bsnet *impl) SetDelegate(r Receiver) {
......
...@@ -119,8 +119,8 @@ type messagePasser struct { ...@@ -119,8 +119,8 @@ type messagePasser struct {
ctx context.Context ctx context.Context
} }
func (mp *messagePasser) SendMsg(m bsmsg.BitSwapMessage) error { func (mp *messagePasser) SendMsg(ctx context.Context, m bsmsg.BitSwapMessage) error {
return mp.net.SendMessage(mp.ctx, mp.local, mp.target, m) return mp.net.SendMessage(ctx, mp.local, mp.target, m)
} }
func (mp *messagePasser) Close() error { func (mp *messagePasser) Close() error {
......
...@@ -196,7 +196,7 @@ func (mq *msgQueue) doWork(ctx context.Context) { ...@@ -196,7 +196,7 @@ func (mq *msgQueue) doWork(ctx context.Context) {
// send wantlist updates // send wantlist updates
for { // try to send this message until we fail. for { // try to send this message until we fail.
err := mq.sender.SendMsg(wlm) err := mq.sender.SendMsg(ctx, wlm)
if err == nil { if err == nil {
return return
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment