Module: Que
- Defined in:
- lib/que.rb,
lib/que/job.rb,
lib/que/sql.rb,
lib/que/worker.rb,
lib/que/railtie.rb,
lib/que/version.rb,
lib/que/migrations.rb,
lib/que/adapters/pg.rb,
lib/que/adapters/base.rb,
lib/que/adapters/pond.rb,
lib/que/adapters/sequel.rb,
lib/que/adapters/active_record.rb,
lib/que/adapters/connection_pool.rb,
lib/generators/que/install_generator.rb
Defined Under Namespace
Modules: Adapters, Migrations Classes: InstallGenerator, Job, Railtie, Worker
Constant Summary collapse
# Default proc for hashes with String keys: a lookup that misses with a Symbol
# key is retried with the key's String form. Misses with any other kind of key
# evaluate to nil.
HASH_DEFAULT_PROC =
  proc do |hash, key|
    case key
    when Symbol then hash[key.to_s]
    end
  end
# Walks arrays and hashes recursively, installing HASH_DEFAULT_PROC on every
# Hash it meets so that String-keyed hashes can also be read with Symbol keys.
# The structure is modified in place and the object passed in is returned.
INDIFFERENTIATOR =
  proc do |object|
    if Array === object
      object.each { |element| INDIFFERENTIATOR.call(element) }
    elsif Hash === object
      object.default_proc = HASH_DEFAULT_PROC
      object.each_pair { |key, value| object[key] = INDIFFERENTIATOR.call(value) }
    end

    object
  end
# Recursively converts every Hash key to a Symbol, descending into nested
# hashes and arrays. Hashes and arrays are rewritten in place and returned;
# any other object is returned untouched.
SYMBOLIZER =
  proc do |object|
    if Hash === object
      object.keys.each do |key|
        # Compute the symbol before removing the entry, so a key that cannot
        # be symbolized raises while the hash is still intact.
        symbol = key.to_sym
        value  = object.delete(key)
        object[symbol] = SYMBOLIZER.call(value)
      end
      object
    elsif Array === object
      object.map!(&SYMBOLIZER)
    else
      object
    end
  end
- SQL =
{
  # Locks a job using a Postgres recursive CTE [1].
  #
  # As noted by the Postgres documentation, it may be slightly easier to
  # think about this expression as iteration rather than recursion, despite
  # the `RECURSIVE` nomenclature defined by the SQL standards committee.
  # Recursion is used here so that jobs in the table can be iterated
  # one-by-one until a lock can be acquired, where a non-recursive `SELECT`
  # would have the undesirable side-effect of locking multiple jobs at once.
  # For example, the following would have the worker lock *all* unlocked
  # jobs:
  #
  #   SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked
  #   FROM que_jobs AS j;
  #
  # The CTE will initially produce an "anchor" from the non-recursive term
  # (i.e. before the `UNION`), and then use it as the contents of the
  # working table as it continues to iterate through `que_jobs` looking for
  # a lock. The jobs table has a sort on (priority, run_at, job_id) which
  # allows it to walk the jobs table in a stable manner. As noted above, the
  # recursion examines one job at a time so that it only ever acquires a
  # single lock.
  #
  # The recursion has two possible end conditions:
  #
  # 1. If a lock *can* be acquired, it bubbles up to the top-level `SELECT`
  #    outside of the `jobs` CTE which stops recursion because it is
  #    constrained with a `LIMIT` of 1.
  #
  # 2. If a lock *cannot* be acquired, the recursive term of the expression
  #    (i.e. what's after the `UNION`) will return an empty result set
  #    because there are no more candidates left that could possibly be
  #    locked. This empty result automatically ends recursion.
  #
  # Note that this query can be easily modified to lock any number of jobs
  # by tweaking the LIMIT clause in the main SELECT statement.
  #
  # [1] http://www.postgresql.org/docs/devel/static/queries-with.html
  #
  # Thanks to RhodiumToad in #postgresql for help with the original version
  # of the job lock CTE.
  :lock_job => %{ WITH RECURSIVE jobs AS ( SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked FROM ( SELECT j FROM que_jobs AS j WHERE queue = $1::text AND run_at <= now() ORDER BY priority, run_at, job_id LIMIT 1 ) AS t1 UNION ALL ( SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked FROM ( SELECT ( SELECT j FROM que_jobs AS j WHERE queue = $1::text AND run_at <= now() AND (priority, run_at, job_id) > (jobs.priority, jobs.run_at, jobs.job_id) ORDER BY priority, run_at, job_id LIMIT 1 ) AS j FROM jobs WHERE jobs.job_id IS NOT NULL LIMIT 1 ) AS t1 ) ) SELECT queue, priority, run_at, job_id, job_class, args, error_count FROM jobs WHERE locked LIMIT 1 }.freeze,

  # Selects a single row (column `one`) when a job with the given
  # (queue, priority, run_at, job_id) exists.
  :check_job => %{ SELECT 1 AS one FROM que_jobs WHERE queue = $1::text AND priority = $2::smallint AND run_at = $3::timestamptz AND job_id = $4::bigint }.freeze,

  # Increments error_count, moves run_at to now() plus $1 seconds and stores
  # $2 as last_error for the job identified by $3..$6.
  :set_error => %{ UPDATE que_jobs SET error_count = error_count + 1, run_at = now() + $1::bigint * '1 second'::interval, last_error = $2::text WHERE queue = $3::text AND priority = $4::smallint AND run_at = $5::timestamptz AND job_id = $6::bigint }.freeze,

  # Inserts a job and returns the inserted row. NULL parameters fall back to
  # queue '', priority 100, run_at now() and args '[]'.
  :insert_job => %{ INSERT INTO que_jobs (queue, priority, run_at, job_class, args) VALUES (coalesce($1, '')::text, coalesce($2, 100)::smallint, coalesce($3, now())::timestamptz, $4::text, coalesce($5, '[]')::json) RETURNING * }.freeze,

  # Deletes the job identified by (queue, priority, run_at, job_id).
  :destroy_job => %{ DELETE FROM que_jobs WHERE queue = $1::text AND priority = $2::smallint AND run_at = $3::timestamptz AND job_id = $4::bigint }.freeze,

  # Aggregates jobs per (queue, job_class): total count, how many have a
  # matching advisory lock in pg_locks, how many have errored, the highest
  # error_count and the oldest run_at. Largest groups come first.
  :job_stats => %{ SELECT queue, job_class, count(*) AS count, count(locks.job_id) AS count_working, sum((error_count > 0)::int) AS count_errored, max(error_count) AS highest_error_count, min(run_at) AS oldest_run_at FROM que_jobs LEFT JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id FROM pg_locks WHERE locktype = 'advisory' ) locks USING (job_id) GROUP BY queue, job_class ORDER BY count(*) DESC }.freeze,

  # Returns every job that has a matching advisory lock, joined with the
  # pg_stat_activity row of the backend holding that lock.
  # NOTE(review): pg_stat_activity.waiting was removed in PostgreSQL 9.6
  # (replaced by wait_event / wait_event_type) -- confirm which server
  # versions this query has to support.
  :worker_states => %{ SELECT que_jobs.*, pg.pid AS pg_backend_pid, pg.state AS pg_state, 
pg.state_change AS pg_state_changed_at, pg.query AS pg_last_query, pg.query_start AS pg_last_query_started_at, pg.xact_start AS pg_transaction_started_at, pg.waiting AS pg_waiting_on_lock FROM que_jobs JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.* FROM pg_locks JOIN pg_stat_activity USING (pid) WHERE locktype = 'advisory' ) pg USING (job_id) }.freeze
}.freeze
- Version =
# Version string of this Que release.
'0.12.0'
Class Attribute Summary collapse
- .adapter ⇒ Object
- .disable_prepared_statements ⇒ Object
-
.error_notifier ⇒ Object
Returns the value of attribute error_notifier.
- .json_converter ⇒ Object
- .log_formatter ⇒ Object
- .logger ⇒ Object
Class Method Summary collapse
- .clear! ⇒ Object
- .connection=(connection) ⇒ Object
- .constantize(camel_cased_word) ⇒ Object
-
.create! ⇒ Object
Have to support create! and drop! in old migrations.
- .db_version ⇒ Object
- .drop! ⇒ Object
-
.enqueue(*args) ⇒ Object
Give us a cleaner interface when specifying a job_class as a string.
- .error_handler ⇒ Object
- .error_handler=(p) ⇒ Object
- .execute(*args) ⇒ Object
- .job_stats ⇒ Object
- .log(data) ⇒ Object
- .migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object
-
.transaction ⇒ Object
A helper method to manage transactions, used mainly by the migration system.
- .worker_states ⇒ Object
Class Attribute Details
.adapter ⇒ Object
70 71 72 |
# File 'lib/que.rb', line 70
# Returns the adapter stored in @adapter.
#
# Raises a RuntimeError when no adapter has been set.
def adapter
  return @adapter if @adapter

  raise "Que connection not established!"
end
.disable_prepared_statements ⇒ Object
130 131 132 |
# File 'lib/que.rb', line 130
# Returns the value of @disable_prepared_statements, or false when it is
# unset (nil) or false.
def disable_prepared_statements
  return false unless @disable_prepared_statements

  @disable_prepared_statements
end
.error_notifier ⇒ Object
Returns the value of attribute error_notifier.
51 52 53 |
# File 'lib/que.rb', line 51
# Reader for the error_notifier attribute; nil when nothing has been assigned.
def error_notifier
  @error_notifier
end
.json_converter ⇒ Object
179 180 181 |
# File 'lib/que.rb', line 179
# Returns the configured JSON converter. When none has been set,
# INDIFFERENTIATOR is stored as the converter and returned.
def json_converter
  @json_converter = INDIFFERENTIATOR unless @json_converter
  @json_converter
end
.log_formatter ⇒ Object
126 127 128 |
# File 'lib/que.rb', line 126
# Returns the configured log formatter. When none has been set, the `dump`
# method object of JSON_MODULE is stored as the formatter and returned.
def log_formatter
  @log_formatter = JSON_MODULE.method(:dump) unless @log_formatter
  @log_formatter
end
.logger ⇒ Object
122 123 124 |
# File 'lib/que.rb', line 122
# Returns the logger. When @logger responds to `call`, the result of calling
# it is returned; otherwise @logger itself is returned.
def logger
  configured = @logger
  return configured.call if configured.respond_to?(:call)

  configured
end
Class Method Details
.clear! ⇒ Object
78 79 80 |
# File 'lib/que.rb', line 78
# Removes every row from the que_jobs table and returns whatever `execute`
# returns.
def clear!
  execute("DELETE FROM que_jobs")
end
.connection=(connection) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/que.rb', line 54
# Chooses and assigns the adapter that matches the given connection.
#
# An object whose `to_s` is 'ActiveRecord' selects the ActiveRecord adapter.
# Otherwise the connection's class name is compared against the supported
# connection classes; nil is assigned as-is. Classes are matched by name
# rather than by constant, so none of them has to be defined here.
#
# Raises a RuntimeError for any other kind of object.
def connection=(connection)
  self.adapter =
    if connection.to_s == 'ActiveRecord'
      Adapters::ActiveRecord.new
    else
      class_name = connection.class.to_s

      if class_name == 'Sequel::Postgres::Database'
        Adapters::Sequel.new(connection)
      elsif class_name == 'ConnectionPool'
        Adapters::ConnectionPool.new(connection)
      elsif class_name == 'PG::Connection'
        Adapters::PG.new(connection)
      elsif class_name == 'Pond'
        Adapters::Pond.new(connection)
      elsif class_name == 'NilClass'
        connection
      else
        raise "Que connection not recognized: #{connection.inspect}"
      end
    end
end
.constantize(camel_cased_word) ⇒ Object
144 145 146 147 148 149 150 151 |
# File 'lib/que.rb', line 144
# Resolves a constant from its name, e.g. "Foo::Bar".
#
# camel_cased_word - a String holding a constant path. A leading "::"
#                    (as in "::Foo::Bar") is accepted.
#
# Returns the constant the name refers to.
# Raises NameError when a segment of the path cannot be resolved.
def constantize(camel_cased_word)
  if camel_cased_word.respond_to?(:constantize)
    # Use ActiveSupport's version if it exists.
    camel_cased_word.constantize
  else
    names = camel_cased_word.split('::')
    # "::Foo".split('::') yields ["", "Foo"]; drop the empty head so that a
    # fully-qualified name resolves from Object instead of raising NameError
    # on the empty segment.
    names.shift if names.first == ''
    names.inject(Object, &:const_get)
  end
end
.create! ⇒ Object
Have to support create! and drop! in old migrations. They just created and dropped the bare table.
105 106 107 |
# File 'lib/que.rb', line 105
# Kept for old migrations: migrates the schema to version 1.
def create!
  migrate!(:version => 1)
end
.db_version ⇒ Object
95 96 97 |
# File 'lib/que.rb', line 95
# Returns the schema version reported by Migrations.db_version.
def db_version
  Migrations.db_version
end
.drop! ⇒ Object
109 110 111 |
# File 'lib/que.rb', line 109
# Kept for old migrations: migrates the schema down to version 0.
def drop!
  migrate!(:version => 0)
end
.enqueue(*args) ⇒ Object
Give us a cleaner interface when specifying a job_class as a string.
91 92 93 |
# File 'lib/que.rb', line 91
# Forwards all arguments to Job.enqueue and returns its result. Gives callers
# a cleaner interface when the job_class is specified as a string.
def enqueue(*args)
  Job.enqueue(*args)
end
.error_handler ⇒ Object
134 135 136 137 |
# File 'lib/que.rb', line 134
# Deprecated reader: emits a rename warning, then returns error_notifier.
def error_handler
  notice = "Que.error_handler has been renamed to Que.error_notifier, please update your code. This shim will be removed in Que version 1.0.0."
  warn(notice)
  error_notifier
end
.error_handler=(p) ⇒ Object
139 140 141 142 |
# File 'lib/que.rb', line 139
# Deprecated writer: emits a rename warning, then assigns p through
# error_notifier=.
def error_handler=(p)
  notice = "Que.error_handler= has been renamed to Que.error_notifier=, please update your code. This shim will be removed in Que version 1.0.0."
  warn(notice)
  self.error_notifier = p
end
.execute(*args) ⇒ Object
74 75 76 |
# File 'lib/que.rb', line 74
# Forwards all arguments to the current adapter's `execute` and returns its
# result.
def execute(*args)
  adapter.execute(*args)
end
.job_stats ⇒ Object
82 83 84 |
# File 'lib/que.rb', line 82
# Runs the :job_stats statement through `execute` and returns its result.
def job_stats
  execute(:job_stats)
end
.log(data) ⇒ Object
113 114 115 116 117 118 119 120 |
# File 'lib/que.rb', line 113
# Formats a log entry and sends it to the configured logger.
#
# data - Hash of fields to log. An optional :level entry names the logger
#        method to call (defaults to :info) and is not part of the logged
#        payload. The hash passed in is left unmodified.
#
# The payload is the given fields merged over :lib, :hostname, :pid and
# :thread. Nothing is logged when there is no logger or when the formatter
# returns nil/false.
def log(data)
  defaults = {
    :lib      => 'que',
    :hostname => Socket.gethostname,
    :pid      => Process.pid,
    :thread   => Thread.current.object_id
  }

  # Work on the merged copy so the caller's hash is never mutated (it may be
  # frozen or reused for several log calls).
  payload = defaults.merge(data)
  level   = payload.delete(:level) || :info

  if (l = logger) && (output = log_formatter.call(payload))
    l.send level, output
  end
end
.migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object
99 100 101 |
# File 'lib/que.rb', line 99
# Hands the options to Migrations.migrate! and returns its result.
#
# version - options Hash; defaults to {:version => Migrations::CURRENT_VERSION}.
def migrate!(version = {:version => Migrations::CURRENT_VERSION})
  Migrations.migrate!(version)
end
.transaction ⇒ Object
A helper method to manage transactions, used mainly by the migration system. It’s available for general use, but if you’re using an ORM that provides its own transaction helper, be sure to use that instead, or the two may interfere with one another.
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/que.rb', line 157
# Runs the given block inside a database transaction on a connection checked
# out from the adapter.
#
# When the adapter reports that a transaction is already open, the block is
# simply yielded to. Otherwise BEGIN is issued, the block is run, and the
# transaction ends with COMMIT, or with ROLLBACK when the block raised or the
# current thread is being killed.
#
# Returns the value of the block; any exception raised by it is re-raised.
def transaction
  adapter.checkout do
    if adapter.in_transaction?
      yield
    else
      begin
        execute "BEGIN"
        yield
      rescue Exception => error
        # Capture *every* exception, not just StandardError: an Interrupt,
        # SystemExit, NoMemoryError etc. must also lead to ROLLBACK below
        # rather than committing a half-finished transaction. The exception
        # is re-raised untouched.
        raise
      ensure
        # Handle a raised error or a killed thread.
        if error || Thread.current.status == 'aborting'
          execute "ROLLBACK"
        else
          execute "COMMIT"
        end
      end
    end
  end
end
.worker_states ⇒ Object
86 87 88 |
# File 'lib/que.rb', line 86
# Runs the :worker_states statement through `execute` and returns its result.
def worker_states
  execute(:worker_states)
end