Commit 5936bda5 authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub

Merge pull request #3445 from ipfs/feat/bitswap-sendmsg-deadline

bitswap: add a deadline to sendmsg calls
parents ecd52489 90faeaf2
......@@ -38,7 +38,7 @@ type BitSwapNetwork interface {
}
type MessageSender interface {
SendMsg(bsmsg.BitSwapMessage) error
SendMsg(context.Context, bsmsg.BitSwapMessage) error
Close() error
}
......
......@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"time"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
......@@ -20,6 +21,8 @@ import (
var log = logging.Logger("bitswap_network")
var sendMessageTimeout = time.Minute * 10
// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host
func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork {
bitswapNetwork := impl{
......@@ -53,11 +56,20 @@ func (s *streamMessageSender) Close() error {
return s.s.Close()
}
func (s *streamMessageSender) SendMsg(msg bsmsg.BitSwapMessage) error {
return msgToStream(s.s, msg)
func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
return msgToStream(ctx, s.s, msg)
}
func msgToStream(s inet.Stream, msg bsmsg.BitSwapMessage) error {
func msgToStream(ctx context.Context, s inet.Stream, msg bsmsg.BitSwapMessage) error {
deadline := time.Now().Add(sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
}
if err := s.SetWriteDeadline(deadline); err != nil {
log.Warningf("error setting deadline: %s", err)
}
switch s.Protocol() {
case ProtocolBitswap:
if err := msg.ToNetV1(s); err != nil {
......@@ -72,6 +84,10 @@ func msgToStream(s inet.Stream, msg bsmsg.BitSwapMessage) error {
default:
return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
}
if err := s.SetWriteDeadline(time.Time{}); err != nil {
log.Warningf("error resetting deadline: %s", err)
}
return nil
}
......@@ -107,7 +123,7 @@ func (bsnet *impl) SendMessage(
}
defer s.Close()
return msgToStream(s, outgoing)
return msgToStream(ctx, s, outgoing)
}
func (bsnet *impl) SetDelegate(r Receiver) {
......
......@@ -119,8 +119,8 @@ type messagePasser struct {
ctx context.Context
}
func (mp *messagePasser) SendMsg(m bsmsg.BitSwapMessage) error {
return mp.net.SendMessage(mp.ctx, mp.local, mp.target, m)
func (mp *messagePasser) SendMsg(ctx context.Context, m bsmsg.BitSwapMessage) error {
return mp.net.SendMessage(ctx, mp.local, mp.target, m)
}
func (mp *messagePasser) Close() error {
......
......@@ -196,7 +196,7 @@ func (mq *msgQueue) doWork(ctx context.Context) {
// send wantlist updates
for { // try to send this message until we fail.
err := mq.sender.SendMsg(wlm)
err := mq.sender.SendMsg(ctx, wlm)
if err == nil {
return
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment