Module: QC
Defined in:
- lib/queue_classic.rb
- lib/queue_classic/queue.rb
- lib/queue_classic/setup.rb
- lib/queue_classic/worker.rb
- lib/queue_classic/railtie.rb
- lib/queue_classic/conn_adapter.rb
- lib/generators/queue_classic/install_generator.rb
Defined Under Namespace
Modules: Setup
Classes: ConnAdapter, InstallGenerator, Queue, Railtie, Worker
Constant Summary
- APP_NAME =
You can use APP_NAME to query for Postgres-related process information in the pg_stat_activity view.
ENV["QC_APP_NAME"] || "queue_classic"
- WAIT_TIME =
Number of seconds to block on the listen channel for new jobs.
(ENV["QC_LISTEN_TIME"] || 5).to_i
- TABLE_NAME =
Why do you want to change the table name? Just deal with the default OK? If you do want to change this, you will need to update the PL/pgSQL lock_head() function. Come on. Don’t do it.… Just stick with the default.
"queue_classic_jobs"
- QUEUE =
Each row in the table will have a column that notes the queue. You can point your workers at different queues but only one at a time.
ENV["QUEUE"] || "default"
- QUEUES =
(ENV["QUEUES"] && ENV["QUEUES"].split(",")) || []
- TOP_BOUND =
Set this to 1 for strict FIFO. There is nothing special about 9.…
(ENV["QC_TOP_BOUND"] || 9).to_i
- FORK_WORKER =
Set this variable if you wish for the worker to fork a UNIX process for each locked job. Remember to re-establish any database connections. See the worker for more details.
!ENV["QC_FORK_WORKER"].nil?
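The constants above are all derived from environment variables when the library is loaded. As a minimal sketch of how those expressions evaluate, the hypothetical helper `qc_settings` below (not part of queue_classic) applies the same expressions to a plain Hash standing in for `ENV`:

```ruby
# Hypothetical helper, not part of queue_classic: it applies the same
# expressions as the constants above to a Hash that stands in for ENV.
def qc_settings(env)
  {
    app_name:    env["QC_APP_NAME"] || "queue_classic",
    wait_time:   (env["QC_LISTEN_TIME"] || 5).to_i,
    queue:       env["QUEUE"] || "default",
    queues:      (env["QUEUES"] && env["QUEUES"].split(",")) || [],
    top_bound:   (env["QC_TOP_BOUND"] || 9).to_i,
    fork_worker: !env["QC_FORK_WORKER"].nil?
  }
end

# No variables set: every value falls back to its default.
defaults = qc_settings({})

# Made-up values for illustration only.
custom = qc_settings(
  "QUEUES"         => "high,low",
  "QC_TOP_BOUND"   => "1",
  "QC_FORK_WORKER" => "false"
)
```

Because FORK_WORKER only checks whether the variable is set, `custom[:fork_worker]` is `true` here even though the value is the string "false".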
Class Method Summary
- .default_conn_adapter ⇒ Object
- .default_conn_adapter=(conn) ⇒ Object
- .default_queue ⇒ Object
- .default_queue=(queue) ⇒ Object
- .has_connection? ⇒ Boolean
- .log(data) ⇒ Object
- .log_yield(data) ⇒ Object
- .measure(data) ⇒ Object
- .method_missing(sym, *args, &block) ⇒ Object
Defer method calls on the QC module to the default queue.
- .respond_to_missing?(method_name, include_private = false) ⇒ Boolean
Ensure QC.respond_to?(:enqueue) equals true (ruby 1.9 only).
- .unlock_jobs_of_dead_workers ⇒ Object
Unlocks every job locked by a PostgreSQL PID that no longer exists, so that no job stays locked indefinitely.
Class Method Details
.default_conn_adapter ⇒ Object
# File 'lib/queue_classic.rb', line 59

def self.default_conn_adapter
  return @conn_adapter if defined?(@conn_adapter) && @conn_adapter
  if rails_connection_sharing_enabled?
    @conn_adapter = ConnAdapter.new(ActiveRecord::Base.connection.raw_connection)
  else
    @conn_adapter = ConnAdapter.new
  end
  @conn_adapter
end
.default_conn_adapter=(conn) ⇒ Object
# File 'lib/queue_classic.rb', line 69

def self.default_conn_adapter=(conn)
  @conn_adapter = conn
end
.default_queue ⇒ Object
# File 'lib/queue_classic.rb', line 49

def self.default_queue
  @default_queue ||= begin
    Queue.new(QUEUE)
  end
end
.default_queue=(queue) ⇒ Object
# File 'lib/queue_classic.rb', line 45

def self.default_queue=(queue)
  @default_queue = queue
end
.has_connection? ⇒ Boolean
# File 'lib/queue_classic.rb', line 55

def self.has_connection?
  !default_conn_adapter.nil?
end
.log(data) ⇒ Object
# File 'lib/queue_classic.rb', line 86

def self.log(data)
  result = nil
  data = {:lib => "queue-classic"}.merge(data)
  if block_given?
    result = yield
    # NOTE: t0 is not defined in this method, so this line cannot succeed
    # as shown here; the merged hash is also discarded.
    data.merge(:elapsed => Integer((Time.now - t0)*1000))
  end
  data.reduce(out=String.new) do |s, tup|
    s << [tup.first, tup.last].join("=") << " "
  end
  puts(out) if ENV["DEBUG"]
  return result
end
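The formatting step of `.log` turns the data hash into a line of space-separated `key=value` pairs. A minimal sketch of that step alone, using a hypothetical helper `format_log_line` that is not part of queue_classic:

```ruby
# Hypothetical helper mirroring the formatting step of QC.log:
# each key/value pair becomes "key=value" followed by a single space.
def format_log_line(data)
  data = { :lib => "queue-classic" }.merge(data)
  data.reduce(String.new) do |s, tup|
    s << [tup.first, tup.last].join("=") << " "
  end
end

# Made-up keys for illustration.
line = format_log_line(:at => "enqueue", :queue => "default")
puts line
```

Each pair is followed by a space, so the resulting line ends with a trailing space.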
.log_yield(data) ⇒ Object
# File 'lib/queue_classic.rb', line 73

def self.log_yield(data)
  begin
    t0 = Time.now
    yield
  rescue => e
    log({:at => "error", :error => e.inspect}.merge(data))
    raise
  ensure
    t = Integer((Time.now - t0)*1000)
    log(data.merge(:elapsed => t)) unless e
  end
end
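The rescue/ensure structure of `.log_yield` logs an error entry and re-raises when the block fails, and logs the elapsed milliseconds otherwise. A rough sketch of the same pattern, assuming a hypothetical method `timed` that appends entries to an array instead of calling `QC.log`:

```ruby
# Hypothetical stand-in for QC.log_yield: entries are appended to `sink`
# (an Array) rather than printed, so the behavior is easy to inspect.
def timed(data, sink)
  t0 = Time.now
  yield
rescue => e
  # On failure, record the error and re-raise it to the caller.
  sink << { :at => "error", :error => e.inspect }.merge(data)
  raise
ensure
  # On success only, record the elapsed time in whole milliseconds.
  elapsed_ms = Integer((Time.now - t0) * 1000)
  sink << data.merge(:elapsed => elapsed_ms) unless e
end

sink = []
result = timed({ :action => "work" }, sink) { 21 * 2 }

begin
  timed({ :action => "boom" }, sink) { raise "failed" }
rescue RuntimeError
  # The error entry has already been recorded in sink.
end
```

The method returns the value of the block, because the value of an `ensure` clause is not used as the return value.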
.measure(data) ⇒ Object
# File 'lib/queue_classic.rb', line 100

def self.measure(data)
  if ENV['QC_MEASURE']
    $stdout.puts("measure#qc.#{data}")
  end
end
.method_missing(sym, *args, &block) ⇒ Object
Defer method calls on the QC module to the default queue. This facilitates QC.enqueue()
# File 'lib/queue_classic.rb', line 36

def self.method_missing(sym, *args, &block)
  default_queue.send(sym, *args, &block)
end
.respond_to_missing?(method_name, include_private = false) ⇒ Boolean
Ensure QC.respond_to?(:enqueue) equals true (ruby 1.9 only)
# File 'lib/queue_classic.rb', line 41

def self.respond_to_missing?(method_name, include_private = false)
  default_queue.respond_to?(method_name)
end
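Taken together, `.method_missing` and `.respond_to_missing?` make the module behave as if it had the default queue's methods. A self-contained sketch of that delegation, where `FakeQueue` and `Jobs` are hypothetical stand-ins for `QC::Queue` and `QC`:

```ruby
# FakeQueue is a hypothetical stand-in for QC::Queue; it only records calls.
class FakeQueue
  attr_reader :jobs

  def initialize(name)
    @name = name
    @jobs = []
  end

  def enqueue(method, *args)
    @jobs << [method, args]
    @jobs.length
  end
end

# Jobs is a hypothetical stand-in for the QC module.
module Jobs
  def self.default_queue
    @default_queue ||= FakeQueue.new("default")
  end

  # Forward any unknown module-level call to the default queue.
  def self.method_missing(sym, *args, &block)
    default_queue.send(sym, *args, &block)
  end

  # Keep respond_to? consistent with what method_missing can handle.
  def self.respond_to_missing?(method_name, include_private = false)
    default_queue.respond_to?(method_name)
  end
end

Jobs.enqueue("Kernel.puts", "hello")  # handled by FakeQueue#enqueue
Jobs.respond_to?(:enqueue)            # true
Jobs.respond_to?(:no_such_method)     # false
```

Calls the queue does not understand still raise `NoMethodError`, since `send` on the queue fails in the usual way.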
.unlock_jobs_of_dead_workers ⇒ Object
Unlocks every job locked by a PostgreSQL PID that no longer exists, so that no job stays locked indefinitely.

# File 'lib/queue_classic.rb', line 108

def self.unlock_jobs_of_dead_workers
  default_conn_adapter.execute("UPDATE #{QC::TABLE_NAME} SET locked_at = NULL, locked_by = NULL WHERE locked_by NOT IN (SELECT pid FROM pg_stat_activity);")
end
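The effect of that UPDATE can be illustrated without a database. The sketch below is a hypothetical in-memory model, not the library's implementation: rows are Hashes, and `live_pids` stands in for the result of `SELECT pid FROM pg_stat_activity`.

```ruby
# Hypothetical in-memory model of the UPDATE above.
def unlock_dead(rows, live_pids)
  rows.map do |row|
    pid = row[:locked_by]
    # Rows that are not locked, or whose locker is still alive, are kept as-is.
    if pid.nil? || live_pids.include?(pid)
      row
    else
      # The locking PID is gone: clear the lock so another worker can take the job.
      row.merge(:locked_at => nil, :locked_by => nil)
    end
  end
end

# Made-up rows for illustration only.
rows = [
  { :id => 1, :locked_by => 101, :locked_at => "t1" },
  { :id => 2, :locked_by => 202, :locked_at => "t2" },
  { :id => 3, :locked_by => nil, :locked_at => nil }
]

# Only PID 101 is still connected, so job 2 is released.
unlocked = unlock_dead(rows, [101])
```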