Class: Qless::Middleware::Timeout

Inherits:
Module
  • Object
Defined in:
lib/qless/middleware/timeout.rb

Overview

Applies a hard timeout. To use this middleware, instantiate it and pass a block; the block will be passed the job object (which has a `ttl` method for getting the job's remaining TTL) and should return the desired timeout in seconds. Returning `nil` runs the job without a timeout; any other value that is not a positive number raises `InvalidTimeoutError`. This allows you to set a constant hard timeout for a particular job class (using something like `extend Qless::Middleware::Timeout.new { 60 * 60 }`), or a variable timeout based on the individual TTL of each job (using something like `extend Qless::Middleware::Timeout.new { |job| job.ttl * 1.1 }`).
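The mechanism described above can be illustrated with a simplified, standalone sketch. It is not the library's implementation: `SketchTimeout`, `SupportsMiddleware`, and the job classes are hypothetical names, the job is a plain `Struct` with only a `ttl` field, and the sketch omits validation, Redis handling, and the process exit. It only shows how a `Module` subclass can take a block and wrap `around_perform` with a timeout computed per job.

```ruby
require 'timeout'

# Hypothetical stand-in for the middleware: a Module subclass whose
# constructor block decides the timeout (in seconds) for each job.
class SketchTimeout < Module
  def initialize(&timeout_for)
    define_method(:around_perform) do |job|
      seconds = timeout_for.call(job)
      # nil means "no timeout": run the job unguarded.
      return super(job) if seconds.nil?

      ::Timeout.timeout(seconds) { super(job) }
    end
  end
end

# Assumed base hook that the middleware's `super` call reaches.
module SupportsMiddleware
  def around_perform(job)
    perform(job)
  end
end

Job = Struct.new(:ttl) # hypothetical job object exposing only `ttl`

class FastJob
  extend SupportsMiddleware
  extend SketchTimeout.new { 1 } # constant timeout of one second

  def self.perform(_job)
    :done
  end
end

class SlowJob
  extend SupportsMiddleware
  extend SketchTimeout.new { |job| job.ttl * 1.1 } # timeout derived from the TTL

  def self.perform(_job)
    sleep 0.5
    :done
  end
end

class NoLimitJob
  extend SupportsMiddleware
  extend SketchTimeout.new { nil } # opt out of the timeout

  def self.perform(_job)
    :done
  end
end
```

Because the middleware is added with `extend`, its `around_perform` sits in front of the base hook in the class's singleton ancestry, so `super(job)` reaches the job's normal perform path. In this sketch a job that overruns simply raises `Timeout::Error`; the real middleware instead marks the job as failed and exits the worker process.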

Constructor Details

#initialize(opts = {}) ⇒ Timeout

Returns a new instance of Timeout.



# File 'lib/qless/middleware/timeout.rb', line 19

def initialize(opts = {})
  timeout_class = opts.fetch(:timeout_class, ::Timeout)
  kernel_class = opts.fetch(:kernel_class, Kernel)
  module_eval do
    define_method :around_perform do |job|
      timeout_seconds = yield job

      return super(job) if timeout_seconds.nil?

      if !timeout_seconds.is_a?(Numeric) || timeout_seconds <= 0
        raise InvalidTimeoutError, "Timeout must be a positive number or nil, " \
                                   "but was #{timeout_seconds}"
      end

      begin
        timeout_class.timeout(timeout_seconds) { super(job) }
      rescue ::Timeout::Error => e
        error = JobTimedoutError.new("Qless: job timeout (#{timeout_seconds}) exceeded.")
        error.set_backtrace(e.backtrace)
        # A stalled connection to Redis might be the cause of the timeout. We cannot rely
        # on the state of the connection either (e.g., we might have been in the middle of
        # a Redis call when the timeout happened). To play it safe, we reconnect.
        job.reconnect_to_redis
        job.fail(*Qless.failure_formatter.format(job, error, []))
        # Since we are leaving with bang (exit!), normal requeue logic does not work.
        # Do it manually right here.
        if self.is_a?(::Qless::Middleware::RequeueExceptions) &&
           self.requeueable?(JobTimedoutError)
          self.handle_exception(job, error)
        end

        # ::Timeout.timeout is dangerous to use as it can leave things in an inconsistent
        # state. With Redis, for example, we've seen the socket buffer left with unread bytes
        # on it, which can affect later redis calls. Thus, it's much safer just to exit, and
        # allow the parent process to restart the worker in a known, clean state.
        #
        # We use 73 as a unique exit status for this case. 73 looks
        # a bit like TE (Timeout::Error)
        kernel_class.exit!(73)
      end
    end
  end
end
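The comments in the source say that the worker exits with status 73 so that a parent process can restart it in a clean state. A minimal sketch of how a parent might detect that status is shown here. It assumes a Unix-like platform where `fork` is available, and `TIMEOUT_EXIT_STATUS` and `timed_out` are names introduced only for this illustration; how Qless's own parent process reacts is not shown in this source.

```ruby
# Exit status the middleware passes to exit! after a job timeout.
TIMEOUT_EXIT_STATUS = 73

# Simulate a worker child that hits the timeout path and leaves
# immediately, skipping at_exit handlers, as exit! does.
pid = fork do
  exit!(TIMEOUT_EXIT_STATUS)
end

# The parent waits for the child and inspects its exit status.
_, status = Process.wait2(pid)
timed_out = status.exitstatus == TIMEOUT_EXIT_STATUS

puts 'worker timed out; a fresh worker could be started here' if timed_out
```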