Module: Que
- Defined in:
- lib/que.rb,
lib/que/job.rb,
lib/que/sql.rb,
lib/que/worker.rb,
lib/que/railtie.rb,
lib/que/version.rb,
lib/que/migrations.rb,
lib/que/adapters/pg.rb,
lib/que/adapters/base.rb,
lib/que/adapters/pond.rb,
lib/que/adapters/sequel.rb,
lib/que/adapters/active_record.rb,
lib/que/adapters/connection_pool.rb,
lib/generators/que/install_generator.rb
Defined Under Namespace
Modules: Adapters, Migrations Classes: InstallGenerator, Job, Railtie, Worker
Constant Summary collapse
# Hash default proc: when a lookup misses and the requested key is a Symbol,
# retry the lookup with the key's String form. Any other missing key yields nil.
HASH_DEFAULT_PROC = proc do |hash, key|
  key.is_a?(Symbol) ? hash[key.to_s] : nil
end
# Recursively walks arrays and hashes in place, installing HASH_DEFAULT_PROC on
# every hash it finds so that string-keyed hashes can also be read with symbol
# keys. Always returns the very object it was given.
INDIFFERENTIATOR = proc do |object|
  if object.is_a?(Array)
    object.each { |element| INDIFFERENTIATOR.call(element) }
  elsif object.is_a?(Hash)
    object.default_proc = HASH_DEFAULT_PROC
    object.each_pair { |key, value| object[key] = INDIFFERENTIATOR.call(value) }
  end

  object
end
# Recursively converts hash keys to symbols, mutating hashes and arrays in
# place. Hashes and arrays are returned after modification; any other object is
# returned untouched.
SYMBOLIZER = proc do |object|
  if object.is_a?(Hash)
    # Iterate over a snapshot of the keys because entries are removed and
    # re-inserted while we go.
    object.keys.each do |key|
      symbol_key = key.to_sym
      object[symbol_key] = SYMBOLIZER.call(object.delete(key))
    end
    object
  elsif object.is_a?(Array)
    object.map! { |element| SYMBOLIZER.call(element) }
  else
    object
  end
end
- SQL =
{ # Locks a job using a Postgres recursive CTE [1]. # # As noted by the Postgres documentation, it may be slightly easier to # think about this expression as iteration rather than recursion, despite # the `RECURSION` nomenclature defined by the SQL standards committee. # Recursion is used here so that jobs in the table can be iterated one-by- # one until a lock can be acquired, where a non-recursive `SELECT` would # have the undesirable side-effect of locking multiple jobs at once. i.e. # Consider that the following would have the worker lock *all* unlocked # jobs: # # SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked # FROM que_jobs AS j; # # The CTE will initially produce an "anchor" from the non-recursive term # (i.e. before the `UNION`), and then use it as the contents of the # working table as it continues to iterate through `que_jobs` looking for # a lock. The jobs table has a sort on (priority, run_at, job_id) which # allows it to walk the jobs table in a stable manner. As noted above, the # recursion examines one job at a time so that it only ever acquires a # single lock. # # The recursion has two possible end conditions: # # 1. If a lock *can* be acquired, it bubbles up to the top-level `SELECT` # outside of the `job` CTE which stops recursion because it is # constrained with a `LIMIT` of 1. # # 2. If a lock *cannot* be acquired, the recursive term of the expression # (i.e. what's after the `UNION`) will return an empty result set # because there are no more candidates left that could possibly be # locked. This empty result automatically ends recursion. # # Note that this query can be easily modified to lock any number of jobs # by tweaking the LIMIT clause in the main SELECT statement. # # [1] http://www.postgresql.org/docs/devel/static/queries-with.html # # Thanks to RhodiumToad in #postgresql for help with the original version # of the job lock CTE. 
:lock_job => %{ WITH RECURSIVE jobs AS ( SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked FROM ( SELECT j FROM que_jobs AS j WHERE queue = $1::text AND run_at <= now() ORDER BY priority, run_at, job_id LIMIT 1 ) AS t1 UNION ALL ( SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked FROM ( SELECT ( SELECT j FROM que_jobs AS j WHERE queue = $1::text AND run_at <= now() AND (priority, run_at, job_id) > (jobs.priority, jobs.run_at, jobs.job_id) ORDER BY priority, run_at, job_id LIMIT 1 ) AS j FROM jobs WHERE jobs.job_id IS NOT NULL LIMIT 1 ) AS t1 ) ) SELECT queue, priority, run_at, job_id, job_class, args, error_count FROM jobs WHERE locked LIMIT 1 }.freeze, :check_job => %{ SELECT 1 AS one FROM que_jobs WHERE queue = $1::text AND priority = $2::smallint AND run_at = $3::timestamptz AND job_id = $4::bigint }.freeze, :set_error => %{ UPDATE que_jobs SET error_count = error_count + 1, run_at = now() + $1::bigint * '1 second'::interval, last_error = $2::text WHERE queue = $3::text AND priority = $4::smallint AND run_at = $5::timestamptz AND job_id = $6::bigint }.freeze, :insert_job => %{ INSERT INTO que_jobs (queue, priority, run_at, job_class, args) VALUES (coalesce($1, '')::text, coalesce($2, 100)::smallint, coalesce($3, now())::timestamptz, $4::text, coalesce($5, '[]')::json) RETURNING * }.freeze, :destroy_job => %{ DELETE FROM que_jobs WHERE queue = $1::text AND priority = $2::smallint AND run_at = $3::timestamptz AND job_id = $4::bigint }.freeze, :job_stats => %{ SELECT queue, job_class, count(*) AS count, count(locks.job_id) AS count_working, sum((error_count > 0)::int) AS count_errored, max(error_count) AS highest_error_count, min(run_at) AS oldest_run_at FROM que_jobs LEFT JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id FROM pg_locks WHERE locktype = 'advisory' ) locks USING (job_id) GROUP BY queue, job_class ORDER BY count(*) DESC }.freeze, :worker_states_95 => %{ SELECT que_jobs.*, pg.pid AS pg_backend_pid, pg.state AS pg_state, 
pg.state_change AS pg_state_changed_at, pg.query AS pg_last_query, pg.query_start AS pg_last_query_started_at, pg.xact_start AS pg_transaction_started_at, pg.waiting AS pg_waiting_on_lock FROM que_jobs JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.* FROM pg_locks JOIN pg_stat_activity USING (pid) WHERE locktype = 'advisory' ) pg USING (job_id) }.freeze, :worker_states_96 => %{ SELECT que_jobs.*, pg.pid AS pg_backend_pid, pg.state AS pg_state, pg.state_change AS pg_state_changed_at, pg.query AS pg_last_query, pg.query_start AS pg_last_query_started_at, pg.xact_start AS pg_transaction_started_at, pg.wait_event_type IS NOT NULL AS pg_waiting_on_lock FROM que_jobs JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.* FROM pg_locks JOIN pg_stat_activity USING (pid) WHERE locktype = 'advisory' ) pg USING (job_id) }.freeze, }.freeze
- Version =
'0.14.3'
Class Attribute Summary collapse
- .adapter ⇒ Object
-
.error_notifier ⇒ Object
Returns the value of attribute error_notifier.
- .json_converter ⇒ Object
- .log_formatter ⇒ Object
- .logger ⇒ Object
- .use_prepared_statements ⇒ Object
Class Method Summary collapse
- .clear! ⇒ Object
- .connection=(connection) ⇒ Object
- .constantize(camel_cased_word) ⇒ Object
-
.create! ⇒ Object
Have to support create! and drop! in old migrations.
- .db_version ⇒ Object
- .disable_prepared_statements ⇒ Object
- .disable_prepared_statements=(setting) ⇒ Object
- .drop! ⇒ Object
-
.enqueue(*args) ⇒ Object
Give us a cleaner interface when specifying a job_class as a string.
- .error_handler ⇒ Object
- .error_handler=(p) ⇒ Object
- .execute(*args) ⇒ Object
- .job_stats ⇒ Object
- .log(data) ⇒ Object
- .migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object
-
.transaction ⇒ Object
A helper method to manage transactions, used mainly by the migration system.
- .worker_states ⇒ Object
Class Attribute Details
.adapter ⇒ Object
63 64 65 |
# File 'lib/que.rb', line 63
# Returns the adapter stored in @adapter. Raises a RuntimeError when no
# adapter has been assigned yet (or it was reset to nil/false).
def adapter
  return @adapter if @adapter

  raise "Que connection not established!"
end
.error_notifier ⇒ Object
Returns the value of attribute error_notifier.
44 45 46 |
# File 'lib/que.rb', line 44 def error_notifier @error_notifier end |
.json_converter ⇒ Object
189 190 191 |
# File 'lib/que.rb', line 189
# Returns the configured JSON converter, falling back to (and remembering)
# INDIFFERENTIATOR when none has been set.
def json_converter
  @json_converter = INDIFFERENTIATOR unless @json_converter
  @json_converter
end
.log_formatter ⇒ Object
125 126 127 |
# File 'lib/que.rb', line 125
# Returns the callable used to turn a log hash into output. Defaults to
# JSON.dump (as a Method object), which is memoized on first use.
def log_formatter
  @log_formatter || (@log_formatter = JSON.method(:dump))
end
.logger ⇒ Object
121 122 123 |
# File 'lib/que.rb', line 121
# Returns the logger. If the stored @logger responds to #call (e.g. a proc),
# it is invoked on every access and its result is returned; otherwise the
# stored object itself is returned.
def logger
  configured = @logger
  return configured.call if configured.respond_to?(:call)

  configured
end
.use_prepared_statements ⇒ Object
129 130 131 132 |
# File 'lib/que.rb', line 129
# Returns the use_prepared_statements setting, treating an unset (nil) value
# as true. An explicit false is returned as false.
def use_prepared_statements
  return true if @use_prepared_statements.nil?

  @use_prepared_statements
end
Class Method Details
.clear! ⇒ Object
71 72 73 |
# File 'lib/que.rb', line 71 def clear! execute "DELETE FROM que_jobs" end |
.connection=(connection) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/que.rb', line 47
# Picks and assigns the adapter matching the given connection.
#
# The connection is recognised by name only (its #to_s, or the name of its
# class), so none of the supported libraries has to be loaded just to make the
# comparison. Passing nil clears the adapter; anything unrecognised raises a
# RuntimeError.
def connection=(connection)
  chosen =
    if connection.to_s == 'ActiveRecord'
      Adapters::ActiveRecord.new
    else
      class_name = connection.class.to_s

      if class_name == 'Sequel::Postgres::Database'
        Adapters::Sequel.new(connection)
      elsif class_name == 'ConnectionPool'
        Adapters::ConnectionPool.new(connection)
      elsif class_name == 'PG::Connection'
        Adapters::PG.new(connection)
      elsif class_name == 'Pond'
        Adapters::Pond.new(connection)
      elsif class_name == 'NilClass'
        connection
      else
        raise "Que connection not recognized: #{connection.inspect}"
      end
    end

  self.adapter = chosen
end
.constantize(camel_cased_word) ⇒ Object
154 155 156 157 158 159 160 161 |
# File 'lib/que.rb', line 154
# Resolves a constant from its (possibly namespaced) name, e.g. "Foo::Bar".
#
# When the argument itself responds to #constantize (ActiveSupport is loaded),
# that implementation is used. Otherwise the name is split on "::" and looked
# up segment by segment starting from Object. A leading "::" (absolute
# reference such as "::Foo::Bar") is accepted in the fallback path as well, so
# both paths agree on that input.
#
# Raises NameError if any segment does not name a constant.
def constantize(camel_cased_word)
  # Use ActiveSupport's version if it exists.
  return camel_cased_word.constantize if camel_cased_word.respond_to?(:constantize)

  names = camel_cased_word.split('::')
  # "::Foo".split('::') yields ["", "Foo"]; drop the empty root marker, which
  # const_get would reject as an invalid constant name.
  names.shift if names.first == ''
  names.inject(Object, &:const_get)
end
.create! ⇒ Object
Have to support create! and drop! in old migrations. They just created and dropped the bare table.
104 105 106 |
# File 'lib/que.rb', line 104 def create! migrate! :version => 1 end |
.db_version ⇒ Object
94 95 96 |
# File 'lib/que.rb', line 94 def db_version Migrations.db_version end |
.disable_prepared_statements ⇒ Object
134 135 136 137 |
# File 'lib/que.rb', line 134
# Deprecated reader kept as a shim: emits a deprecation warning and returns
# the negation of use_prepared_statements.
#
# The warning points callers at Que.use_prepared_statements, the method whose
# result they should invert (the message previously named the deprecated
# method itself).
def disable_prepared_statements
  warn "Que.disable_prepared_statements has been deprecated, please update your code to invert the result of Que.use_prepared_statements instead. This shim will be removed in Que version 1.0.0."
  !use_prepared_statements
end
.disable_prepared_statements=(setting) ⇒ Object
139 140 141 142 |
# File 'lib/que.rb', line 139
# Deprecated writer kept as a shim: emits a deprecation warning and stores the
# inverted value through use_prepared_statements=.
def disable_prepared_statements=(setting)
  deprecation = "Que.disable_prepared_statements= has been deprecated, please update your code to pass the inverted value to Que.use_prepared_statements= instead. This shim will be removed in Que version 1.0.0."
  warn(deprecation)
  self.use_prepared_statements = !setting
end
.drop! ⇒ Object
108 109 110 |
# File 'lib/que.rb', line 108 def drop! migrate! :version => 0 end |
.enqueue(*args) ⇒ Object
Give us a cleaner interface when specifying a job_class as a string.
90 91 92 |
# File 'lib/que.rb', line 90 def enqueue(*args) Job.enqueue(*args) end |
.error_handler ⇒ Object
144 145 146 147 |
# File 'lib/que.rb', line 144
# Deprecated alias for error_notifier: emits a rename warning, then returns
# the current error_notifier.
def error_handler
  rename_notice = "Que.error_handler has been renamed to Que.error_notifier, please update your code. This shim will be removed in Que version 1.0.0."
  warn(rename_notice)
  error_notifier
end
.error_handler=(p) ⇒ Object
149 150 151 152 |
# File 'lib/que.rb', line 149
# Deprecated alias for error_notifier=: emits a rename warning, then assigns
# the given object through error_notifier=.
def error_handler=(p)
  rename_notice = "Que.error_handler= has been renamed to Que.error_notifier=, please update your code. This shim will be removed in Que version 1.0.0."
  warn(rename_notice)
  self.error_notifier = p
end
.execute(*args) ⇒ Object
67 68 69 |
# File 'lib/que.rb', line 67 def execute(*args) adapter.execute(*args) end |
.job_stats ⇒ Object
75 76 77 |
# File 'lib/que.rb', line 75 def job_stats execute :job_stats end |
.log(data) ⇒ Object
112 113 114 115 116 117 118 119 |
# File 'lib/que.rb', line 112
# Formats and emits one log entry.
#
# data is a hash of fields to log. Its optional :level entry selects the
# logger method to call (default :info) and is not part of the logged fields.
# The fields :lib, :hostname, :pid and :thread are supplied as defaults and
# can be overridden by data.
#
# The caller's hash is never modified (so frozen or reused hashes are safe).
# Nothing is emitted when there is no logger or when the formatter returns a
# falsy value. Returns the logger call's result, or nil when nothing was
# emitted.
def log(data)
  # Hash#fetch reads the key without triggering any default proc on data.
  level = data.fetch(:level, nil) || :info

  entry = {:lib => 'que', :hostname => Socket.gethostname, :pid => Process.pid, :thread => Thread.current.object_id}.merge(data)
  # Strip :level from our own copy only, never from the caller's hash.
  entry.delete(:level)

  if (l = logger) && output = log_formatter.call(entry)
    l.send level, output
  end
end
.migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object
98 99 100 |
# File 'lib/que.rb', line 98 def migrate!(version = {:version => Migrations::CURRENT_VERSION}) Migrations.migrate!(version) end |
.transaction ⇒ Object
A helper method to manage transactions, used mainly by the migration system. It’s available for general use, but if you’re using an ORM that provides its own transaction helper, be sure to use that instead, or the two may interfere with one another.
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/que.rb', line 167 def transaction adapter.checkout do if adapter.in_transaction? yield else begin execute "BEGIN" yield rescue => error raise ensure # Handle a raised error or a killed thread. if error || Thread.current.status == 'aborting' execute "ROLLBACK" else execute "COMMIT" end end end end end |
.worker_states ⇒ Object
79 80 81 82 83 84 85 86 87 |
# File 'lib/que.rb', line 79
# Executes the worker-state query matching the server version, inside
# adapter.checkout: :worker_states_96 when the connection's server_version is
# 90600 or higher, :worker_states_95 otherwise. Returns the query result.
def worker_states
  adapter.checkout do |conn|
    query = conn.server_version >= 90600 ? :worker_states_96 : :worker_states_95
    execute query
  end
end