Module: Que

Defined in:
lib/que.rb,
lib/que/job.rb,
lib/que/sql.rb,
lib/que/worker.rb,
lib/que/railtie.rb,
lib/que/version.rb,
lib/que/migrations.rb,
lib/que/adapters/pg.rb,
lib/que/adapters/base.rb,
lib/que/adapters/pond.rb,
lib/que/adapters/sequel.rb,
lib/que/adapters/active_record.rb,
lib/que/adapters/connection_pool.rb,
lib/generators/que/install_generator.rb

Defined Under Namespace

Modules: Adapters, Migrations Classes: InstallGenerator, Job, Railtie, Worker

Constant Summary collapse

HASH_DEFAULT_PROC =
proc { |hash, key| hash[key.to_s] if Symbol === key }
INDIFFERENTIATOR =
proc do |object|
  case object
  when Array
    object.each(&INDIFFERENTIATOR)
  when Hash
    object.default_proc = HASH_DEFAULT_PROC
    object.each { |key, value| object[key] = INDIFFERENTIATOR.call(value) }
    object
  else
    object
  end
end
SYMBOLIZER =
proc do |object|
  case object
  when Hash
    object.keys.each do |key|
      object[key.to_sym] = SYMBOLIZER.call(object.delete(key))
    end
    object
  when Array
    object.map! { |e| SYMBOLIZER.call(e) }
  else
    object
  end
end
SQL =
{
  # Locks a job using a Postgres recursive CTE [1].
  #
  # As noted by the Postgres documentation, it may be slightly easier to
  # think about this expression as iteration rather than recursion, despite
  # the `RECURSION` nomenclature defined by the SQL standards committee.
  # Recursion is used here so that jobs in the table can be iterated one-by-
  # one until a lock can be acquired, where a non-recursive `SELECT` would
  # have the undesirable side-effect of locking multiple jobs at once. i.e.
  # Consider that the following would have the worker lock *all* unlocked
  # jobs:
  #
  #   SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
  #   FROM que_jobs AS j;
  #
  # The CTE will initially produce an "anchor" from the non-recursive term
  # (i.e. before the `UNION`), and then use it as the contents of the
  # working table as it continues to iterate through `que_jobs` looking for
  # a lock. The jobs table has a sort on (priority, run_at, job_id) which
  # allows it to walk the jobs table in a stable manner. As noted above, the
  # recursion examines one job at a time so that it only ever acquires a
  # single lock.
  #
  # The recursion has two possible end conditions:
  #
  # 1. If a lock *can* be acquired, it bubbles up to the top-level `SELECT`
  #    outside of the `job` CTE which stops recursion because it is
  #    constrained with a `LIMIT` of 1.
  #
  # 2. If a lock *cannot* be acquired, the recursive term of the expression
  #    (i.e. what's after the `UNION`) will return an empty result set
  #    because there are no more candidates left that could possibly be
  #    locked. This empty result automatically ends recursion.
  #
  # Note that this query can be easily modified to lock any number of jobs
  # by tweaking the LIMIT clause in the main SELECT statement.
  #
  # [1] http://www.postgresql.org/docs/devel/static/queries-with.html
  #
  # Thanks to RhodiumToad in #postgresql for help with the original version
  # of the job lock CTE.
  :lock_job => %{
    WITH RECURSIVE jobs AS (
      SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
      FROM (
        SELECT j
        FROM que_jobs AS j
        WHERE queue = $1::text
        AND run_at <= now()
        ORDER BY priority, run_at, job_id
        LIMIT 1
      ) AS t1
      UNION ALL (
        SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
        FROM (
          SELECT (
            SELECT j
            FROM que_jobs AS j
            WHERE queue = $1::text
            AND run_at <= now()
            AND (priority, run_at, job_id) > (jobs.priority, jobs.run_at, jobs.job_id)
            ORDER BY priority, run_at, job_id
            LIMIT 1
          ) AS j
          FROM jobs
          WHERE jobs.job_id IS NOT NULL
          LIMIT 1
        ) AS t1
      )
    )
    SELECT queue, priority, run_at, job_id, job_class, args, error_count
    FROM jobs
    WHERE locked
    LIMIT 1
  }.freeze,

  :check_job => %{
    SELECT 1 AS one
    FROM   que_jobs
    WHERE  queue    = $1::text
    AND    priority = $2::smallint
    AND    run_at   = $3::timestamptz
    AND    job_id   = $4::bigint
  }.freeze,

  :set_error => %{
    UPDATE que_jobs
    SET error_count = error_count + 1,
        run_at      = now() + $1::bigint * '1 second'::interval,
        last_error  = $2::text
    WHERE queue     = $3::text
    AND   priority  = $4::smallint
    AND   run_at    = $5::timestamptz
    AND   job_id    = $6::bigint
  }.freeze,

  :insert_job => %{
    INSERT INTO que_jobs
    (queue, priority, run_at, job_class, args)
    VALUES
    (coalesce($1, '')::text, coalesce($2, 100)::smallint, coalesce($3, now())::timestamptz, $4::text, coalesce($5, '[]')::json)
    RETURNING *
  }.freeze,

  :destroy_job => %{
    DELETE FROM que_jobs
    WHERE queue    = $1::text
    AND   priority = $2::smallint
    AND   run_at   = $3::timestamptz
    AND   job_id   = $4::bigint
  }.freeze,

  :job_stats => %{
    SELECT queue,
           job_class,
           count(*)                    AS count,
           count(locks.job_id)         AS count_working,
           sum((error_count > 0)::int) AS count_errored,
           max(error_count)            AS highest_error_count,
           min(run_at)                 AS oldest_run_at
    FROM que_jobs
    LEFT JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id
      FROM pg_locks
      WHERE locktype = 'advisory'
    ) locks USING (job_id)
    GROUP BY queue, job_class
    ORDER BY count(*) DESC
  }.freeze,

  :worker_states_95 => %{
    SELECT que_jobs.*,
           pg.pid          AS pg_backend_pid,
           pg.state        AS pg_state,
           pg.state_change AS pg_state_changed_at,
           pg.query        AS pg_last_query,
           pg.query_start  AS pg_last_query_started_at,
           pg.xact_start   AS pg_transaction_started_at,
           pg.waiting      AS pg_waiting_on_lock
    FROM que_jobs
    JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.*
      FROM pg_locks
      JOIN pg_stat_activity USING (pid)
      WHERE locktype = 'advisory'
    ) pg USING (job_id)
  }.freeze,

  :worker_states_96 => %{
    SELECT que_jobs.*,
           pg.pid                         AS pg_backend_pid,
           pg.state                       AS pg_state,
           pg.state_change                AS pg_state_changed_at,
           pg.query                       AS pg_last_query,
           pg.query_start                 AS pg_last_query_started_at,
           pg.xact_start                  AS pg_transaction_started_at,
           pg.wait_event_type IS NOT NULL AS pg_waiting_on_lock
    FROM que_jobs
    JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.*
      FROM pg_locks
      JOIN pg_stat_activity USING (pid)
      WHERE locktype = 'advisory'
    ) pg USING (job_id)
  }.freeze,
}.freeze
Version =
'0.12.2'

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.adapterObject



70
71
72
# File 'lib/que.rb', line 70

def adapter
  @adapter || raise("Que connection not established!")
end

.disable_prepared_statementsObject



136
137
138
# File 'lib/que.rb', line 136

def disable_prepared_statements
  @disable_prepared_statements || false
end

.error_notifierObject

Returns the value of attribute error_notifier.



51
52
53
# File 'lib/que.rb', line 51

def error_notifier
  @error_notifier
end

.json_converterObject



185
186
187
# File 'lib/que.rb', line 185

def json_converter
  @json_converter ||= INDIFFERENTIATOR
end

.log_formatterObject



132
133
134
# File 'lib/que.rb', line 132

def log_formatter
  @log_formatter ||= JSON_MODULE.method(:dump)
end

.loggerObject



128
129
130
# File 'lib/que.rb', line 128

def logger
  @logger.respond_to?(:call) ? @logger.call : @logger
end

Class Method Details

.clear!Object



78
79
80
# File 'lib/que.rb', line 78

def clear!
  execute "DELETE FROM que_jobs"
end

.connection=(connection) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/que.rb', line 54

def connection=(connection)
  self.adapter =
    if connection.to_s == 'ActiveRecord'
      Adapters::ActiveRecord.new
    else
      case connection.class.to_s
      when 'Sequel::Postgres::Database' then Adapters::Sequel.new(connection)
      when 'ConnectionPool'             then Adapters::ConnectionPool.new(connection)
      when 'PG::Connection'             then Adapters::PG.new(connection)
      when 'Pond'                       then Adapters::Pond.new(connection)
      when 'NilClass'                   then connection
      else raise "Que connection not recognized: #{connection.inspect}"
      end
    end
end

.constantize(camel_cased_word) ⇒ Object



150
151
152
153
154
155
156
157
# File 'lib/que.rb', line 150

def constantize(camel_cased_word)
  if camel_cased_word.respond_to?(:constantize)
    # Use ActiveSupport's version if it exists.
    camel_cased_word.constantize
  else
    camel_cased_word.split('::').inject(Object, &:const_get)
  end
end

.create!Object

Have to support create! and drop! in old migrations. They just created and dropped the bare table.



111
112
113
# File 'lib/que.rb', line 111

def create!
  migrate! :version => 1
end

.db_versionObject



101
102
103
# File 'lib/que.rb', line 101

def db_version
  Migrations.db_version
end

.drop!Object



115
116
117
# File 'lib/que.rb', line 115

def drop!
  migrate! :version => 0
end

.enqueue(*args) ⇒ Object

Give us a cleaner interface when specifying a job_class as a string.



97
98
99
# File 'lib/que.rb', line 97

def enqueue(*args)
  Job.enqueue(*args)
end

.error_handlerObject



140
141
142
143
# File 'lib/que.rb', line 140

def error_handler
  warn "Que.error_handler has been renamed to Que.error_notifier, please update your code. This shim will be removed in Que version 1.0.0."
  error_notifier
end

.error_handler=(p) ⇒ Object



145
146
147
148
# File 'lib/que.rb', line 145

def error_handler=(p)
  warn "Que.error_handler= has been renamed to Que.error_notifier=, please update your code. This shim will be removed in Que version 1.0.0."
  self.error_notifier = p
end

.execute(*args) ⇒ Object



74
75
76
# File 'lib/que.rb', line 74

def execute(*args)
  adapter.execute(*args)
end

.job_statsObject



82
83
84
# File 'lib/que.rb', line 82

def job_stats
  execute :job_stats
end

.log(data) ⇒ Object



119
120
121
122
123
124
125
126
# File 'lib/que.rb', line 119

def log(data)
  level = data.delete(:level) || :info
  data = {:lib => 'que', :hostname => Socket.gethostname, :pid => Process.pid, :thread => Thread.current.object_id}.merge(data)

  if (l = logger) && output = log_formatter.call(data)
    l.send level, output
  end
end

.migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object



105
106
107
# File 'lib/que.rb', line 105

def migrate!(version = {:version => Migrations::CURRENT_VERSION})
  Migrations.migrate!(version)
end

.transactionObject

A helper method to manage transactions, used mainly by the migration system. It’s available for general use, but if you’re using an ORM that provides its own transaction helper, be sure to use that instead, or the two may interfere with one another.



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/que.rb', line 163

def transaction
  adapter.checkout do
    if adapter.in_transaction?
      yield
    else
      begin
        execute "BEGIN"
        yield
      rescue => error
        raise
      ensure
        # Handle a raised error or a killed thread.
        if error || Thread.current.status == 'aborting'
          execute "ROLLBACK"
        else
          execute "COMMIT"
        end
      end
    end
  end
end

.worker_statesObject



86
87
88
89
90
91
92
93
94
# File 'lib/que.rb', line 86

def worker_states
  adapter.checkout do |conn|
    if conn.server_version >= 90600
      execute :worker_states_96
    else
      execute :worker_states_95
    end
  end
end