diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index e7673795a5ad964ce4648f5f8ac68624c7efbca7..3636b048a8f4436db6c3d9eea953ca89f39f2490 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -103,11 +103,14 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro return s.stream, nil } - if err := s.bsnet.ConnectTo(ctx, s.to); err != nil { + tctx, cancel := context.WithTimeout(ctx, s.opts.SendTimeout) + defer cancel() + + if err := s.bsnet.ConnectTo(tctx, s.to); err != nil { return nil, err } - stream, err := s.bsnet.newStreamToPeer(ctx, s.to) + stream, err := s.bsnet.newStreamToPeer(tctx, s.to) if err != nil { return nil, err } @@ -139,25 +142,20 @@ func (s *streamMessageSender) SupportsHave() bool { // Send a message to the peer, attempting multiple times func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error { - return s.multiAttempt(ctx, func(fnctx context.Context) error { - return s.send(fnctx, msg) + return s.multiAttempt(ctx, func() error { + return s.send(ctx, msg) }) } // Perform a function with multiple attempts, and a timeout -func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context.Context) error) error { +func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func() error) error { // Try to call the function repeatedly var err error for i := 0; i < s.opts.MaxRetries; i++ { - deadline := time.Now().Add(s.opts.SendTimeout) - sndctx, cancel := context.WithDeadline(ctx, deadline) - - if err = fn(sndctx); err == nil { - cancel() + if err = fn(); err == nil { // Attempt was successful return nil } - cancel() // Attempt failed @@ -196,13 +194,18 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func(context. // Send a message to the peer func (s *streamMessageSender) send(ctx context.Context, msg bsmsg.BitSwapMessage) error { + start := time.Now() stream, err := s.Connect(ctx) if err != nil { log.Infof("failed to open stream to %s: %s", s.to, err) return err } - if err = s.bsnet.msgToStream(ctx, stream, msg); err != nil { + // The send timeout includes the time required to connect + // (although usually we will already have connected - we only need to + // connect after a failed attempt to send) + timeout := s.opts.SendTimeout - time.Since(start) + if err = s.bsnet.msgToStream(ctx, stream, msg, timeout); err != nil { log.Infof("failed to send message to %s: %s", s.to, err) return err } @@ -234,9 +237,9 @@ func (bsnet *impl) SupportsHave(proto protocol.ID) bool { return true } -func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage) error { - deadline := time.Now().Add(sendMessageTimeout) - if dl, ok := ctx.Deadline(); ok { +func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + if dl, ok := ctx.Deadline(); ok && dl.Before(deadline) { deadline = dl } @@ -277,8 +280,8 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *Messag opts: opts, } - err := sender.multiAttempt(ctx, func(fnctx context.Context) error { - _, err := sender.Connect(fnctx) + err := sender.multiAttempt(ctx, func() error { + _, err := sender.Connect(ctx) return err }) @@ -313,7 +316,7 @@ func (bsnet *impl) SendMessage( return err } - if err = bsnet.msgToStream(ctx, s, outgoing); err != nil { + if err = bsnet.msgToStream(ctx, s, outgoing, sendMessageTimeout); err != nil { _ = s.Reset() return err }