Module: Que

Defined in:
lib/que.rb,
lib/que/job.rb,
lib/que/sql.rb,
lib/que/worker.rb,
lib/que/railtie.rb,
lib/que/version.rb,
lib/que/migrations.rb,
lib/que/adapters/pg.rb,
lib/que/adapters/base.rb,
lib/que/adapters/pond.rb,
lib/que/adapters/sequel.rb,
lib/que/adapters/active_record.rb,
lib/que/adapters/connection_pool.rb,
lib/generators/que/install_generator.rb

Defined Under Namespace

Modules: Adapters, Migrations Classes: InstallGenerator, Job, Railtie, Worker

Constant Summary collapse

HASH_DEFAULT_PROC =
proc { |hash, key| hash[key.to_s] if Symbol === key }
INDIFFERENTIATOR =
proc do |object|
  case object
  when Array
    object.each(&INDIFFERENTIATOR)
  when Hash
    object.default_proc = HASH_DEFAULT_PROC
    object.each { |key, value| object[key] = INDIFFERENTIATOR.call(value) }
    object
  else
    object
  end
end
SYMBOLIZER =
proc do |object|
  case object
  when Hash
    object.keys.each do |key|
      object[key.to_sym] = SYMBOLIZER.call(object.delete(key))
    end
    object
  when Array
    object.map! { |e| SYMBOLIZER.call(e) }
  else
    object
  end
end
SQL =
{
  # Locks a job using a Postgres recursive CTE [1].
  #
  # As noted by the Postgres documentation, it may be slightly easier to
  # think about this expression as iteration rather than recursion, despite
  # the `RECURSION` nomenclature defined by the SQL standards committee.
  # Recursion is used here so that jobs in the table can be iterated one-by-
  # one until a lock can be acquired, where a non-recursive `SELECT` would
  # have the undesirable side-effect of locking multiple jobs at once. i.e.
  # Consider that the following would have the worker lock *all* unlocked
  # jobs:
  #
  #   SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
  #   FROM que_jobs AS j;
  #
  # The CTE will initially produce an "anchor" from the non-recursive term
  # (i.e. before the `UNION`), and then use it as the contents of the
  # working table as it continues to iterate through `que_jobs` looking for
  # a lock. The jobs table has a sort on (priority, run_at, job_id) which
  # allows it to walk the jobs table in a stable manner. As noted above, the
  # recursion examines one job at a time so that it only ever acquires a
  # single lock.
  #
  # The recursion has two possible end conditions:
  #
  # 1. If a lock *can* be acquired, it bubbles up to the top-level `SELECT`
  #    outside of the `job` CTE which stops recursion because it is
  #    constrained with a `LIMIT` of 1.
  #
  # 2. If a lock *cannot* be acquired, the recursive term of the expression
  #    (i.e. what's after the `UNION`) will return an empty result set
  #    because there are no more candidates left that could possibly be
  #    locked. This empty result automatically ends recursion.
  #
  # Note that this query can be easily modified to lock any number of jobs
  # by tweaking the LIMIT clause in the main SELECT statement.
  #
  # [1] http://www.postgresql.org/docs/devel/static/queries-with.html
  #
  # Thanks to RhodiumToad in #postgresql for help with the original version
  # of the job lock CTE.
  :lock_job => %{
    WITH RECURSIVE jobs AS (
      SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
      FROM (
        SELECT j
        FROM que_jobs AS j
        WHERE queue = $1::text
        AND run_at <= now()
        ORDER BY priority, run_at, job_id
        LIMIT 1
      ) AS t1
      UNION ALL (
        SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
        FROM (
          SELECT (
            SELECT j
            FROM que_jobs AS j
            WHERE queue = $1::text
            AND run_at <= now()
            AND (priority, run_at, job_id) > (jobs.priority, jobs.run_at, jobs.job_id)
            ORDER BY priority, run_at, job_id
            LIMIT 1
          ) AS j
          FROM jobs
          WHERE jobs.job_id IS NOT NULL
          LIMIT 1
        ) AS t1
      )
    )
    SELECT queue, priority, run_at, job_id, job_class, args, error_count
    FROM jobs
    WHERE locked
    LIMIT 1
  }.freeze,

  :check_job => %{
    SELECT 1 AS one
    FROM   que_jobs
    WHERE  queue    = $1::text
    AND    priority = $2::smallint
    AND    run_at   = $3::timestamptz
    AND    job_id   = $4::bigint
  }.freeze,

  :set_error => %{
    UPDATE que_jobs
    SET error_count = error_count + 1,
        run_at      = now() + $1::bigint * '1 second'::interval,
        last_error  = $2::text
    WHERE queue     = $3::text
    AND   priority  = $4::smallint
    AND   run_at    = $5::timestamptz
    AND   job_id    = $6::bigint
  }.freeze,

  :insert_job => %{
    INSERT INTO que_jobs
    (queue, priority, run_at, job_class, args)
    VALUES
    (coalesce($1, '')::text, coalesce($2, 100)::smallint, coalesce($3, now())::timestamptz, $4::text, coalesce($5, '[]')::json)
    RETURNING *
  }.freeze,

  :destroy_job => %{
    DELETE FROM que_jobs
    WHERE queue    = $1::text
    AND   priority = $2::smallint
    AND   run_at   = $3::timestamptz
    AND   job_id   = $4::bigint
  }.freeze,

  :job_stats => %{
    SELECT queue,
           job_class,
           count(*)                    AS count,
           count(locks.job_id)         AS count_working,
           sum((error_count > 0)::int) AS count_errored,
           max(error_count)            AS highest_error_count,
           min(run_at)                 AS oldest_run_at
    FROM que_jobs
    LEFT JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id
      FROM pg_locks
      WHERE locktype = 'advisory'
    ) locks USING (job_id)
    GROUP BY queue, job_class
    ORDER BY count(*) DESC
  }.freeze,

  :worker_states_95 => %{
    SELECT que_jobs.*,
           pg.pid          AS pg_backend_pid,
           pg.state        AS pg_state,
           pg.state_change AS pg_state_changed_at,
           pg.query        AS pg_last_query,
           pg.query_start  AS pg_last_query_started_at,
           pg.xact_start   AS pg_transaction_started_at,
           pg.waiting      AS pg_waiting_on_lock
    FROM que_jobs
    JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.*
      FROM pg_locks
      JOIN pg_stat_activity USING (pid)
      WHERE locktype = 'advisory'
    ) pg USING (job_id)
  }.freeze,

  :worker_states_96 => %{
    SELECT que_jobs.*,
           pg.pid                         AS pg_backend_pid,
           pg.state                       AS pg_state,
           pg.state_change                AS pg_state_changed_at,
           pg.query                       AS pg_last_query,
           pg.query_start                 AS pg_last_query_started_at,
           pg.xact_start                  AS pg_transaction_started_at,
           pg.wait_event_type IS NOT NULL AS pg_waiting_on_lock
    FROM que_jobs
    JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.*
      FROM pg_locks
      JOIN pg_stat_activity USING (pid)
      WHERE locktype = 'advisory'
    ) pg USING (job_id)
  }.freeze,
}.freeze
Version =
'0.14.3'

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.adapterObject



63
64
65
# File 'lib/que.rb', line 63

def adapter
  @adapter || raise("Que connection not established!")
end

.error_notifierObject

Returns the value of attribute error_notifier.



44
45
46
# File 'lib/que.rb', line 44

def error_notifier
  @error_notifier
end

.json_converterObject



189
190
191
# File 'lib/que.rb', line 189

def json_converter
  @json_converter ||= INDIFFERENTIATOR
end

.log_formatterObject



125
126
127
# File 'lib/que.rb', line 125

def log_formatter
  @log_formatter ||= JSON.method(:dump)
end

.loggerObject



121
122
123
# File 'lib/que.rb', line 121

def logger
  @logger.respond_to?(:call) ? @logger.call : @logger
end

.use_prepared_statementsObject



129
130
131
132
# File 'lib/que.rb', line 129

def use_prepared_statements
  setting = @use_prepared_statements
  setting.nil? ? true : setting
end

Class Method Details

.clear!Object



71
72
73
# File 'lib/que.rb', line 71

def clear!
  execute "DELETE FROM que_jobs"
end

.connection=(connection) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/que.rb', line 47

def connection=(connection)
  self.adapter =
    if connection.to_s == 'ActiveRecord'
      Adapters::ActiveRecord.new
    else
      case connection.class.to_s
      when 'Sequel::Postgres::Database' then Adapters::Sequel.new(connection)
      when 'ConnectionPool'             then Adapters::ConnectionPool.new(connection)
      when 'PG::Connection'             then Adapters::PG.new(connection)
      when 'Pond'                       then Adapters::Pond.new(connection)
      when 'NilClass'                   then connection
      else raise "Que connection not recognized: #{connection.inspect}"
      end
    end
end

.constantize(camel_cased_word) ⇒ Object



154
155
156
157
158
159
160
161
# File 'lib/que.rb', line 154

def constantize(camel_cased_word)
  if camel_cased_word.respond_to?(:constantize)
    # Use ActiveSupport's version if it exists.
    camel_cased_word.constantize
  else
    camel_cased_word.split('::').inject(Object, &:const_get)
  end
end

.create!Object

Have to support create! and drop! in old migrations. They just created and dropped the bare table.



104
105
106
# File 'lib/que.rb', line 104

def create!
  migrate! :version => 1
end

.db_versionObject



94
95
96
# File 'lib/que.rb', line 94

def db_version
  Migrations.db_version
end

.disable_prepared_statementsObject



134
135
136
137
# File 'lib/que.rb', line 134

def disable_prepared_statements
  warn "Que.disable_prepared_statements has been deprecated, please update your code to invert the result of Que.disable_prepared_statements instead. This shim will be removed in Que version 1.0.0."
  !use_prepared_statements
end

.disable_prepared_statements=(setting) ⇒ Object



139
140
141
142
# File 'lib/que.rb', line 139

def disable_prepared_statements=(setting)
  warn "Que.disable_prepared_statements= has been deprecated, please update your code to pass the inverted value to Que.use_prepared_statements= instead. This shim will be removed in Que version 1.0.0."
  self.use_prepared_statements = !setting
end

.drop!Object



108
109
110
# File 'lib/que.rb', line 108

def drop!
  migrate! :version => 0
end

.enqueue(*args) ⇒ Object

Give us a cleaner interface when specifying a job_class as a string.



90
91
92
# File 'lib/que.rb', line 90

def enqueue(*args)
  Job.enqueue(*args)
end

.error_handlerObject



144
145
146
147
# File 'lib/que.rb', line 144

def error_handler
  warn "Que.error_handler has been renamed to Que.error_notifier, please update your code. This shim will be removed in Que version 1.0.0."
  error_notifier
end

.error_handler=(p) ⇒ Object



149
150
151
152
# File 'lib/que.rb', line 149

def error_handler=(p)
  warn "Que.error_handler= has been renamed to Que.error_notifier=, please update your code. This shim will be removed in Que version 1.0.0."
  self.error_notifier = p
end

.execute(*args) ⇒ Object



67
68
69
# File 'lib/que.rb', line 67

def execute(*args)
  adapter.execute(*args)
end

.job_statsObject



75
76
77
# File 'lib/que.rb', line 75

def job_stats
  execute :job_stats
end

.log(data) ⇒ Object



112
113
114
115
116
117
118
119
# File 'lib/que.rb', line 112

def log(data)
  level = data.delete(:level) || :info
  data = {:lib => 'que', :hostname => Socket.gethostname, :pid => Process.pid, :thread => Thread.current.object_id}.merge(data)

  if (l = logger) && output = log_formatter.call(data)
    l.send level, output
  end
end

.migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object



98
99
100
# File 'lib/que.rb', line 98

def migrate!(version = {:version => Migrations::CURRENT_VERSION})
  Migrations.migrate!(version)
end

.transactionObject

A helper method to manage transactions, used mainly by the migration system. It’s available for general use, but if you’re using an ORM that provides its own transaction helper, be sure to use that instead, or the two may interfere with one another.



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/que.rb', line 167

def transaction
  adapter.checkout do
    if adapter.in_transaction?
      yield
    else
      begin
        execute "BEGIN"
        yield
      rescue => error
        raise
      ensure
        # Handle a raised error or a killed thread.
        if error || Thread.current.status == 'aborting'
          execute "ROLLBACK"
        else
          execute "COMMIT"
        end
      end
    end
  end
end

.worker_statesObject



79
80
81
82
83
84
85
86
87
# File 'lib/que.rb', line 79

def worker_states
  adapter.checkout do |conn|
    if conn.server_version >= 90600
      execute :worker_states_96
    else
      execute :worker_states_95
    end
  end
end