Module: Que
- Defined in:
- lib/que.rb,
lib/que/job.rb,
lib/que/sql.rb,
lib/que/worker.rb,
lib/que/railtie.rb,
lib/que/version.rb,
lib/que/migrations.rb,
lib/que/adapters/pg.rb,
lib/que/adapters/base.rb,
lib/que/adapters/pond.rb,
lib/que/adapters/sequel.rb,
lib/que/adapters/active_record.rb,
lib/que/adapters/connection_pool.rb,
lib/generators/que/install_generator.rb
Defined Under Namespace
Modules: Adapters, Migrations Classes: InstallGenerator, Job, Railtie, Worker
Constant Summary collapse
- SQL =
{ # Thanks to RhodiumToad in #postgresql for help with the job lock CTE. :lock_job => %{ WITH RECURSIVE job AS ( SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked FROM ( SELECT j FROM que_jobs AS j WHERE queue = $1::text AND run_at <= now() ORDER BY priority, run_at, job_id LIMIT 1 ) AS t1 UNION ALL ( SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked FROM ( SELECT ( SELECT j FROM que_jobs AS j WHERE queue = $1::text AND run_at <= now() AND (priority, run_at, job_id) > (job.priority, job.run_at, job.job_id) ORDER BY priority, run_at, job_id LIMIT 1 ) AS j FROM job WHERE NOT job.locked LIMIT 1 ) AS t1 ) ) SELECT queue, priority, run_at, job_id, job_class, args, error_count FROM job WHERE locked LIMIT 1 }.freeze, :check_job => %{ SELECT 1 AS one FROM que_jobs WHERE queue = $1::text AND priority = $2::smallint AND run_at = $3::timestamptz AND job_id = $4::bigint }.freeze, :set_error => %{ UPDATE que_jobs SET error_count = $1::integer, run_at = now() + $2::bigint * '1 second'::interval, last_error = $3::text WHERE queue = $4::text AND priority = $5::smallint AND run_at = $6::timestamptz AND job_id = $7::bigint }.freeze, :insert_job => %{ INSERT INTO que_jobs (queue, priority, run_at, job_class, args) VALUES (coalesce($1, '')::text, coalesce($2, 100)::smallint, coalesce($3, now())::timestamptz, $4::text, coalesce($5, '[]')::json) RETURNING * }.freeze, :destroy_job => %{ DELETE FROM que_jobs WHERE queue = $1::text AND priority = $2::smallint AND run_at = $3::timestamptz AND job_id = $4::bigint }.freeze, :job_stats => %{ SELECT queue, job_class, count(*) AS count, count(locks.job_id) AS count_working, sum((error_count > 0)::int) AS count_errored, max(error_count) AS highest_error_count, min(run_at) AS oldest_run_at FROM que_jobs LEFT JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id FROM pg_locks WHERE locktype = 'advisory' ) locks USING (job_id) GROUP BY queue, job_class ORDER BY count(*) DESC }.freeze, :worker_states => %{ SELECT que_jobs.*, 
pg.pid AS pg_backend_pid, pg.state AS pg_state, pg.state_change AS pg_state_changed_at, pg.query AS pg_last_query, pg.query_start AS pg_last_query_started_at, pg.xact_start AS pg_transaction_started_at, pg.waiting AS pg_waiting_on_lock FROM que_jobs JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.* FROM pg_locks JOIN pg_stat_activity USING (pid) WHERE locktype = 'advisory' ) pg USING (job_id) }.freeze }.freeze
- Version =
'0.9.2'
Class Attribute Summary collapse
- .adapter ⇒ Object
- .error_handler ⇒ Object
  Returns the value of attribute error_handler.
- .log_formatter ⇒ Object
- .logger ⇒ Object
Class Method Summary collapse
- .clear! ⇒ Object
- .connection=(connection) ⇒ Object
- .create! ⇒ Object
  create! and drop! are still supported because old migrations call them.
- .db_version ⇒ Object
- .drop! ⇒ Object
- .enqueue(*args) ⇒ Object
  Gives us a cleaner interface when specifying a job_class as a string.
- .execute(*args) ⇒ Object
- .job_stats ⇒ Object
- .log(data) ⇒ Object
- .migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object
- .worker_states ⇒ Object
Class Attribute Details
.adapter ⇒ Object
39 40 41 |
# File 'lib/que.rb', line 39 def adapter @adapter || raise("Que connection not established!") end |
.error_handler ⇒ Object
Returns the value of attribute error_handler.
20 21 22 |
# File 'lib/que.rb', line 20 def error_handler @error_handler end |
.log_formatter ⇒ Object
95 96 97 |
# File 'lib/que.rb', line 95 def log_formatter @log_formatter ||= JSON_MODULE.method(:dump) end |
.logger ⇒ Object
91 92 93 |
# File 'lib/que.rb', line 91 def logger @logger.respond_to?(:call) ? @logger.call : @logger end |
Class Method Details
.clear! ⇒ Object
47 48 49 |
# File 'lib/que.rb', line 47 def clear! execute "DELETE FROM que_jobs" end |
.connection=(connection) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/que.rb', line 23 def connection=(connection) self.adapter = if connection.to_s == 'ActiveRecord' Adapters::ActiveRecord.new else case connection.class.to_s when 'Sequel::Postgres::Database' then Adapters::Sequel.new(connection) when 'ConnectionPool' then Adapters::ConnectionPool.new(connection) when 'PG::Connection' then Adapters::PG.new(connection) when 'Pond' then Adapters::Pond.new(connection) when 'NilClass' then connection else raise "Que connection not recognized: #{connection.inspect}" end end end |
.create! ⇒ Object
create! and drop! are still supported because old migrations call them. In those migrations they just created and dropped the bare table.
74 75 76 |
# File 'lib/que.rb', line 74 def create! migrate! :version => 1 end |
.db_version ⇒ Object
64 65 66 |
# File 'lib/que.rb', line 64 def db_version Migrations.db_version end |
.drop! ⇒ Object
78 79 80 |
# File 'lib/que.rb', line 78 def drop! migrate! :version => 0 end |
.enqueue(*args) ⇒ Object
Gives us a cleaner interface when specifying a job_class as a string.
60 61 62 |
# File 'lib/que.rb', line 60 def enqueue(*args) Job.enqueue(*args) end |
.execute(*args) ⇒ Object
43 44 45 |
# File 'lib/que.rb', line 43 def execute(*args) adapter.execute(*args) end |
.job_stats ⇒ Object
51 52 53 |
# File 'lib/que.rb', line 51 def job_stats execute :job_stats end |
.log(data) ⇒ Object
82 83 84 85 86 87 88 89 |
# File 'lib/que.rb', line 82 def log(data) level = data.delete(:level) || :info data = {:lib => 'que', :hostname => Socket.gethostname, :pid => Process.pid, :thread => Thread.current.object_id}.merge(data) if (l = logger) && output = log_formatter.call(data) l.send level, output end end |
.migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object
68 69 70 |
# File 'lib/que.rb', line 68 def migrate!(version = {:version => Migrations::CURRENT_VERSION}) Migrations.migrate!(version) end |
.worker_states ⇒ Object
55 56 57 |
# File 'lib/que.rb', line 55 def worker_states execute :worker_states end |