Module: Que

Defined in:
lib/que.rb,
lib/que/job.rb,
lib/que/sql.rb,
lib/que/worker.rb,
lib/que/railtie.rb,
lib/que/version.rb,
lib/que/migrations.rb,
lib/que/adapters/pg.rb,
lib/que/adapters/base.rb,
lib/que/adapters/sequel.rb,
lib/que/adapters/active_record.rb,
lib/que/adapters/connection_pool.rb,
lib/generators/que/install_generator.rb

Defined Under Namespace

Modules: Adapters, Migrations
Classes: InstallGenerator, Job, Railtie, Worker

Constant Summary collapse

# Frozen table of the SQL statements used by this module, keyed by symbol.
# Every statement takes positional parameters ($1, $2, ...) with explicit casts.
SQL =
{
  # Thanks to RhodiumToad in #postgresql for help with the job lock CTE.
  # Picks one runnable job from queue $1 and takes an advisory lock on it.
  # The recursive CTE starts with the first job (run_at <= now()) in
  # (priority, run_at, job_id) order and tries pg_try_advisory_lock on its
  # job_id; while the previous candidate could not be locked it steps to the
  # next job in that ordering. The outer SELECT returns the first row whose
  # lock attempt succeeded, or no rows if none did.
  :lock_job => %{
    WITH RECURSIVE job AS (
      SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
      FROM (
        SELECT j
        FROM que_jobs AS j
        WHERE queue = $1::text
        AND run_at <= now()
        ORDER BY priority, run_at, job_id
        LIMIT 1
      ) AS t1
      UNION ALL (
        SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
        FROM (
          SELECT (
            SELECT j
            FROM que_jobs AS j
            WHERE queue = $1::text
            AND run_at <= now()
            AND (priority, run_at, job_id) > (job.priority, job.run_at, job.job_id)
            ORDER BY priority, run_at, job_id
            LIMIT 1
          ) AS j
          FROM job
          WHERE NOT job.locked
          LIMIT 1
        ) AS t1
      )
    )
    SELECT queue, priority, run_at, job_id, job_class, args, error_count
    FROM job
    WHERE locked
    LIMIT 1
  }.freeze,

  # Returns a row (one = 1) when a job with exactly this
  # (queue, priority, run_at, job_id) is present in que_jobs.
  :check_job => %{
    SELECT 1 AS one
    FROM   que_jobs
    WHERE  queue    = $1::text
    AND    priority = $2::smallint
    AND    run_at   = $3::timestamptz
    AND    job_id   = $4::bigint
  }.freeze,

  # Records a failure on the job identified by $4..$7: stores the new
  # error_count ($1) and last_error ($3), and moves run_at to $2 seconds
  # from now.
  :set_error => %{
    UPDATE que_jobs
    SET error_count = $1::integer,
        run_at      = now() + $2::integer * '1 second'::interval,
        last_error  = $3::text
    WHERE queue     = $4::text
    AND   priority  = $5::smallint
    AND   run_at    = $6::timestamptz
    AND   job_id    = $7::bigint
  }.freeze,

  # Inserts a job and returns the whole inserted row. NULL parameters fall
  # back to: queue '', priority 100, run_at 'now', args '[]'. job_class ($4)
  # has no fallback.
  :insert_job => %{
    INSERT INTO que_jobs
    (queue, priority, run_at, job_class, args)
    VALUES
    (coalesce($1, '')::text, coalesce($2, 100)::smallint, coalesce($3, 'now')::timestamptz, $4::text, coalesce($5, '[]')::json)
    RETURNING *
  }.freeze,

  # Deletes the job identified by (queue, priority, run_at, job_id).
  :destroy_job => %{
    DELETE FROM que_jobs
    WHERE queue    = $1::text
    AND   priority = $2::smallint
    AND   run_at   = $3::timestamptz
    AND   job_id   = $4::bigint
  }.freeze,

  # Per (queue, job_class) totals, largest group first: number of jobs, how
  # many have a matching advisory lock in pg_locks (count_working), how many
  # have error_count > 0, the highest error_count and the earliest run_at.
  # The subquery recombines pg_locks.classid and objid into one bigint so it
  # can be joined against job_id.
  :job_stats => %{
    SELECT queue,
           job_class,
           count(*)                    AS count,
           count(locks.job_id)         AS count_working,
           sum((error_count > 0)::int) AS count_errored,
           max(error_count)            AS highest_error_count,
           min(run_at)                 AS oldest_run_at
    FROM que_jobs
    LEFT JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id
      FROM pg_locks
      WHERE locktype = 'advisory'
    ) locks USING (job_id)
    GROUP BY queue, job_class
    ORDER BY count(*) DESC
  }.freeze,

  # Every job whose job_id matches an advisory lock in pg_locks, together with
  # pg_stat_activity details (pid, state, last query, timestamps, waiting) of
  # the backend holding that lock. Jobs without a matching lock are omitted
  # (inner join).
  :worker_states => %{
    SELECT que_jobs.*,
           pg.pid          AS pg_backend_pid,
           pg.state        AS pg_state,
           pg.state_change AS pg_state_changed_at,
           pg.query        AS pg_last_query,
           pg.query_start  AS pg_last_query_started_at,
           pg.xact_start   AS pg_transaction_started_at,
           pg.waiting      AS pg_waiting_on_lock
    FROM que_jobs
    JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.*
      FROM pg_locks
      JOIN pg_stat_activity USING (pid)
      WHERE locktype = 'advisory'
    ) pg USING (job_id)
  }.freeze
}.freeze
# Version string of this release.
Version = '0.6.0'

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.adapter ⇒ Object



37
38
39
# File 'lib/que.rb', line 37

# Returns the currently configured adapter.
#
# @return [Object] the value held in @adapter
# @raise [RuntimeError] when @adapter is nil or false
def adapter
  return @adapter if @adapter

  raise "Que connection not established!"
end

.error_handler ⇒ Object

Returns the value of attribute error_handler.



20
21
22
# File 'lib/que.rb', line 20

# Reader for the error_handler attribute.
#
# @return [Object, nil] the value held in @error_handler (nil when never set)
def error_handler
  @error_handler
end

.log_formatter ⇒ Object



89
90
91
# File 'lib/que.rb', line 89

# Returns the callable used to turn a log entry into output.
#
# When no formatter has been assigned yet, JSON_MODULE.method(:dump) is
# stored in @log_formatter and returned; later calls reuse the stored value.
#
# @return [#call] the formatter held in @log_formatter
def log_formatter
  return @log_formatter if @log_formatter

  @log_formatter = JSON_MODULE.method(:dump)
end

.logger ⇒ Object

Returns the value of attribute logger.



20
21
22
# File 'lib/que.rb', line 20

# Reader for the logger attribute.
#
# @return [Object, nil] the value held in @logger (nil when never set)
def logger
  @logger
end

Class Method Details

.clear! ⇒ Object



45
46
47
# File 'lib/que.rb', line 45

# Removes every row from the que_jobs table.
#
# @return [Object] whatever execute returns for the DELETE statement
def clear!
  delete_all_jobs = "DELETE FROM que_jobs"
  execute(delete_all_jobs)
end

.connection=(connection) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/que.rb', line 23

# Chooses and installs the adapter that matches the given connection.
#
# The connection is recognized by name rather than by constant:
# * anything whose to_s is 'ActiveRecord' gets Adapters::ActiveRecord
# * otherwise the class name selects Adapters::Sequel, Adapters::ConnectionPool
#   or Adapters::PG, each wrapping the connection
# * nil is stored as-is, leaving no adapter configured
#
# @param connection [Object, nil] the connection object to wrap
# @return [Object, nil] the value assigned to the adapter
# @raise [RuntimeError] when the connection is of an unrecognized class;
#   the adapter is not reassigned in that case
def connection=(connection)
  selected =
    if connection.to_s == 'ActiveRecord'
      Adapters::ActiveRecord.new
    else
      type_name = connection.class.to_s

      if type_name == 'Sequel::Postgres::Database'
        Adapters::Sequel.new(connection)
      elsif type_name == 'ConnectionPool'
        Adapters::ConnectionPool.new(connection)
      elsif type_name == 'PG::Connection'
        Adapters::PG.new(connection)
      elsif type_name == 'NilClass'
        connection
      else
        raise "Que connection not recognized: #{connection.inspect}"
      end
    end

  self.adapter = selected
end

.create! ⇒ Object

create! and drop! must remain supported because old migrations call them. Originally they just created and dropped the bare table.



72
73
74
# File 'lib/que.rb', line 72

# Legacy entry point kept for old migrations, where it created the bare
# table. It now asks migrate! for schema version 1.
#
# @return [Object] whatever migrate! returns
def create!
  migrate!(:version => 1)
end

.db_version ⇒ Object



62
63
64
# File 'lib/que.rb', line 62

# Convenience delegate to Migrations.db_version.
#
# @return [Object] whatever Migrations.db_version returns (presumably the
#   schema version currently in the database; confirm in Migrations)
def db_version
  Migrations.db_version
end

.drop! ⇒ Object



76
77
78
# File 'lib/que.rb', line 76

# Legacy counterpart of create!, kept for old migrations. It asks migrate!
# for schema version 0.
#
# @return [Object] whatever migrate! returns
def drop!
  migrate!(:version => 0)
end

.enqueue(*args) ⇒ Object

Give us a cleaner interface when specifying a job_class as a string.



58
59
60
# File 'lib/que.rb', line 58

# Module-level shortcut for Job.enqueue, giving a cleaner interface when the
# job_class is specified as a string.
#
# @param args [Array] arguments forwarded unchanged to Job.enqueue
# @return [Object] whatever Job.enqueue returns
def enqueue(*args)
  Job.enqueue(*args)
end

.execute(*args) ⇒ Object



41
42
43
# File 'lib/que.rb', line 41

# Runs a statement through the configured adapter.
#
# @param args [Array] arguments forwarded unchanged to adapter.execute
# @return [Object] whatever adapter.execute returns
# @raise [RuntimeError] from adapter when no connection has been established
def execute(*args)
  adapter.execute(*args)
end

.job_stats ⇒ Object



49
50
51
# File 'lib/que.rb', line 49

# Runs the statement named :job_stats (presumably looked up in the SQL
# table by the adapter) and returns the result.
#
# @return [Object] whatever execute returns
def job_stats
  execute(:job_stats)
end

.log(data) ⇒ Object



80
81
82
83
84
85
86
87
# File 'lib/que.rb', line 80

# Emits one structured log entry through the configured logger.
#
# The entry is +data+ merged over the default fields :lib, :hostname and
# :thread. The :level key (default :info) names the logger method to call and
# is not part of the entry itself. Nothing is logged when no logger is set or
# when the formatter returns nil/false.
#
# The caller's hash is never modified, so frozen or reused hashes can be
# passed safely.
#
# @param data [Hash] fields to log; may include :level
# @return [Object, nil] the logger method's return value, or nil when
#   nothing was logged
def log(data)
  defaults = {:lib => 'que', :hostname => Socket.gethostname, :thread => Thread.current.object_id}

  # Merge first and strip :level from the copy rather than from +data+.
  entry = defaults.merge(data)
  level = entry.delete(:level) || :info

  return unless logger

  output = log_formatter.call(entry)
  logger.send(level, output) if output
end

.migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object



66
67
68
# File 'lib/que.rb', line 66

# Forwards a migration request to Migrations.migrate!.
#
# @param version [Hash] options hash passed through unchanged; when omitted
#   it is {:version => Migrations::CURRENT_VERSION}
# @return [Object] whatever Migrations.migrate! returns
def migrate!(version = {:version => Migrations::CURRENT_VERSION})
  Migrations.migrate!(version)
end

.worker_states ⇒ Object



53
54
55
# File 'lib/que.rb', line 53

# Runs the statement named :worker_states (presumably looked up in the SQL
# table by the adapter) and returns the result.
#
# @return [Object] whatever execute returns
def worker_states
  execute(:worker_states)
end