Unverified Commit f4b63eef authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #369 from ipfs/fix/over-allocated-ctx

fix: avoid calling ctx.SetDeadline() every time we send a message
parents 2a033735 0b7aab09
......@@ -103,11 +103,14 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro
return s.stream, nil
}
if err := s.bsnet.ConnectTo(ctx, s.to); err != nil {
tctx, cancel := context.WithTimeout(ctx, s.opts.SendTimeout)
defer cancel()
if err := s.bsnet.ConnectTo(tctx, s.to); err != nil {
return nil, err
}
stream, err := s.bsnet.newStreamToPeer(ctx, s.to)
stream, err := s.bsnet.newStreamToPeer(tctx, s.to)
if err != nil {
return nil, err
}
......@@ -139,25 +142,20 @@ func (s *streamMessageSender) SupportsHave() bool {
// Send a message to the peer, attempting multiple times
func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
return s.multiAttempt(ctx, func(fnctx context.Context) error {
return s.send(fnctx, msg)
return s.multiAttempt(ctx, func() error {
return s.send(ctx, msg)
})
}
// Perform a function with multiple attempts, and a timeout
func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context.Context) error) error {
func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func() error) error {
// Try to call the function repeatedly
var err error
for i := 0; i < s.opts.MaxRetries; i++ {
deadline := time.Now().Add(s.opts.SendTimeout)
sndctx, cancel := context.WithDeadline(ctx, deadline)
if err = fn(sndctx); err == nil {
cancel()
if err = fn(); err == nil {
// Attempt was successful
return nil
}
cancel()
// Attempt failed
......@@ -196,13 +194,18 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context.
// Send a message to the peer
func (s *streamMessageSender) send(ctx context.Context, msg bsmsg.BitSwapMessage) error {
start := time.Now()
stream, err := s.Connect(ctx)
if err != nil {
log.Infof("failed to open stream to %s: %s", s.to, err)
return err
}
if err = s.bsnet.msgToStream(ctx, stream, msg); err != nil {
// The send timeout includes the time required to connect
// (although usually we will already have connected - we only need to
// connect after a failed attempt to send)
timeout := s.opts.SendTimeout - time.Since(start)
if err = s.bsnet.msgToStream(ctx, stream, msg, timeout); err != nil {
log.Infof("failed to send message to %s: %s", s.to, err)
return err
}
......@@ -234,9 +237,9 @@ func (bsnet *impl) SupportsHave(proto protocol.ID) bool {
return true
}
func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage) error {
deadline := time.Now().Add(sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
if dl, ok := ctx.Deadline(); ok && dl.Before(deadline) {
deadline = dl
}
......@@ -277,8 +280,8 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *Messag
opts: opts,
}
err := sender.multiAttempt(ctx, func(fnctx context.Context) error {
_, err := sender.Connect(fnctx)
err := sender.multiAttempt(ctx, func() error {
_, err := sender.Connect(ctx)
return err
})
......@@ -313,7 +316,7 @@ func (bsnet *impl) SendMessage(
return err
}
if err = bsnet.msgToStream(ctx, s, outgoing); err != nil {
if err = bsnet.msgToStream(ctx, s, outgoing, sendMessageTimeout); err != nil {
_ = s.Reset()
return err
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment