Module: Que
- Defined in:
- lib/que.rb,
lib/que/job.rb,
lib/que/sql.rb,
lib/que/worker.rb,
lib/que/railtie.rb,
lib/que/version.rb,
lib/que/migrations.rb,
lib/que/adapters/pg.rb,
lib/que/adapters/base.rb,
lib/que/adapters/pond.rb,
lib/que/adapters/sequel.rb,
lib/que/adapters/active_record.rb,
lib/que/adapters/connection_pool.rb,
lib/generators/que/install_generator.rb
Defined Under Namespace
Modules: Adapters, Migrations Classes: InstallGenerator, Job, Railtie, Worker
Constant Summary collapse
- SQL =
{
  # Thanks to RhodiumToad in #postgresql for help with the job lock CTE.
  #
  # :lock_job - finds and locks one runnable job in queue $1. The recursive
  # CTE walks que_jobs rows with run_at <= now() in (priority, run_at, job_id)
  # order, calling pg_try_advisory_lock(job_id) on each candidate, and only
  # recurses while the previous candidate could not be locked. The final
  # SELECT returns the first row whose lock attempt succeeded.
  :lock_job => %{ WITH RECURSIVE job AS ( SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked FROM ( SELECT j FROM que_jobs AS j WHERE queue = $1::text AND run_at <= now() ORDER BY priority, run_at, job_id LIMIT 1 ) AS t1 UNION ALL ( SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked FROM ( SELECT ( SELECT j FROM que_jobs AS j WHERE queue = $1::text AND run_at <= now() AND (priority, run_at, job_id) > (job.priority, job.run_at, job.job_id) ORDER BY priority, run_at, job_id LIMIT 1 ) AS j FROM job WHERE NOT job.locked LIMIT 1 ) AS t1 ) ) SELECT queue, priority, run_at, job_id, job_class, args, error_count FROM job WHERE locked LIMIT 1 }.freeze,
  # :check_job - returns one row (1 AS one) when a job with exactly this
  # queue ($1), priority ($2), run_at ($3) and job_id ($4) exists.
  :check_job => %{ SELECT 1 AS one FROM que_jobs WHERE queue = $1::text AND priority = $2::smallint AND run_at = $3::timestamptz AND job_id = $4::bigint }.freeze,
  # :set_error - on the job identified by $4..$7, stores error_count ($1),
  # moves run_at to now() plus $2 seconds, and records last_error ($3).
  :set_error => %{ UPDATE que_jobs SET error_count = $1::integer, run_at = now() + $2::bigint * '1 second'::interval, last_error = $3::text WHERE queue = $4::text AND priority = $5::smallint AND run_at = $6::timestamptz AND job_id = $7::bigint }.freeze,
  # :insert_job - inserts a job and returns the whole new row. NULL
  # parameters fall back to queue '', priority 100, run_at now(), args '[]'.
  :insert_job => %{ INSERT INTO que_jobs (queue, priority, run_at, job_class, args) VALUES (coalesce($1, '')::text, coalesce($2, 100)::smallint, coalesce($3, now())::timestamptz, $4::text, coalesce($5, '[]')::json) RETURNING * }.freeze,
  # :destroy_job - deletes the job identified by queue ($1), priority ($2),
  # run_at ($3) and job_id ($4).
  :destroy_job => %{ DELETE FROM que_jobs WHERE queue = $1::text AND priority = $2::smallint AND run_at = $3::timestamptz AND job_id = $4::bigint }.freeze,
  # :job_stats - one row per (queue, job_class): total count, count_working
  # (rows whose job_id matches an advisory lock listed in pg_locks),
  # count_errored (error_count > 0), the highest error_count and the oldest
  # run_at. Largest groups come first.
  :job_stats => %{ SELECT queue, job_class, count(*) AS count, count(locks.job_id) AS count_working, sum((error_count > 0)::int) AS count_errored, max(error_count) AS highest_error_count, min(run_at) AS oldest_run_at FROM que_jobs LEFT JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id FROM pg_locks WHERE locktype = 'advisory' ) locks USING (job_id) GROUP BY queue, job_class ORDER BY count(*) DESC }.freeze,
  # :worker_states - every job whose job_id matches an advisory lock in
  # pg_locks, together with the pg_stat_activity columns (pid, state, query,
  # timestamps, waiting flag) of the backend that owns that lock entry.
  :worker_states => %{ SELECT que_jobs.*, 
pg.pid AS pg_backend_pid, pg.state AS pg_state, pg.state_change AS pg_state_changed_at, pg.query AS pg_last_query, pg.query_start AS pg_last_query_started_at, pg.xact_start AS pg_transaction_started_at, pg.waiting AS pg_waiting_on_lock FROM que_jobs JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.* FROM pg_locks JOIN pg_stat_activity USING (pid) WHERE locktype = 'advisory' ) pg USING (job_id) }.freeze
}.freeze
- Version =
'0.10.0'
Class Attribute Summary collapse
- .adapter ⇒ Object
-
.error_handler ⇒ Object
Returns the value of attribute error_handler.
- .log_formatter ⇒ Object
- .logger ⇒ Object
Class Method Summary collapse
- .clear! ⇒ Object
- .connection=(connection) ⇒ Object
-
.create! ⇒ Object
Have to support create! and drop! in old migrations.
- .db_version ⇒ Object
- .drop! ⇒ Object
-
.enqueue(*args) ⇒ Object
Give us a cleaner interface when specifying a job_class as a string.
- .execute(*args) ⇒ Object
- .job_stats ⇒ Object
- .log(data) ⇒ Object
- .migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object
-
.transaction ⇒ Object
A helper method to manage transactions, used mainly by the migration system.
- .worker_states ⇒ Object
Class Attribute Details
.adapter ⇒ Object
39 40 41 |
# File 'lib/que.rb', line 39
# Returns the adapter stored in @adapter.
#
# Raises a RuntimeError ("Que connection not established!") when @adapter
# is nil or false.
def adapter
  return @adapter if @adapter

  raise "Que connection not established!"
end
.error_handler ⇒ Object
Returns the value of attribute error_handler.
20 21 22 |
# File 'lib/que.rb', line 20 def error_handler @error_handler end |
.log_formatter ⇒ Object
95 96 97 |
# File 'lib/que.rb', line 95
# Returns the callable stored in @log_formatter.
#
# When none is stored (nil or false), JSON_MODULE.method(:dump) is stored
# in @log_formatter and returned, so later calls reuse the same object.
def log_formatter
  return @log_formatter if @log_formatter

  @log_formatter = JSON_MODULE.method(:dump)
end
.logger ⇒ Object
91 92 93 |
# File 'lib/que.rb', line 91
# Returns the current logger.
#
# If the object stored in @logger responds to #call, it is invoked on
# every access and its result is returned; otherwise @logger itself is
# returned (nil when unset).
def logger
  return @logger unless @logger.respond_to?(:call)

  @logger.call
end
Class Method Details
.clear! ⇒ Object
47 48 49 |
# File 'lib/que.rb', line 47 def clear! execute "DELETE FROM que_jobs" end |
.connection=(connection) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/que.rb', line 23
# Chooses the adapter matching the given connection and assigns it through
# self.adapter=.
#
# Matching is done on names, not on constants: a connection whose to_s is
# 'ActiveRecord' gets Adapters::ActiveRecord; otherwise the name of the
# connection's class selects the adapter wrapping it. nil is assigned as-is.
# Anything else raises a RuntimeError naming the unrecognized object.
def connection=(connection)
  return self.adapter = Adapters::ActiveRecord.new if connection.to_s == 'ActiveRecord'

  self.adapter =
    case connection.class.to_s
    when 'Sequel::Postgres::Database' then Adapters::Sequel.new(connection)
    when 'ConnectionPool'             then Adapters::ConnectionPool.new(connection)
    when 'PG::Connection'             then Adapters::PG.new(connection)
    when 'Pond'                       then Adapters::Pond.new(connection)
    when 'NilClass'                   then connection
    else raise "Que connection not recognized: #{connection.inspect}"
    end
end
.create! ⇒ Object
Have to support create! and drop! in old migrations. They just created and dropped the bare table.
74 75 76 |
# File 'lib/que.rb', line 74 def create! migrate! :version => 1 end |
.db_version ⇒ Object
64 65 66 |
# File 'lib/que.rb', line 64 def db_version Migrations.db_version end |
.drop! ⇒ Object
78 79 80 |
# File 'lib/que.rb', line 78 def drop! migrate! :version => 0 end |
.enqueue(*args) ⇒ Object
Give us a cleaner interface when specifying a job_class as a string.
60 61 62 |
# File 'lib/que.rb', line 60 def enqueue(*args) Job.enqueue(*args) end |
.execute(*args) ⇒ Object
43 44 45 |
# File 'lib/que.rb', line 43 def execute(*args) adapter.execute(*args) end |
.job_stats ⇒ Object
51 52 53 |
# File 'lib/que.rb', line 51 def job_stats execute :job_stats end |
.log(data) ⇒ Object
82 83 84 85 86 87 88 89 |
# File 'lib/que.rb', line 82
# Emits one log entry through the configured logger.
#
# data - Hash of fields to log. An optional :level entry names the logger
#        method to call (defaults to :info) and is left out of the logged
#        fields. The caller's hash is not modified, so frozen or reused
#        hashes are safe to pass.
#
# The logged fields are the given data merged over the defaults :lib,
# :hostname, :pid and :thread, then passed through log_formatter. Nothing
# is logged when logger returns nil/false or when the formatter returns
# nil/false.
def log(data)
  # Work on a copy: deleting :level from the argument itself would mutate
  # the caller's hash (and raise on a frozen one).
  fields = data.dup
  level = fields.delete(:level) || :info
  fields = {:lib => 'que', :hostname => Socket.gethostname, :pid => Process.pid, :thread => Thread.current.object_id}.merge(fields)

  if (l = logger) && (output = log_formatter.call(fields))
    l.send level, output
  end
end
.migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object
68 69 70 |
# File 'lib/que.rb', line 68 def migrate!(version = {:version => Migrations::CURRENT_VERSION}) Migrations.migrate!(version) end |
.transaction ⇒ Object
A helper method to manage transactions, used mainly by the migration system. It’s available for general use, but if you’re using an ORM that provides its own transaction helper, be sure to use that instead, or the two may interfere with one another.
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/que.rb', line 103
# Runs the given block inside a database transaction on a connection
# checked out from the adapter, and returns the block's value.
#
# If the adapter reports that a transaction is already open, the block is
# simply yielded to and the outer transaction stays in charge. Otherwise
# BEGIN is issued, the block runs, and the transaction is finished with
# COMMIT on normal completion or ROLLBACK when the block raised or the
# current thread is being killed. Any exception is re-raised unchanged.
def transaction
  adapter.checkout do
    if adapter.in_transaction?
      yield
    else
      begin
        execute "BEGIN"
        yield
      rescue Exception => error
        # Capture every exception, not only StandardError: an Interrupt,
        # SystemExit or similar must also lead to ROLLBACK below instead of
        # committing a half-finished transaction. It is re-raised untouched.
        raise
      ensure
        # Handle a raised error or a killed thread.
        if error || Thread.current.status == 'aborting'
          execute "ROLLBACK"
        else
          execute "COMMIT"
        end
      end
    end
  end
end
.worker_states ⇒ Object
55 56 57 |
# File 'lib/que.rb', line 55 def worker_states execute :worker_states end |