Module: Que
- Defined in:
- lib/que.rb,
lib/que/job.rb,
lib/que/sql.rb,
lib/que/worker.rb,
lib/que/railtie.rb,
lib/que/version.rb,
lib/que/migrations.rb,
lib/que/adapters/pg.rb,
lib/que/adapters/base.rb,
lib/que/adapters/pond.rb,
lib/que/adapters/sequel.rb,
lib/que/adapters/active_record.rb,
lib/que/adapters/connection_pool.rb,
lib/generators/que/install_generator.rb
Defined Under Namespace
Modules: Adapters, Migrations Classes: InstallGenerator, Job, Railtie, Worker
Constant Summary collapse
- HASH_DEFAULT_PROC =
proc { |hash, key| hash[key.to_s] if Symbol === key }
- INDIFFERENTIATOR =
proc do |object| case object when Array object.each(&INDIFFERENTIATOR) when Hash object.default_proc = HASH_DEFAULT_PROC object.each { |key, value| object[key] = INDIFFERENTIATOR.call(value) } object else object end end
- SYMBOLIZER =
proc do |object| case object when Hash object.keys.each do |key| object[key.to_sym] = SYMBOLIZER.call(object.delete(key)) end object when Array object.map! { |e| SYMBOLIZER.call(e) } else object end end
- SQL =
{ # Locks a job using a Postgres recursive CTE [1]. # # As noted by the Postgres documentation, it may be slightly easier to # think about this expression as iteration rather than recursion, despite # the `RECURSION` nomenclature defined by the SQL standards committee. # Recursion is used here so that jobs in the table can be iterated one-by- # one until a lock can be acquired, where a non-recursive `SELECT` would # have the undesirable side-effect of locking multiple jobs at once. i.e. # Consider that the following would have the worker lock *all* unlocked # jobs: # # SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked # FROM que_jobs AS j; # # The CTE will initially produce an "anchor" from the non-recursive term # (i.e. before the `UNION`), and then use it as the contents of the # working table as it continues to iterate through `que_jobs` looking for # a lock. The jobs table has a sort on (priority, run_at, job_id) which # allows it to walk the jobs table in a stable manner. As noted above, the # recursion examines one job at a time so that it only ever acquires a # single lock. # # The recursion has two possible end conditions: # # 1. If a lock *can* be acquired, it bubbles up to the top-level `SELECT` # outside of the `job` CTE which stops recursion because it is # constrained with a `LIMIT` of 1. # # 2. If a lock *cannot* be acquired, the recursive term of the expression # (i.e. what's after the `UNION`) will return an empty result set # because there are no more candidates left that could possibly be # locked. This empty result automatically ends recursion. # # Note that this query can be easily modified to lock any number of jobs # by tweaking the LIMIT clause in the main SELECT statement. # # [1] http://www.postgresql.org/docs/devel/static/queries-with.html # # Thanks to RhodiumToad in #postgresql for help with the original version # of the job lock CTE. :lock_job => %{ WITH RECURSIVE jobs AS ( SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked FROM ( SELECT j FROM que_jobs AS j WHERE queue = $1::text AND run_at <= now() ORDER BY priority, run_at, job_id LIMIT 1 ) AS t1 UNION ALL ( SELECT (j).*, pg_try_advisory_lock((j).job_id) AS locked FROM ( SELECT ( SELECT j FROM que_jobs AS j WHERE queue = $1::text AND run_at <= now() AND (priority, run_at, job_id) > (jobs.priority, jobs.run_at, jobs.job_id) ORDER BY priority, run_at, job_id LIMIT 1 ) AS j FROM jobs WHERE jobs.job_id IS NOT NULL LIMIT 1 ) AS t1 ) ) SELECT queue, priority, run_at, job_id, job_class, args, error_count FROM jobs WHERE locked LIMIT 1 }.freeze, :check_job => %{ SELECT 1 AS one FROM que_jobs WHERE queue = $1::text AND priority = $2::smallint AND run_at = $3::timestamptz AND job_id = $4::bigint }.freeze, :set_error => %{ UPDATE que_jobs SET error_count = $1::integer, run_at = now() + $2::bigint * '1 second'::interval, last_error = $3::text WHERE queue = $4::text AND priority = $5::smallint AND run_at = $6::timestamptz AND job_id = $7::bigint }.freeze, :insert_job => %{ INSERT INTO que_jobs (queue, priority, run_at, job_class, args) VALUES (coalesce($1, '')::text, coalesce($2, 100)::smallint, coalesce($3, now())::timestamptz, $4::text, coalesce($5, '[]')::json) RETURNING * }.freeze, :destroy_job => %{ DELETE FROM que_jobs WHERE queue = $1::text AND priority = $2::smallint AND run_at = $3::timestamptz AND job_id = $4::bigint }.freeze, :job_stats => %{ SELECT queue, job_class, count(*) AS count, count(locks.job_id) AS count_working, sum((error_count > 0)::int) AS count_errored, max(error_count) AS highest_error_count, min(run_at) AS oldest_run_at FROM que_jobs LEFT JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id FROM pg_locks WHERE locktype = 'advisory' ) locks USING (job_id) GROUP BY queue, job_class ORDER BY count(*) DESC }.freeze, :worker_states => %{ SELECT que_jobs.*, pg.pid AS pg_backend_pid, pg.state AS pg_state, pg.state_change AS pg_state_changed_at, pg.query AS pg_last_query, pg.query_start AS pg_last_query_started_at, pg.xact_start AS pg_transaction_started_at, pg.waiting AS pg_waiting_on_lock FROM que_jobs JOIN ( SELECT (classid::bigint << 32) + objid::bigint AS job_id, pg_stat_activity.* FROM pg_locks JOIN pg_stat_activity USING (pid) WHERE locktype = 'advisory' ) pg USING (job_id) }.freeze }.freeze
- Version =
'0.11.0'
Class Attribute Summary collapse
- .adapter ⇒ Object
- .disable_prepared_statements ⇒ Object
-
.error_handler ⇒ Object
Returns the value of attribute error_handler.
- .json_converter ⇒ Object
- .log_formatter ⇒ Object
- .logger ⇒ Object
Class Method Summary collapse
- .clear! ⇒ Object
- .connection=(connection) ⇒ Object
- .constantize(camel_cased_word) ⇒ Object
-
.create! ⇒ Object
Have to support create! and drop! in old migrations.
- .db_version ⇒ Object
- .drop! ⇒ Object
-
.enqueue(*args) ⇒ Object
Give us a cleaner interface when specifying a job_class as a string.
- .execute(*args) ⇒ Object
- .job_stats ⇒ Object
- .log(data) ⇒ Object
- .migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object
-
.transaction ⇒ Object
A helper method to manage transactions, used mainly by the migration system.
- .worker_states ⇒ Object
Class Attribute Details
.adapter ⇒ Object
68 69 70 |
# File 'lib/que.rb', line 68 def adapter @adapter || raise("Que connection not established!") end |
.disable_prepared_statements ⇒ Object
128 129 130 |
# File 'lib/que.rb', line 128 def disable_prepared_statements @disable_prepared_statements || false end |
.error_handler ⇒ Object
Returns the value of attribute error_handler.
49 50 51 |
# File 'lib/que.rb', line 49 def error_handler @error_handler end |
.json_converter ⇒ Object
167 168 169 |
# File 'lib/que.rb', line 167 def json_converter @json_converter ||= INDIFFERENTIATOR end |
.log_formatter ⇒ Object
124 125 126 |
# File 'lib/que.rb', line 124 def log_formatter @log_formatter ||= JSON_MODULE.method(:dump) end |
.logger ⇒ Object
120 121 122 |
# File 'lib/que.rb', line 120 def logger @logger.respond_to?(:call) ? @logger.call : @logger end |
Class Method Details
.clear! ⇒ Object
76 77 78 |
# File 'lib/que.rb', line 76 def clear! execute "DELETE FROM que_jobs" end |
.connection=(connection) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/que.rb', line 52 def connection=(connection) self.adapter = if connection.to_s == 'ActiveRecord' Adapters::ActiveRecord.new else case connection.class.to_s when 'Sequel::Postgres::Database' then Adapters::Sequel.new(connection) when 'ConnectionPool' then Adapters::ConnectionPool.new(connection) when 'PG::Connection' then Adapters::PG.new(connection) when 'Pond' then Adapters::Pond.new(connection) when 'NilClass' then connection else raise "Que connection not recognized: #{connection.inspect}" end end end |
.constantize(camel_cased_word) ⇒ Object
132 133 134 135 136 137 138 139 |
# File 'lib/que.rb', line 132 def constantize(camel_cased_word) if camel_cased_word.respond_to?(:constantize) # Use ActiveSupport's version if it exists. camel_cased_word.constantize else string.split('::').inject(Object, &:const_get) end end |
.create! ⇒ Object
Have to support create! and drop! in old migrations. They just created and dropped the bare table.
103 104 105 |
# File 'lib/que.rb', line 103 def create! migrate! :version => 1 end |
.db_version ⇒ Object
93 94 95 |
# File 'lib/que.rb', line 93 def db_version Migrations.db_version end |
.drop! ⇒ Object
107 108 109 |
# File 'lib/que.rb', line 107 def drop! migrate! :version => 0 end |
.enqueue(*args) ⇒ Object
Give us a cleaner interface when specifying a job_class as a string.
89 90 91 |
# File 'lib/que.rb', line 89 def enqueue(*args) Job.enqueue(*args) end |
.execute(*args) ⇒ Object
72 73 74 |
# File 'lib/que.rb', line 72 def execute(*args) adapter.execute(*args) end |
.job_stats ⇒ Object
80 81 82 |
# File 'lib/que.rb', line 80 def job_stats execute :job_stats end |
.log(data) ⇒ Object
111 112 113 114 115 116 117 118 |
# File 'lib/que.rb', line 111 def log(data) level = data.delete(:level) || :info data = {:lib => 'que', :hostname => Socket.gethostname, :pid => Process.pid, :thread => Thread.current.object_id}.merge(data) if (l = logger) && output = log_formatter.call(data) l.send level, output end end |
.migrate!(version = {:version => Migrations::CURRENT_VERSION}) ⇒ Object
97 98 99 |
# File 'lib/que.rb', line 97 def migrate!(version = {:version => Migrations::CURRENT_VERSION}) Migrations.migrate!(version) end |
.transaction ⇒ Object
A helper method to manage transactions, used mainly by the migration system. It’s available for general use, but if you’re using an ORM that provides its own transaction helper, be sure to use that instead, or the two may interfere with one another.
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/que.rb', line 145 def transaction adapter.checkout do if adapter.in_transaction? yield else begin execute "BEGIN" yield rescue => error raise ensure # Handle a raised error or a killed thread. if error || Thread.current.status == 'aborting' execute "ROLLBACK" else execute "COMMIT" end end end end end |
.worker_states ⇒ Object
84 85 86 |
# File 'lib/que.rb', line 84 def worker_states execute :worker_states end |