Module: Que

Defined in:
lib/que.rb,
lib/que/job.rb,
lib/que/sql.rb,
lib/que/worker.rb,
lib/que/railtie.rb,
lib/que/version.rb,
lib/que/migrations.rb,
lib/que/adapters/pg.rb,
lib/que/adapters/base.rb,
lib/que/adapters/pond.rb,
lib/que/adapters/sequel.rb,
lib/que/adapters/active_record.rb,
lib/que/adapters/connection_pool.rb,
lib/generators/que/install_generator.rb

Defined Under Namespace

Modules: Adapters, Migrations Classes: InstallGenerator, Job, Railtie, Worker

Constant Summary collapse

HASH_DEFAULT_PROC =
proc { |hash, key| hash[key.to_s] if Symbol === key }
INDIFFERENTIATOR =
proc do |object|
  case object
  when Array
    object.each(&INDIFFERENTIATOR)
  when Hash
    object.default_proc = HASH_DEFAULT_PROC
    object.each { |key, value| object[key] = INDIFFERENTIATOR.call(value) }
    object
  else
    object
  end
end
SYMBOLIZER =
proc do |object|
  case object
  when Hash
    object.keys.each do |key|
      object[key.to_sym] = SYMBOLIZER.call(object.delete(key))
    end
    object
  when Array
    object.map! { |e| SYMBOLIZER.call(e) }
  else
    object
  end
end
SQL =
{
  # Locks a job using a Postgres recursive CTE [1].
  #
  # As noted by the Postgres documentation, it may be slightly easier to
  # think about this expression as iteration rather than recursion, despite
  # the `RECURSION` nomenclature defined by the SQL standards committee.
  # Recursion is used here so that jobs in the table can be iterated one-by-
  # one until a lock can be acquired, where a non-recursive `SELECT` would
  # have the undesirable side-effect of locking multiple jobs at once. i.e.
  # Consider that the following would have the worker lock *all* unlocked
  # jobs:
  #
  #   SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
  #   FROM que_jobs AS j;
  #
  # The CTE will initially produce an "anchor" from the non-recursive term
  # (i.e. before the `UNION`), and then use it as the contents of the
  # working table as it continues to iterate through `que_jobs` looking for
  # a lock. The jobs table has a sort on (priority, run_at, job_id) which
  # allows it to walk the jobs table in a stable manner. As noted above, the
  # recursion examines one job at a time so that it only ever acquires a
  # single lock.
  #
  # The recursion has two possible end conditions:
  #
  # 1. If a lock *can* be acquired, it bubbles up to the top-level `SELECT`
  #    outside of the `job` CTE which stops recursion because it is
  #    constrained with a `LIMIT` of 1.
  #
  # 2. If a lock *cannot* be acquired, the recursive term of the expression
  #    (i.e. what's after the `UNION`) will return an empty result set
  #    because there are no more candidates left that could possibly be
  #    locked. This empty result automatically ends recursion.
  #
  # Note that this query can be easily modified to lock any number of jobs
  # by tweaking the LIMIT clause in the main SELECT statement.
  #
  # [1] http://www.postgresql.org/docs/devel/static/queries-with.html
  #
  # Thanks to RhodiumToad in #postgresql for help with the original version
  # of the job lock CTE.
  :lock_job => %{
    WITH RECURSIVE jobs AS (
      SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
      FROM (
        SELECT j
        FROM que_jobs AS j
        WHERE queue = $1::text
        AND run_at <= now()
        ORDER BY priority, run_at, job_id
        LIMIT 1
      ) AS t1
      UNION ALL (
        SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
        FROM (
          SELECT (
            SELECT j
            FROM que_jobs AS j
            WHERE queue = $1::text
            AND run_at <= now()
            AND (priority, run_at, job_id) > (jobs.priority, jobs.run_at, jobs.job_id)
            ORDER BY priority, run_at, job_id
            LIMIT 1
          ) AS j
          FROM jobs
          WHERE jobs.job_id IS NOT NULL
          LIMIT 1
        ) AS t1
      )
    )
    SELECT queue, priority, run_at, job_id, job_class, args, error_count
    FROM jobs
    WHERE locked
    LIMIT 1
  }.freeze,

  :check_job => %{
    SELECT 1 AS one
    FROM   que_jobs
    WHERE  queue    = $1::text
    AND    priority = $2::smallint
    AND    run_at   = $3::timestamptz
    AND    job_id   = $4::bigint
  }.freeze,

  :set_error => %{
    UPDATE que_jobs
    SET error_count = error_count + 1,
        run_at      = now() + $1::bigint * '1 second'::interval,
        last_error  = $2::text
    WHERE queue     = $3::text
    AND   priority  = $4::smallint
    AND   run_at    = $5::timestamptz
    AND   job_id    = $6::bigint
  }.freeze,

  :insert_job => %{
    INSERT INTO que_jobs
    (queue, priority, run_at, job_class, args)
    VALUES
    (coalesce($1, '')::text, coalesce($2, 100)::smallint, coalesce($3, now())::timestamptz, $4::text, coalesce($5, '[]')::json)
    RETURNING *
  }.freeze,

  :destroy_job => %{
    DELETE FROM que_jobs
    WHERE queue    = $1::text
    AND   priority = $2::smallint
    AND   run_at   = $3::timestamptz
    AND   job_id   = $4::bigint
  }.freeze,

  :job_stats => %{
    SELECT queue,
           job_class,
           count(*)                    AS count,
           count(locks.job_id)         AS count_working,
           sum((error_count > 0)::int) AS count_errored,
           max(error_count)            AS highest_error_count,
           min(run_at)                 AS oldest_run_at
    FROM que_jobs
    LEFT JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id
      FROM pg_locks
      WHERE locktype = 'advisory'
    ) locks USING (job_id)
    GROUP BY queue, job_class
    ORDER BY count(*) DESC
  }.freeze,

  :worker_states => %{
    SELECT que_jobs.*,
           pg.pid          AS pg_backend_pid,
           pg.state        AS pg_state,
           pg.state_change AS pg_state_changed_at,
           pg.query        AS pg_last_query,
           pg.query_start  AS pg_last_query_started_at,
           pg.xact_start   AS pg_transaction_started_at,
           pg.waiting      AS pg_waiting_on_lock
    FROM que_jobs
    JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.*
      FROM pg_locks
      JOIN pg_stat_activity USING (pid)
      WHERE locktype = 'advisory'
    ) pg USING (job_id)
  }.freeze
}.freeze
Version =
'0.12.0'

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.adapterObject



70
71
72
# File 'lib/que.rb', line 70

def adapter
  @adapter || raise("Que connection not established!")
end

.disable_prepared_statementsObject



130
131
132
# File 'lib/que.rb', line 130

def disable_prepared_statements
  @disable_prepared_statements || false
end

.error_notifierObject

Returns the value of attribute error_notifier.



51
52
53
# File 'lib/que.rb', line 51

def error_notifier
  @error_notifier
end

.json_converterObject



179
180
181
# File 'lib/que.rb', line 179

def json_converter
  @json_converter ||= INDIFFERENTIATOR
end

.log_formatterObject



126
127
128
# File 'lib/que.rb', line 126

def log_formatter
  @log_formatter ||= JSON_MODULE.method(:dump)
end

.loggerObject



122
123
124
# File 'lib/que.rb', line 122

def logger
  @logger.respond_to?(:call) ? @logger.call : @logger
end

Class Method Details

.clear!Object



78
79
80
# File 'lib/que.rb', line 78

def clear!
  execute "DELETE FROM que_jobs"
end

.connection=(connection) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/que.rb', line 54

def connection=(connection)
  self.adapter =
    if connection.to_s == 'ActiveRecord'
      Adapters::ActiveRecord.new
    else
      case connection.class.to_s
      when 'Sequel::Postgres::Database' then Adapters::Sequel.new(connection)
      when 'ConnectionPool'             then Adapters::ConnectionPool.new(connection)
      when 'PG::Connection'             then Adapters::PG.new(connection)
      when 'Pond'                       then Adapters::Pond.new(connection)
      when 'NilClass'                   then connection
      else raise "Que connection not recognized: #{connection.inspect}"
      end
    end
end

.constantize(camel_cased_word) ⇒ Object



144
145
146
147
148
149
150
151
# File 'lib/que.rb', line 144

def constantize(camel_cased_word)
  if camel_cased_word.respond_to?(:constantize)
    # Use ActiveSupport's version if it exists.
    camel_cased_word.constantize
  else
    camel_cased_word.split('::').inject(Object, &:const_get)
  end
end

.create!Object

Have to support create! and drop! in old migrations. They just created and dropped the bare table.



105
106
107
# File 'lib/que.rb', line 105

def create!
  migrate! :version => 1
end

.db_versionObject



95
96
97
# File 'lib/que.rb', line 95

def db_version
  Migrations.db_version
end

.drop!Object



109
110
111
# File 'lib/que.rb', line 109

def drop!
  migrate! :version => 0
end

.enqueue(*args) ⇒ Object

Give us a cleaner interface when specifying a job_class as a string.



91
92
93
# File 'lib/que.rb', line 91

def enqueue(*args)
  Job.enqueue(*args)
end

.error_handlerObject



134
135
136
137
# File 'lib/que.rb', line 134

def error_handler
  warn "Que.error_handler has been renamed to Que.error_notifier, please update your code. This shim will be removed in Que version 1.0.0."
  error_notifier
end

.error_handler=(p) ⇒ Object



139
140
141
142
# File 'lib/que.rb', line 139

def error_handler=(p)
  warn "Que.error_handler= has been renamed to Que.error_notifier=, please update your code. This shim will be removed in Que version 1.0.0."
  self.error_notifier = p
end

.execute(*args) ⇒ Object



74
75
76
# File 'lib/que.rb', line 74

def execute(*args)
  adapter.execute(*args)
end

.job_statsObject



82
83
84
# File 'lib/que.rb', line 82

def job_stats
  execute :job_stats
end

.log(data) ⇒ Object



113
114
115
116
117
118
119
120
# File 'lib/que.rb', line 113

def log(data)
  level = data.delete(:level) || :info
  data = {:lib => 'que', :hostname => Socket.gethostname, :pid => Process.pid, :thread => Thread.current.object_id}.merge(data)

  if (l = logger) && output = log_formatter.call(data)
    l.send level, output
  end
end

.migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object



99
100
101
# File 'lib/que.rb', line 99

def migrate!(version = {:version => Migrations::CURRENT_VERSION})
  Migrations.migrate!(version)
end

.transactionObject

A helper method to manage transactions, used mainly by the migration system. It’s available for general use, but if you’re using an ORM that provides its own transaction helper, be sure to use that instead, or the two may interfere with one another.



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/que.rb', line 157

def transaction
  adapter.checkout do
    if adapter.in_transaction?
      yield
    else
      begin
        execute "BEGIN"
        yield
      rescue => error
        raise
      ensure
        # Handle a raised error or a killed thread.
        if error || Thread.current.status == 'aborting'
          execute "ROLLBACK"
        else
          execute "COMMIT"
        end
      end
    end
  end
end

.worker_statesObject



86
87
88
# File 'lib/que.rb', line 86

def worker_states
  execute :worker_states
end