Module: Que

Defined in:
lib/que.rb,
lib/que/job.rb,
lib/que/sql.rb,
lib/que/worker.rb,
lib/que/railtie.rb,
lib/que/version.rb,
lib/que/migrations.rb,
lib/que/adapters/pg.rb,
lib/que/adapters/base.rb,
lib/que/adapters/pond.rb,
lib/que/adapters/sequel.rb,
lib/que/adapters/active_record.rb,
lib/que/adapters/connection_pool.rb,
lib/generators/que/install_generator.rb

Defined Under Namespace

Modules: Adapters, Migrations Classes: InstallGenerator, Job, Railtie, Worker

Constant Summary collapse

# Default proc installed on hashes to give them indifferent access: when a
# lookup with a Symbol key misses, retry the lookup with the key's String
# form. Any other kind of missing key yields nil.
HASH_DEFAULT_PROC =
  proc do |hash, key|
    case key
    when Symbol then hash[key.to_s]
    end
  end
# Recursively walks a structure made of Arrays and Hashes and installs
# HASH_DEFAULT_PROC on every Hash it finds, so that string-keyed hashes can
# also be read with symbol keys. The structure is modified in place and the
# same object that was passed in is returned. Anything that is neither an
# Array nor a Hash is returned untouched.
INDIFFERENTIATOR =
  proc do |object|
    if Array === object
      # Array#each returns the receiver, so the array itself is the result.
      object.each { |element| INDIFFERENTIATOR.call(element) }
    elsif Hash === object
      object.default_proc = HASH_DEFAULT_PROC
      object.each_pair do |key, value|
        object[key] = INDIFFERENTIATOR.call(value)
      end
      object
    else
      object
    end
  end
# Recursively converts the keys of every Hash inside a structure made of
# Hashes and Arrays to Symbols. Hashes and Arrays are rewritten in place and
# returned; any other object is returned untouched.
SYMBOLIZER =
  proc do |object|
    if Hash === object
      # Iterate over a snapshot of the keys, since entries are removed and
      # re-inserted while walking.
      object.keys.each do |key|
        symbol_key = key.to_sym
        object[symbol_key] = SYMBOLIZER.call(object.delete(key))
      end
      object
    elsif Array === object
      object.map!(&SYMBOLIZER)
    else
      object
    end
  end
# Frozen table of the SQL statements used by Que, keyed by symbol. Every
# statement takes its inputs as positional parameters ($1, $2, ...) that are
# explicitly cast inside the query text.
SQL =
{
  # Locks a job using a Postgres recursive CTE [1].
  #
  # As noted by the Postgres documentation, it may be slightly easier to
  # think about this expression as iteration rather than recursion, despite
  # the `RECURSION` nomenclature defined by the SQL standards committee.
  # Recursion is used here so that jobs in the table can be iterated one-by-
  # one until a lock can be acquired, where a non-recursive `SELECT` would
  # have the undesirable side-effect of locking multiple jobs at once. i.e.
  # Consider that the following would have the worker lock *all* unlocked
  # jobs:
  #
  #   SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
  #   FROM que_jobs AS j;
  #
  # The CTE will initially produce an "anchor" from the non-recursive term
  # (i.e. before the `UNION`), and then use it as the contents of the
  # working table as it continues to iterate through `que_jobs` looking for
  # a lock. The jobs table has a sort on (priority, run_at, job_id) which
  # allows it to walk the jobs table in a stable manner. As noted above, the
  # recursion examines one job at a time so that it only ever acquires a
  # single lock.
  #
  # The recursion has two possible end conditions:
  #
  # 1. If a lock *can* be acquired, it bubbles up to the top-level `SELECT`
  #    outside of the `job` CTE which stops recursion because it is
  #    constrained with a `LIMIT` of 1.
  #
  # 2. If a lock *cannot* be acquired, the recursive term of the expression
  #    (i.e. what's after the `UNION`) will return an empty result set
  #    because there are no more candidates left that could possibly be
  #    locked. This empty result automatically ends recursion.
  #
  # Note that this query can be easily modified to lock any number of jobs
  # by tweaking the LIMIT clause in the main SELECT statement.
  #
  # [1] http://www.postgresql.org/docs/devel/static/queries-with.html
  #
  # Thanks to RhodiumToad in #postgresql for help with the original version
  # of the job lock CTE.
  #
  # Parameters: $1 = queue name. Only jobs whose run_at is not in the future
  # are considered.
  :lock_job => %{
    WITH RECURSIVE jobs AS (
      SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
      FROM (
        SELECT j
        FROM que_jobs AS j
        WHERE queue = $1::text
        AND run_at <= now()
        ORDER BY priority, run_at, job_id
        LIMIT 1
      ) AS t1
      UNION ALL (
        SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
        FROM (
          SELECT (
            SELECT j
            FROM que_jobs AS j
            WHERE queue = $1::text
            AND run_at <= now()
            AND (priority, run_at, job_id) > (jobs.priority, jobs.run_at, jobs.job_id)
            ORDER BY priority, run_at, job_id
            LIMIT 1
          ) AS j
          FROM jobs
          WHERE jobs.job_id IS NOT NULL
          LIMIT 1
        ) AS t1
      )
    )
    SELECT queue, priority, run_at, job_id, job_class, args, error_count
    FROM jobs
    WHERE locked
    LIMIT 1
  }.freeze,

  # Returns a row (the constant 1, aliased `one`) if a job with exactly this
  # (queue, priority, run_at, job_id) exists, and no rows otherwise.
  # Parameters: $1 = queue, $2 = priority, $3 = run_at, $4 = job_id.
  :check_job => %{
    SELECT 1 AS one
    FROM   que_jobs
    WHERE  queue    = $1::text
    AND    priority = $2::smallint
    AND    run_at   = $3::timestamptz
    AND    job_id   = $4::bigint
  }.freeze,

  # Records a failure on a job: stores the new error count and error text
  # and moves run_at to now() plus $2 seconds.
  # Parameters: $1 = error_count, $2 = delay in seconds, $3 = last_error,
  # $4..$7 = queue, priority, run_at, job_id identifying the job.
  :set_error => %{
    UPDATE que_jobs
    SET error_count = $1::integer,
        run_at      = now() + $2::bigint * '1 second'::interval,
        last_error  = $3::text
    WHERE queue     = $4::text
    AND   priority  = $5::smallint
    AND   run_at    = $6::timestamptz
    AND   job_id    = $7::bigint
  }.freeze,

  # Inserts a job and returns the inserted row. NULL parameters fall back to
  # defaults via coalesce: queue '' , priority 100, run_at now(), args '[]'.
  # Parameters: $1 = queue, $2 = priority, $3 = run_at, $4 = job_class,
  # $5 = args (JSON).
  :insert_job => %{
    INSERT INTO que_jobs
    (queue, priority, run_at, job_class, args)
    VALUES
    (coalesce($1, '')::text, coalesce($2, 100)::smallint, coalesce($3, now())::timestamptz, $4::text, coalesce($5, '[]')::json)
    RETURNING *
  }.freeze,

  # Deletes the job identified by (queue, priority, run_at, job_id).
  # Parameters: $1 = queue, $2 = priority, $3 = run_at, $4 = job_id.
  :destroy_job => %{
    DELETE FROM que_jobs
    WHERE queue    = $1::text
    AND   priority = $2::smallint
    AND   run_at   = $3::timestamptz
    AND   job_id   = $4::bigint
  }.freeze,

  # Aggregates jobs per (queue, job_class), largest groups first. A job is
  # counted in count_working when its job_id matches an advisory lock in
  # pg_locks; the lock's 64-bit key is rebuilt from classid (high 32 bits)
  # and objid (low 32 bits). Takes no parameters.
  :job_stats => %{
    SELECT queue,
           job_class,
           count(*)                    AS count,
           count(locks.job_id)         AS count_working,
           sum((error_count > 0)::int) AS count_errored,
           max(error_count)            AS highest_error_count,
           min(run_at)                 AS oldest_run_at
    FROM que_jobs
    LEFT JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id
      FROM pg_locks
      WHERE locktype = 'advisory'
    ) locks USING (job_id)
    GROUP BY queue, job_class
    ORDER BY count(*) DESC
  }.freeze,

  # Lists every job whose job_id matches an advisory lock, together with
  # selected pg_stat_activity columns of the backend (pid) holding that
  # lock. Takes no parameters.
  # NOTE(review): relies on the pg_stat_activity.waiting column being
  # available on the target Postgres version - confirm.
  :worker_states => %{
    SELECT que_jobs.*,
           pg.pid          AS pg_backend_pid,
           pg.state        AS pg_state,
           pg.state_change AS pg_state_changed_at,
           pg.query        AS pg_last_query,
           pg.query_start  AS pg_last_query_started_at,
           pg.xact_start   AS pg_transaction_started_at,
           pg.waiting      AS pg_waiting_on_lock
    FROM que_jobs
    JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.*
      FROM pg_locks
      JOIN pg_stat_activity USING (pid)
      WHERE locktype = 'advisory'
    ) pg USING (job_id)
  }.freeze
}.freeze
# Version string of this release of the library.
Version =
'0.11.0'

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.adapterObject



68
69
70
# File 'lib/que.rb', line 68

# Returns the adapter currently stored in @adapter.
#
# Raises a RuntimeError when no adapter has been set yet.
def adapter
  return @adapter if @adapter

  raise("Que connection not established!")
end

.disable_prepared_statementsObject



128
129
130
# File 'lib/que.rb', line 128

# Returns the value of @disable_prepared_statements, normalising an unset
# (nil) or false value to false.
def disable_prepared_statements
  return false unless @disable_prepared_statements

  @disable_prepared_statements
end

.error_handlerObject

Returns the value of attribute error_handler.



49
50
51
# File 'lib/que.rb', line 49

# Plain reader: returns whatever is stored in @error_handler (nil when
# nothing has been assigned).
def error_handler
  @error_handler
end

.json_converterObject



167
168
169
# File 'lib/que.rb', line 167

# Returns the object stored in @json_converter. When none is set, it is
# initialised to INDIFFERENTIATOR and that value is remembered for later
# calls.
def json_converter
  @json_converter = INDIFFERENTIATOR unless @json_converter
  @json_converter
end

.log_formatterObject



124
125
126
# File 'lib/que.rb', line 124

# Returns the callable stored in @log_formatter. When none is set, it is
# initialised to JSON_MODULE's `dump` method and remembered for later calls.
def log_formatter
  @log_formatter = JSON_MODULE.method(:dump) unless @log_formatter
  @log_formatter
end

.loggerObject



120
121
122
# File 'lib/que.rb', line 120

# Returns the configured logger. If @logger responds to #call it is treated
# as a factory and the result of calling it is returned; otherwise @logger
# itself is returned (nil when unset).
def logger
  return @logger.call if @logger.respond_to?(:call)

  @logger
end

Class Method Details

.clear!Object



76
77
78
# File 'lib/que.rb', line 76

# Removes every row from the que_jobs table by running an unconditional
# DELETE through #execute.
def clear!
  execute("DELETE FROM que_jobs")
end

.connection=(connection) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/que.rb', line 52

# Picks the adapter matching the given connection object and assigns it via
# `self.adapter=`.
#
# Selection rules:
# * an object whose #to_s is 'ActiveRecord' -> Adapters::ActiveRecord
#   (built without arguments);
# * otherwise the connection's class name decides which adapter wraps it;
# * nil clears the adapter (nil is assigned);
# * anything else raises a RuntimeError.
def connection=(connection)
  self.adapter =
    if connection.to_s != 'ActiveRecord'
      # Compare class names as strings so the corresponding libraries do not
      # have to be loaded for the check itself.
      type_name = connection.class.to_s

      if type_name == 'Sequel::Postgres::Database'
        Adapters::Sequel.new(connection)
      elsif type_name == 'ConnectionPool'
        Adapters::ConnectionPool.new(connection)
      elsif type_name == 'PG::Connection'
        Adapters::PG.new(connection)
      elsif type_name == 'Pond'
        Adapters::Pond.new(connection)
      elsif type_name == 'NilClass'
        connection
      else
        raise "Que connection not recognized: #{connection.inspect}"
      end
    else
      Adapters::ActiveRecord.new
    end
end

.constantize(camel_cased_word) ⇒ Object



132
133
134
135
136
137
138
139
# File 'lib/que.rb', line 132

# Resolves a constant from its name, e.g. "Foo::Bar" -> Foo::Bar.
#
# camel_cased_word - the constant name. If the object responds to
#                    #constantize (as Strings do when ActiveSupport is
#                    loaded), that implementation is used.
#
# Returns the constant. Raises NameError if any segment of the name cannot
# be resolved.
def constantize(camel_cased_word)
  if camel_cased_word.respond_to?(:constantize)
    # Use ActiveSupport's version if it exists.
    camel_cased_word.constantize
  else
    # Walk the namespace one segment at a time, starting from Object.
    # (The fallback previously referenced an undefined local `string`, so it
    # always raised NameError when ActiveSupport was not loaded.)
    camel_cased_word.split('::').inject(Object, &:const_get)
  end
end

.create!Object

Have to support create! and drop! in old migrations. They just created and dropped the bare table.



103
104
105
# File 'lib/que.rb', line 103

# Kept for old migrations: equivalent to migrating to schema version 1.
def create!
  migrate!(:version => 1)
end

.db_versionObject



93
94
95
# File 'lib/que.rb', line 93

# Returns the result of Migrations.db_version (pure delegation).
def db_version
  Migrations.db_version
end

.drop!Object



107
108
109
# File 'lib/que.rb', line 107

# Kept for old migrations: equivalent to migrating to schema version 0.
def drop!
  migrate!(:version => 0)
end

.enqueue(*args) ⇒ Object

Give us a cleaner interface when specifying a job_class as a string.



89
90
91
# File 'lib/que.rb', line 89

# Forwards all arguments unchanged to Job.enqueue and returns its result.
def enqueue(*args)
  Job.enqueue(*args)
end

.execute(*args) ⇒ Object



72
73
74
# File 'lib/que.rb', line 72

# Forwards all arguments unchanged to the current adapter's #execute and
# returns its result. The adapter is obtained through #adapter.
def execute(*args)
  adapter.execute(*args)
end

.job_statsObject



80
81
82
# File 'lib/que.rb', line 80

# Runs the :job_stats statement through #execute and returns its result.
def job_stats
  execute(:job_stats)
end

.log(data) ⇒ Object



111
112
113
114
115
116
117
118
# File 'lib/que.rb', line 111

# Emits a structured log entry through the configured logger.
#
# data - Hash of fields to log. An optional :level entry selects the logger
#        method to call (defaults to :info) and is not included in the
#        logged payload. The caller's hash is left untouched.
#
# The payload is the given data merged over the default fields :lib,
# :hostname, :pid and :thread, so caller-supplied keys win. It is passed to
# #log_formatter; nothing is logged when there is no logger or when the
# formatter returns nil/false.
#
# Returns whatever the logger method returns, or nil when nothing was logged.
def log(data)
  # Work on a copy: removing :level from the caller's hash would be a
  # surprising side effect and would raise on a frozen hash.
  data  = data.dup
  level = data.delete(:level) || :info

  payload = {
    :lib      => 'que',
    :hostname => Socket.gethostname,
    :pid      => Process.pid,
    :thread   => Thread.current.object_id
  }.merge(data)

  current_logger = logger
  return unless current_logger

  output = log_formatter.call(payload)
  current_logger.send(level, output) if output
end

.migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object



97
98
99
# File 'lib/que.rb', line 97

# Delegates to Migrations.migrate!, passing the given options hash through.
# When called without an argument, the hash defaults to
# {:version => Migrations::CURRENT_VERSION}.
def migrate!(version = {:version => Migrations::CURRENT_VERSION})
  Migrations.migrate!(version)
end

.transactionObject

A helper method to manage transactions, used mainly by the migration system. It’s available for general use, but if you’re using an ORM that provides its own transaction helper, be sure to use that instead, or the two may interfere with one another.



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/que.rb', line 145

# Runs the given block inside a database transaction on a connection checked
# out from the adapter.
#
# If the adapter reports that a transaction is already open, the block is
# simply yielded to and the outer transaction stays in charge. Otherwise the
# block is wrapped in BEGIN ... COMMIT, and ROLLBACK is issued instead of
# COMMIT when the block raises or the current thread is being killed.
#
# Returns the block's value. Any exception raised by the block is re-raised
# after the rollback.
def transaction
  adapter.checkout do
    if adapter.in_transaction?
      yield
    else
      begin
        execute "BEGIN"
        yield
      rescue Exception => error
        # Rescue Exception (not just StandardError) purely to record that the
        # block failed, then re-raise untouched. With a bare `rescue`,
        # exceptions such as Interrupt or SystemExit were not seen here and
        # the ensure clause would COMMIT a half-finished transaction.
        raise
      ensure
        # Handle a raised error or a killed thread.
        if error || Thread.current.status == 'aborting'
          execute "ROLLBACK"
        else
          execute "COMMIT"
        end
      end
    end
  end
end

.worker_statesObject



84
85
86
# File 'lib/que.rb', line 84

# Runs the :worker_states statement through #execute and returns its result.
def worker_states
  execute(:worker_states)
end