Module: Que
- Defined in:
- lib/que.rb,
lib/que/job.rb,
lib/que/sql.rb,
lib/que/worker.rb,
lib/que/railtie.rb,
lib/que/version.rb,
lib/que/migrations.rb,
lib/que/adapters/pg.rb,
lib/que/adapters/base.rb,
lib/que/adapters/pond.rb,
lib/que/adapters/sequel.rb,
lib/que/adapters/active_record.rb,
lib/que/adapters/connection_pool.rb,
lib/generators/que/install_generator.rb
Defined Under Namespace
Modules: Adapters, Migrations Classes: InstallGenerator, Job, Railtie, Worker
Constant Summary collapse
- HASH_DEFAULT_PROC =
proc { |hash, key| hash[key.to_s] if Symbol === key }
- INDIFFERENTIATOR =
proc do |object| case object when Array object.each(&INDIFFERENTIATOR) when Hash object.default_proc = HASH_DEFAULT_PROC object.each { |key, value| object[key] = INDIFFERENTIATOR.call(value) } object else object end end
- SYMBOLIZER =
proc do |object| case object when Hash object.keys.each do |key| object[key.to_sym] = SYMBOLIZER.call(object.delete(key)) end object when Array object.map! { |e| SYMBOLIZER.call(e) } else object end end
- SQL =
{ # Locks a job using a Postgres recursive CTE [1]. # # As noted by the Postgres documentation, it may be slightly easier to # think about this expression as iteration rather than recursion, despite # the `RECURSION` nomenclature defined by the SQL standards committee. # Recursion is used here so that jobs in the table can be iterated one-by- # one until a lock can be acquired, where a non-recursive `SELECT` would # have the undesirable side-effect of locking multiple jobs at once. i.e. # Consider that the following would have the worker lock *all* unlocked # jobs: # # SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked # FROM que_jobs AS j; # # The CTE will initially produce an "anchor" from the non-recursive term # (i.e. before the `UNION`), and then use it as the contents of the # working table as it continues to iterate through `que_jobs` looking for # a lock. The jobs table has a sort on (priority, run_at, job_id) which # allows it to walk the jobs table in a stable manner. As noted above, the # recursion examines one job at a time so that it only ever acquires a # single lock. # # The recursion has two possible end conditions: # # 1. If a lock *can* be acquired, it bubbles up to the top-level `SELECT` # outside of the `job` CTE which stops recursion because it is # constrained with a `LIMIT` of 1. # # 2. If a lock *cannot* be acquired, the recursive term of the expression # (i.e. what's after the `UNION`) will return an empty result set # because there are no more candidates left that could possibly be # locked. This empty result automatically ends recursion. # # Note that this query can be easily modified to lock any number of jobs # by tweaking the LIMIT clause in the main SELECT statement. # # [1] http://www.postgresql.org/docs/devel/static/queries-with.html # # Thanks to RhodiumToad in #postgresql for help with the original version # of the job lock CTE. :lock_job => %{ WITH RECURSIVE jobs AS ( SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked FROM ( SELECT j FROM que_jobs AS j WHERE queue = $1::text AND run_at <= now() ORDER BY priority, run_at, job_id LIMIT 1 ) AS t1 UNION ALL ( SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked FROM ( SELECT ( SELECT j FROM que_jobs AS j WHERE queue = $1::text AND run_at <= now() AND (priority, run_at, job_id) > (jobs.priority, jobs.run_at, jobs.job_id) ORDER BY priority, run_at, job_id LIMIT 1 ) AS j FROM jobs WHERE jobs.job_id IS NOT NULL LIMIT 1 ) AS t1 ) ) SELECT queue, priority, run_at, job_id, job_class, args, error_count FROM jobs WHERE locked LIMIT 1 }.freeze, :check_job => %{ SELECT 1 AS one FROM que_jobs WHERE queue = $1::text AND priority = $2::smallint AND run_at = $3::timestamptz AND job_id = $4::bigint }.freeze, :set_error => %{ UPDATE que_jobs SET error_count = error_count + 1, run_at = now() + $1::bigint * '1 second'::interval, last_error = $2::text WHERE queue = $3::text AND priority = $4::smallint AND run_at = $5::timestamptz AND job_id = $6::bigint }.freeze, :insert_job => %{ INSERT INTO que_jobs (queue, priority, run_at, job_class, args) VALUES (coalesce($1, '')::text, coalesce($2, 100)::smallint, coalesce($3, now())::timestamptz, $4::text, coalesce($5, '[]')::json) RETURNING * }.freeze, :destroy_job => %{ DELETE FROM que_jobs WHERE queue = $1::text AND priority = $2::smallint AND run_at = $3::timestamptz AND job_id = $4::bigint }.freeze, :job_stats => %{ SELECT queue, job_class, count(*) AS count, count(locks.job_id) AS count_working, sum((error_count > 0)::int) AS count_errored, max(error_count) AS highest_error_count, min(run_at) AS oldest_run_at FROM que_jobs LEFT JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id FROM pg_locks WHERE locktype = 'advisory' ) locks USING (job_id) GROUP BY queue, job_class ORDER BY count(*) DESC }.freeze, :worker_states_95 => %{ SELECT que_jobs.*, pg.pid AS pg_backend_pid, pg.state AS pg_state, pg.state_change AS pg_state_changed_at, pg.query AS pg_last_query, pg.query_start AS pg_last_query_started_at, pg.xact_start AS pg_transaction_started_at, pg.waiting AS pg_waiting_on_lock FROM que_jobs JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.* FROM pg_locks JOIN pg_stat_activity USING (pid) WHERE locktype = 'advisory' ) pg USING (job_id) }.freeze, :worker_states_96 => %{ SELECT que_jobs.*, pg.pid AS pg_backend_pid, pg.state AS pg_state, pg.state_change AS pg_state_changed_at, pg.query AS pg_last_query, pg.query_start AS pg_last_query_started_at, pg.xact_start AS pg_transaction_started_at, pg.wait_event_type IS NOT NULL AS pg_waiting_on_lock FROM que_jobs JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.* FROM pg_locks JOIN pg_stat_activity USING (pid) WHERE locktype = 'advisory' ) pg USING (job_id) }.freeze, }.freeze
- Version =
'0.12.2'
Class Attribute Summary collapse
- .adapter ⇒ Object
- .disable_prepared_statements ⇒ Object
-
.error_notifier ⇒ Object
Returns the value of attribute error_notifier.
- .json_converter ⇒ Object
- .log_formatter ⇒ Object
- .logger ⇒ Object
Class Method Summary collapse
- .clear! ⇒ Object
- .connection=(connection) ⇒ Object
- .constantize(camel_cased_word) ⇒ Object
-
.create! ⇒ Object
Have to support create! and drop! in old migrations.
- .db_version ⇒ Object
- .drop! ⇒ Object
-
.enqueue(*args) ⇒ Object
Give us a cleaner interface when specifying a job_class as a string.
- .error_handler ⇒ Object
- .error_handler=(p) ⇒ Object
- .execute(*args) ⇒ Object
- .job_stats ⇒ Object
- .log(data) ⇒ Object
- .migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object
-
.transaction ⇒ Object
A helper method to manage transactions, used mainly by the migration system.
- .worker_states ⇒ Object
Class Attribute Details
.adapter ⇒ Object
70 71 72 |
# File 'lib/que.rb', line 70 def adapter @adapter || raise("Que connection not established!") end |
.disable_prepared_statements ⇒ Object
136 137 138 |
# File 'lib/que.rb', line 136 def disable_prepared_statements @disable_prepared_statements || false end |
.error_notifier ⇒ Object
Returns the value of attribute error_notifier.
51 52 53 |
# File 'lib/que.rb', line 51 def error_notifier @error_notifier end |
.json_converter ⇒ Object
185 186 187 |
# File 'lib/que.rb', line 185 def json_converter @json_converter ||= INDIFFERENTIATOR end |
.log_formatter ⇒ Object
132 133 134 |
# File 'lib/que.rb', line 132 def log_formatter @log_formatter ||= JSON_MODULE.method(:dump) end |
.logger ⇒ Object
128 129 130 |
# File 'lib/que.rb', line 128 def logger @logger.respond_to?(:call) ? @logger.call : @logger end |
Class Method Details
.clear! ⇒ Object
78 79 80 |
# File 'lib/que.rb', line 78 def clear! execute "DELETE FROM que_jobs" end |
.connection=(connection) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/que.rb', line 54 def connection=(connection) self.adapter = if connection.to_s == 'ActiveRecord' Adapters::ActiveRecord.new else case connection.class.to_s when 'Sequel::Postgres::Database' then Adapters::Sequel.new(connection) when 'ConnectionPool' then Adapters::ConnectionPool.new(connection) when 'PG::Connection' then Adapters::PG.new(connection) when 'Pond' then Adapters::Pond.new(connection) when 'NilClass' then connection else raise "Que connection not recognized: #{connection.inspect}" end end end |
.constantize(camel_cased_word) ⇒ Object
150 151 152 153 154 155 156 157 |
# File 'lib/que.rb', line 150 def constantize(camel_cased_word) if camel_cased_word.respond_to?(:constantize) # Use ActiveSupport's version if it exists. camel_cased_word.constantize else camel_cased_word.split('::').inject(Object, &:const_get) end end |
.create! ⇒ Object
Have to support create! and drop! in old migrations. They just created and dropped the bare table.
111 112 113 |
# File 'lib/que.rb', line 111 def create! migrate! :version => 1 end |
.db_version ⇒ Object
101 102 103 |
# File 'lib/que.rb', line 101 def db_version Migrations.db_version end |
.drop! ⇒ Object
115 116 117 |
# File 'lib/que.rb', line 115 def drop! migrate! :version => 0 end |
.enqueue(*args) ⇒ Object
Give us a cleaner interface when specifying a job_class as a string.
97 98 99 |
# File 'lib/que.rb', line 97 def enqueue(*args) Job.enqueue(*args) end |
.error_handler ⇒ Object
140 141 142 143 |
# File 'lib/que.rb', line 140 def error_handler warn "Que.error_handler has been renamed to Que.error_notifier, please update your code. This shim will be removed in Que version 1.0.0." error_notifier end |
.error_handler=(p) ⇒ Object
145 146 147 148 |
# File 'lib/que.rb', line 145 def error_handler=(p) warn "Que.error_handler= has been renamed to Que.error_notifier=, please update your code. This shim will be removed in Que version 1.0.0." self.error_notifier = p end |
.execute(*args) ⇒ Object
74 75 76 |
# File 'lib/que.rb', line 74 def execute(*args) adapter.execute(*args) end |
.job_stats ⇒ Object
82 83 84 |
# File 'lib/que.rb', line 82 def job_stats execute :job_stats end |
.log(data) ⇒ Object
119 120 121 122 123 124 125 126 |
# File 'lib/que.rb', line 119 def log(data) level = data.delete(:level) || :info data = {:lib => 'que', :hostname => Socket.gethostname, :pid => Process.pid, :thread => Thread.current.object_id}.merge(data) if (l = logger) && output = log_formatter.call(data) l.send level, output end end |
.migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object
105 106 107 |
# File 'lib/que.rb', line 105 def migrate!(version = {:version => Migrations::CURRENT_VERSION}) Migrations.migrate!(version) end |
.transaction ⇒ Object
A helper method to manage transactions, used mainly by the migration system. It’s available for general use, but if you’re using an ORM that provides its own transaction helper, be sure to use that instead, or the two may interfere with one another.
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/que.rb', line 163 def transaction adapter.checkout do if adapter.in_transaction? yield else begin execute "BEGIN" yield rescue => error raise ensure # Handle a raised error or a killed thread. if error || Thread.current.status == 'aborting' execute "ROLLBACK" else execute "COMMIT" end end end end end |
.worker_states ⇒ Object
86 87 88 89 90 91 92 93 94 |
# File 'lib/que.rb', line 86 def worker_states adapter.checkout do |conn| if conn.server_version >= 90600 execute :worker_states_96 else execute :worker_states_95 end end end |