Module: Que

Defined in:
lib/que.rb,
lib/que/job.rb,
lib/que/sql.rb,
lib/que/worker.rb,
lib/que/railtie.rb,
lib/que/version.rb,
lib/que/migrations.rb,
lib/que/adapters/pg.rb,
lib/que/adapters/base.rb,
lib/que/adapters/pond.rb,
lib/que/adapters/sequel.rb,
lib/que/adapters/active_record.rb,
lib/que/adapters/connection_pool.rb,
lib/generators/que/install_generator.rb

Defined Under Namespace

Modules: Adapters, Migrations

Classes: InstallGenerator, Job, Railtie, Worker

Constant Summary

# Named SQL statements, keyed by symbol. Every statement string and the hash
# itself are frozen. $1, $2, ... are positional bind parameters.
SQL =
{
  # Thanks to RhodiumToad in #postgresql for help with the job lock CTE.
  #
  # Walks que_jobs for queue $1 in (priority, run_at, job_id) order,
  # considering only rows with run_at <= now(), and calls
  # pg_try_advisory_lock(job_id) on each candidate in turn. The recursive
  # step only advances past a candidate that failed to lock; the outer
  # SELECT returns the first row that did lock (at most one row).
  :lock_job => %{
    WITH RECURSIVE job AS (
      SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
      FROM (
        SELECT j
        FROM que_jobs AS j
        WHERE queue = $1::text
        AND run_at <= now()
        ORDER BY priority, run_at, job_id
        LIMIT 1
      ) AS t1
      UNION ALL (
        SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
        FROM (
          SELECT (
            SELECT j
            FROM que_jobs AS j
            WHERE queue = $1::text
            AND run_at <= now()
            AND (priority, run_at, job_id) > (job.priority, job.run_at, job.job_id)
            ORDER BY priority, run_at, job_id
            LIMIT 1
          ) AS j
          FROM job
          WHERE NOT job.locked
          LIMIT 1
        ) AS t1
      )
    )
    SELECT queue, priority, run_at, job_id, job_class, args, error_count
    FROM job
    WHERE locked
    LIMIT 1
  }.freeze,

  # Selects a row (column "one") for a job matching the full
  # (queue, priority, run_at, job_id) key given as $1..$4.
  :check_job => %{
    SELECT 1 AS one
    FROM   que_jobs
    WHERE  queue    = $1::text
    AND    priority = $2::smallint
    AND    run_at   = $3::timestamptz
    AND    job_id   = $4::bigint
  }.freeze,

  # Records a failure on the job identified by $4..$7: stores the new
  # error_count ($1) and error text ($3), and moves run_at to now() plus
  # $2 seconds.
  :set_error => %{
    UPDATE que_jobs
    SET error_count = $1::integer,
        run_at      = now() + $2::bigint * '1 second'::interval,
        last_error  = $3::text
    WHERE queue     = $4::text
    AND   priority  = $5::smallint
    AND   run_at    = $6::timestamptz
    AND   job_id    = $7::bigint
  }.freeze,

  # Inserts a job and returns the inserted row. NULL parameters fall back to
  # defaults: queue '', priority 100, run_at now(), args '[]'. job_class ($4)
  # has no fallback.
  :insert_job => %{
    INSERT INTO que_jobs
    (queue, priority, run_at, job_class, args)
    VALUES
    (coalesce($1, '')::text, coalesce($2, 100)::smallint, coalesce($3, now())::timestamptz, $4::text, coalesce($5, '[]')::json)
    RETURNING *
  }.freeze,

  # Deletes the job identified by (queue, priority, run_at, job_id) = $1..$4.
  :destroy_job => %{
    DELETE FROM que_jobs
    WHERE queue    = $1::text
    AND   priority = $2::smallint
    AND   run_at   = $3::timestamptz
    AND   job_id   = $4::bigint
  }.freeze,

  # Aggregates per (queue, job_class), largest groups first. count_working
  # counts jobs whose job_id matches an advisory lock listed in pg_locks,
  # where the lock id is reassembled as (classid << 32) + objid.
  :job_stats => %{
    SELECT queue,
           job_class,
           count(*)                    AS count,
           count(locks.job_id)         AS count_working,
           sum((error_count > 0)::int) AS count_errored,
           max(error_count)            AS highest_error_count,
           min(run_at)                 AS oldest_run_at
    FROM que_jobs
    LEFT JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id
      FROM pg_locks
      WHERE locktype = 'advisory'
    ) locks USING (job_id)
    GROUP BY queue, job_class
    ORDER BY count(*) DESC
  }.freeze,

  # Jobs whose job_id matches an advisory lock, each joined to the
  # pg_stat_activity row of the backend (pid) holding that lock.
  # NOTE(review): pg_stat_activity.waiting is not available on newer
  # PostgreSQL releases (replaced by wait_event columns in 9.6) -- confirm
  # which server versions this statement must support.
  :worker_states => %{
    SELECT que_jobs.*,
           pg.pid          AS pg_backend_pid,
           pg.state        AS pg_state,
           pg.state_change AS pg_state_changed_at,
           pg.query        AS pg_last_query,
           pg.query_start  AS pg_last_query_started_at,
           pg.xact_start   AS pg_transaction_started_at,
           pg.waiting      AS pg_waiting_on_lock
    FROM que_jobs
    JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.*
      FROM pg_locks
      JOIN pg_stat_activity USING (pid)
      WHERE locktype = 'advisory'
    ) pg USING (job_id)
  }.freeze
}.freeze
# Library version string.
Version = '0.9.2'

Class Attribute Summary

Class Method Summary

Class Attribute Details

.adapter ⇒ Object



39
40
41
# File 'lib/que.rb', line 39

# Returns the adapter stored in @adapter.
#
# Raises a RuntimeError ("Que connection not established!") when @adapter is
# nil or false, i.e. before any adapter has been assigned.
def adapter
  return @adapter if @adapter

  raise "Que connection not established!"
end

.error_handler ⇒ Object

Returns the value of attribute error_handler.



20
21
22
# File 'lib/que.rb', line 20

# Reader for the error handler: returns @error_handler exactly as stored,
# which is nil until something assigns it.
def error_handler
  @error_handler
end

.log_formatter ⇒ Object



95
96
97
# File 'lib/que.rb', line 95

# Returns the callable used to serialize a log entry.
#
# A formatter already stored in @log_formatter is returned untouched.
# Otherwise JSON_MODULE.method(:dump) is stored in @log_formatter and
# returned, so later calls reuse the same object.
def log_formatter
  return @log_formatter if @log_formatter

  @log_formatter = JSON_MODULE.method(:dump)
end

.logger ⇒ Object



91
92
93
# File 'lib/que.rb', line 91

# Returns the logger to write to.
#
# When the object stored in @logger responds to :call, it is invoked and its
# result is returned; otherwise the stored object itself is returned (nil
# when nothing is stored).
def logger
  configured = @logger
  return configured.call if configured.respond_to?(:call)

  configured
end

Class Method Details

.clear! ⇒ Object



47
48
49
# File 'lib/que.rb', line 47

# Removes every row from que_jobs by issuing an unconditional DELETE through
# execute. Returns whatever execute returns.
def clear!
  execute("DELETE FROM que_jobs")
end

.connection=(connection) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/que.rb', line 23

# Picks the adapter for the given connection and assigns it via
# self.adapter=.
#
# The connection is matched by name: first its to_s against 'ActiveRecord',
# then the name of its class. An adapter constant is only referenced in the
# branch that is actually taken. nil is accepted and stored as the adapter
# unchanged.
#
# Raises a RuntimeError for an unrecognized connection; the raise happens
# before the adapter is assigned.
def connection=(connection)
  if connection.to_s == 'ActiveRecord'
    chosen = Adapters::ActiveRecord.new
  else
    class_name = connection.class.to_s

    chosen =
      if    class_name == 'Sequel::Postgres::Database' then Adapters::Sequel.new(connection)
      elsif class_name == 'ConnectionPool'             then Adapters::ConnectionPool.new(connection)
      elsif class_name == 'PG::Connection'             then Adapters::PG.new(connection)
      elsif class_name == 'Pond'                       then Adapters::Pond.new(connection)
      elsif class_name == 'NilClass'                   then connection
      else raise "Que connection not recognized: #{connection.inspect}"
      end
  end

  self.adapter = chosen
end

.create! ⇒ Object

create! and drop! must remain available because old migrations call them. They simply created and dropped the bare table.



74
75
76
# File 'lib/que.rb', line 74

# Legacy entry point kept for old migrations: runs migrate! with
# :version => 1 and returns its result.
def create!
  migrate!(:version => 1)
end

.db_version ⇒ Object



64
65
66
# File 'lib/que.rb', line 64

# Convenience delegator: returns whatever Migrations.db_version returns.
# NOTE(review): presumably the schema version currently installed in the
# database -- confirm against Migrations.
def db_version
  Migrations.db_version
end

.drop! ⇒ Object



78
79
80
# File 'lib/que.rb', line 78

# Legacy counterpart of create!: runs migrate! with :version => 0 and
# returns its result.
def drop!
  migrate!(:version => 0)
end

.enqueue(*args) ⇒ Object

Give us a cleaner interface when specifying a job_class as a string.



60
61
62
# File 'lib/que.rb', line 60

# Shortcut for Job.enqueue: forwards every positional argument unchanged and
# returns Job.enqueue's result. No block is forwarded.
def enqueue(*args)
  Job.enqueue(*args)
end

.execute(*args) ⇒ Object



43
44
45
# File 'lib/que.rb', line 43

# Runs a statement through the current adapter: forwards all arguments
# unchanged to adapter.execute and returns its result. Goes through the
# adapter reader, so it raises when no connection has been established.
def execute(*args)
  adapter.execute(*args)
end

.job_stats ⇒ Object



51
52
53
# File 'lib/que.rb', line 51

# Runs the statement named :job_stats through execute and returns its
# result.
def job_stats
  execute(:job_stats)
end

.log(data) ⇒ Object



82
83
84
85
86
87
88
89
# File 'lib/que.rb', line 82

# Emits one structured log entry through the configured logger.
#
# data - Hash of fields to log. An optional :level entry names the logger
#        method to call (:info when absent or nil) and is left out of the
#        logged fields. The caller's hash is never modified.
#
# The fields :lib, :hostname, :pid and :thread are filled in automatically;
# entries in data with the same keys take precedence. Nothing is logged when
# logger returns nil/false or when the formatter returns nil/false.
#
# Returns the result of the logger call, or nil when nothing was logged.
def log(data)
  # Work on a copy: deleting :level from the caller's hash would leak a side
  # effect into the caller (and fail outright on a frozen hash).
  fields = data.dup
  level  = fields.delete(:level) || :info

  entry = {
    :lib      => 'que',
    :hostname => Socket.gethostname,
    :pid      => Process.pid,
    :thread   => Thread.current.object_id
  }.merge(fields)

  sink = logger
  return unless sink

  # The formatter may veto an entry by returning nil/false.
  output = log_formatter.call(entry)
  sink.send(level, output) if output
end

.migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object



68
69
70
# File 'lib/que.rb', line 68

# Delegates to Migrations.migrate! and returns its result.
#
# version - despite the name, an options Hash that is passed through as-is;
#           defaults to {:version => Migrations::CURRENT_VERSION}.
def migrate!(version = {:version => Migrations::CURRENT_VERSION})
  Migrations.migrate!(version)
end

.worker_states ⇒ Object



55
56
57
# File 'lib/que.rb', line 55

# Runs the statement named :worker_states through execute and returns its
# result.
def worker_states
  execute(:worker_states)
end