Module: Que

Defined in:
lib/que.rb,
lib/que/job.rb,
lib/que/sql.rb,
lib/que/worker.rb,
lib/que/railtie.rb,
lib/que/version.rb,
lib/que/migrations.rb,
lib/que/adapters/pg.rb,
lib/que/adapters/base.rb,
lib/que/adapters/pond.rb,
lib/que/adapters/sequel.rb,
lib/que/adapters/active_record.rb,
lib/que/adapters/connection_pool.rb,
lib/generators/que/install_generator.rb

Defined Under Namespace

Modules: Adapters, Migrations
Classes: InstallGenerator, Job, Railtie, Worker

Constant Summary collapse

# Named SQL statements, keyed by symbol. Every value is a frozen string that
# uses PostgreSQL positional parameters ($1, $2, ...); the hash is frozen too.
# Jobs are addressed throughout by the tuple (queue, priority, run_at, job_id).
SQL =
{
  # Thanks to RhodiumToad in #postgresql for help with the job lock CTE.
  #
  # :lock_job ($1 = queue) walks the jobs of that queue whose run_at has
  # passed, in (priority, run_at, job_id) order. The non-recursive term takes
  # the first candidate and calls pg_try_advisory_lock on its job_id; the
  # recursive term fetches the next candidate after the current one, and only
  # runs while the current row was NOT locked. The outer SELECT returns the
  # first row for which the lock attempt succeeded (at most one row).
  :lock_job => %{
    WITH RECURSIVE job AS (
      SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
      FROM (
        SELECT j
        FROM que_jobs AS j
        WHERE queue = $1::text
        AND run_at <= now()
        ORDER BY priority, run_at, job_id
        LIMIT 1
      ) AS t1
      UNION ALL (
        SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
        FROM (
          SELECT (
            SELECT j
            FROM que_jobs AS j
            WHERE queue = $1::text
            AND run_at <= now()
            AND (priority, run_at, job_id) > (job.priority, job.run_at, job.job_id)
            ORDER BY priority, run_at, job_id
            LIMIT 1
          ) AS j
          FROM job
          WHERE NOT job.locked
          LIMIT 1
        ) AS t1
      )
    )
    SELECT queue, priority, run_at, job_id, job_class, args, error_count
    FROM job
    WHERE locked
    LIMIT 1
  }.freeze,

  # :check_job ($1 = queue, $2 = priority, $3 = run_at, $4 = job_id) returns a
  # row containing 1 when a job with exactly that key exists, no rows otherwise.
  :check_job => %{
    SELECT 1 AS one
    FROM   que_jobs
    WHERE  queue    = $1::text
    AND    priority = $2::smallint
    AND    run_at   = $3::timestamptz
    AND    job_id   = $4::bigint
  }.freeze,

  # :set_error records a failure on the job identified by $4..$7: stores the
  # new error_count ($1) and last_error text ($3), and moves run_at to
  # now() plus $2 seconds.
  :set_error => %{
    UPDATE que_jobs
    SET error_count = $1::integer,
        run_at      = now() + $2::bigint * '1 second'::interval,
        last_error  = $3::text
    WHERE queue     = $4::text
    AND   priority  = $5::smallint
    AND   run_at    = $6::timestamptz
    AND   job_id    = $7::bigint
  }.freeze,

  # :insert_job inserts one job and returns the full inserted row. NULL
  # parameters fall back to defaults: queue '' ($1), priority 100 ($2),
  # run_at now() ($3) and args '[]' ($5). job_class ($4) has no fallback.
  :insert_job => %{
    INSERT INTO que_jobs
    (queue, priority, run_at, job_class, args)
    VALUES
    (coalesce($1, '')::text, coalesce($2, 100)::smallint, coalesce($3, now())::timestamptz, $4::text, coalesce($5, '[]')::json)
    RETURNING *
  }.freeze,

  # :destroy_job deletes the job identified by queue ($1), priority ($2),
  # run_at ($3) and job_id ($4).
  :destroy_job => %{
    DELETE FROM que_jobs
    WHERE queue    = $1::text
    AND   priority = $2::smallint
    AND   run_at   = $3::timestamptz
    AND   job_id   = $4::bigint
  }.freeze,

  # :job_stats aggregates que_jobs per (queue, job_class), largest group first.
  # count_working counts the jobs whose job_id matches an advisory lock listed
  # in pg_locks; the lock key is rebuilt as a bigint from classid (high 32
  # bits) and objid (low 32 bits).
  :job_stats => %{
    SELECT queue,
           job_class,
           count(*)                    AS count,
           count(locks.job_id)         AS count_working,
           sum((error_count > 0)::int) AS count_errored,
           max(error_count)            AS highest_error_count,
           min(run_at)                 AS oldest_run_at
    FROM que_jobs
    LEFT JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id
      FROM pg_locks
      WHERE locktype = 'advisory'
    ) locks USING (job_id)
    GROUP BY queue, job_class
    ORDER BY count(*) DESC
  }.freeze,

  # :worker_states returns every job whose job_id matches an advisory lock in
  # pg_locks, together with pg_stat_activity details of the backend (pid)
  # holding that lock.
  # NOTE(review): pg_stat_activity.waiting was removed in PostgreSQL 9.6 -
  # confirm which server versions this statement has to support.
  :worker_states => %{
    SELECT que_jobs.*,
           pg.pid          AS pg_backend_pid,
           pg.state        AS pg_state,
           pg.state_change AS pg_state_changed_at,
           pg.query        AS pg_last_query,
           pg.query_start  AS pg_last_query_started_at,
           pg.xact_start   AS pg_transaction_started_at,
           pg.waiting      AS pg_waiting_on_lock
    FROM que_jobs
    JOIN (
      SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.*
      FROM pg_locks
      JOIN pg_stat_activity USING (pid)
      WHERE locktype = 'advisory'
    ) pg USING (job_id)
  }.freeze
}.freeze
# Version string of the Que library.
Version = '0.10.0'

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.adapter ⇒ Object



39
40
41
# File 'lib/que.rb', line 39

# Returns the adapter held in @adapter.
#
# Raises a RuntimeError with the message "Que connection not established!"
# when @adapter is nil or false.
def adapter
  return @adapter if @adapter

  raise "Que connection not established!"
end

.error_handler ⇒ Object

Returns the value of attribute error_handler.



20
21
22
# File 'lib/que.rb', line 20

# Reader for the error_handler attribute: returns whatever is stored in
# @error_handler, or nil when nothing has been assigned.
def error_handler
  @error_handler
end

.log_formatter ⇒ Object



95
96
97
# File 'lib/que.rb', line 95

# Returns the callable that turns a log data hash into output.
#
# A formatter already stored in @log_formatter wins; otherwise the method
# object JSON_MODULE.method(:dump) is stored in @log_formatter and returned.
def log_formatter
  return @log_formatter if @log_formatter

  @log_formatter = JSON_MODULE.method(:dump)
end

.logger ⇒ Object



91
92
93
# File 'lib/que.rb', line 91

# Returns the configured logger.
#
# When @logger responds to :call (e.g. a proc), it is invoked on every access
# and its result is returned; otherwise @logger itself is returned (nil when
# nothing has been assigned).
def logger
  return @logger unless @logger.respond_to?(:call)

  @logger.call
end

Class Method Details

.clear! ⇒ Object



47
48
49
# File 'lib/que.rb', line 47

# Removes every row from the que_jobs table by running an unconditional
# DELETE through `execute`, and returns whatever `execute` returns.
def clear!
  execute("DELETE FROM que_jobs")
end

.connection=(connection) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/que.rb', line 23

# Selects and installs the adapter that matches the given connection object.
#
# Matching is done on names rather than on constants, so none of the
# supported libraries has to be loaded just to perform the check:
#
# * an object whose to_s is 'ActiveRecord'  -> Adapters::ActiveRecord.new
# * a Sequel::Postgres::Database instance   -> Adapters::Sequel
# * a ConnectionPool instance               -> Adapters::ConnectionPool
# * a PG::Connection instance               -> Adapters::PG
# * a Pond instance                         -> Adapters::Pond
# * nil                                     -> the adapter is reset to nil
#
# Anything else raises a RuntimeError, and the adapter is left untouched.
def connection=(connection)
  chosen =
    if connection.to_s == 'ActiveRecord'
      Adapters::ActiveRecord.new
    elsif (type_name = connection.class.to_s) == 'Sequel::Postgres::Database'
      Adapters::Sequel.new(connection)
    elsif type_name == 'ConnectionPool'
      Adapters::ConnectionPool.new(connection)
    elsif type_name == 'PG::Connection'
      Adapters::PG.new(connection)
    elsif type_name == 'Pond'
      Adapters::Pond.new(connection)
    elsif type_name == 'NilClass'
      connection
    else
      raise "Que connection not recognized: #{connection.inspect}"
    end

  self.adapter = chosen
end

.create! ⇒ Object

create! and drop! are kept for compatibility with old migrations, in which they simply created and dropped the bare table.



74
75
76
# File 'lib/que.rb', line 74

# Kept so that old migrations calling create! keep working: back then it just
# created the bare table. It now delegates to migrate! with :version => 1.
def create!
  migrate!(:version => 1)
end

.db_version ⇒ Object



64
65
66
# File 'lib/que.rb', line 64

# Convenience delegate: returns the result of Migrations.db_version.
def db_version
  Migrations.db_version
end

.drop! ⇒ Object



78
79
80
# File 'lib/que.rb', line 78

# Counterpart of create!, kept for old migrations that call drop!.
# Delegates to migrate! with :version => 0.
def drop!
  migrate!(:version => 0)
end

.enqueue(*args) ⇒ Object

Provides a cleaner interface for enqueueing a job when its job_class is specified as a string.



60
61
62
# File 'lib/que.rb', line 60

# Shortcut for Job.enqueue: forwards every argument unchanged and returns
# whatever Job.enqueue returns. Intended as the cleaner entry point when the
# job_class is specified as a string.
def enqueue(*args)
  Job.enqueue(*args)
end

.execute(*args) ⇒ Object



43
44
45
# File 'lib/que.rb', line 43

# Forwards all arguments to the current adapter's execute method and returns
# its result. The adapter is obtained through `adapter`, so this raises when
# no connection has been established.
def execute(*args)
  adapter.execute(*args)
end

.job_stats ⇒ Object



51
52
53
# File 'lib/que.rb', line 51

# Runs the statement named :job_stats through `execute` and returns its
# result (presumably resolved to SQL[:job_stats] by the adapter - confirm).
def job_stats
  execute(:job_stats)
end

.log(data) ⇒ Object



82
83
84
85
86
87
88
89
# File 'lib/que.rb', line 82

# Writes one structured log entry through the configured logger.
#
# data - Hash describing the event. An optional :level entry selects the
#        logger method to call (defaults to :info) and is not part of the
#        logged payload. The caller's hash is left untouched.
#
# The payload is the given data merged over the defaults :lib, :hostname,
# :pid and :thread, so caller-supplied keys win. Nothing is logged when
# `logger` is nil/false or when `log_formatter` returns nil/false.
#
# Returns the result of the logger call, or nil when nothing was logged.
def log(data)
  # Work on a copy: deleting :level from the argument itself would modify the
  # caller's hash (and raise outright on a frozen one).
  data  = data.dup
  level = data.delete(:level) || :info
  data  = {:lib => 'que', :hostname => Socket.gethostname, :pid => Process.pid, :thread => Thread.current.object_id}.merge(data)

  if (l = logger) && (output = log_formatter.call(data))
    l.send level, output
  end
end

.migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object



68
69
70
# File 'lib/que.rb', line 68

# Delegates to Migrations.migrate! and returns its result.
#
# version - options Hash handed to Migrations.migrate! unchanged; defaults to
#           {:version => Migrations::CURRENT_VERSION}.
def migrate!(version = {:version => Migrations::CURRENT_VERSION})
  Migrations.migrate!(version)
end

.transaction ⇒ Object

A helper method to manage transactions, used mainly by the migration system. It’s available for general use, but if you’re using an ORM that provides its own transaction helper, be sure to use that instead, or the two may interfere with one another.



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/que.rb', line 103

# Runs the given block inside a database transaction on a connection checked
# out from the adapter, and returns the block's value.
#
# Used mainly by the migration system. If an ORM provides its own transaction
# helper, use that instead, or the two may interfere with one another.
#
# * When the adapter reports that a transaction is already open, the block is
#   simply yielded to; the outer transaction stays in charge.
# * Otherwise BEGIN is issued, and afterwards either COMMIT (block finished)
#   or ROLLBACK (an exception was raised, or the thread is being killed).
#
# Any exception raised by the block is re-raised unchanged.
def transaction
  adapter.checkout do
    if adapter.in_transaction?
      yield
    else
      begin
        execute "BEGIN"
        yield
      rescue Exception => error # rubocop:disable Lint/RescueException
        # Capture EVERY exception, not only StandardError: a bare `rescue`
        # would miss Interrupt, SystemExit, NoMemoryError etc., leaving
        # `error` nil and committing a half-finished transaction below.
        # The exception is never swallowed - it is re-raised immediately.
        raise
      ensure
        # Handle a raised error or a killed thread (Thread#kill raises nothing
        # rescuable; it only shows up as the 'aborting' status).
        if error || Thread.current.status == 'aborting'
          execute "ROLLBACK"
        else
          execute "COMMIT"
        end
      end
    end
  end
end

.worker_states ⇒ Object



55
56
57
# File 'lib/que.rb', line 55

# Runs the statement named :worker_states through `execute` and returns its
# result (presumably resolved to SQL[:worker_states] by the adapter - confirm).
def worker_states
  execute(:worker_states)
end