Module: QC

Defined in:
lib/queue_classic.rb,
lib/queue_classic/queue.rb,
lib/queue_classic/setup.rb,
lib/queue_classic/worker.rb,
lib/queue_classic/railtie.rb,
lib/queue_classic/conn_adapter.rb,
lib/generators/queue_classic/install_generator.rb

Defined Under Namespace

Modules: Setup

Classes: ConnAdapter, InstallGenerator, Queue, Railtie, Worker

Constant Summary

APP_NAME =

Use APP_NAME to look up this process's PostgreSQL connection information in the pg_stat_activity view.

ENV["QC_APP_NAME"] || "queue_classic"
WAIT_TIME =

Number of seconds to block on the listen channel for new jobs.

(ENV["QC_LISTEN_TIME"] || 5).to_i
TABLE_NAME =

Why do you want to change the table name? Just deal with the default, OK? If you do want to change this, you will need to update the PL/pgSQL lock_head() function. Come on. Don’t do it... Just stick with the default.

"queue_classic_jobs"
QUEUE =

Each row in the table will have a column that notes the queue. You can point your workers at different queues but only one at a time.

ENV["QUEUE"] || "default"
QUEUES =
(ENV["QUEUES"] && ENV["QUEUES"].split(",")) || []
TOP_BOUND =

Set this to 1 for strict FIFO. There is nothing special about 9.…

(ENV["QC_TOP_BOUND"] || 9).to_i
FORK_WORKER =

Set this variable if you wish for the worker to fork a UNIX process for each locked job. Remember to re-establish any database connections. See the worker for more details.

!ENV["QC_FORK_WORKER"].nil?
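
The defaults above can be reproduced in isolation. The following is a minimal sketch, not the library's own code path: `resolve_qc_settings` is a hypothetical helper that reads from a plain Hash instead of ENV so the resolved values are easy to inspect.

```ruby
# Hypothetical helper (not part of queue_classic): mirrors the constant
# definitions listed above, but reads from a plain Hash instead of ENV.
def resolve_qc_settings(env)
  {
    app_name:    env["QC_APP_NAME"] || "queue_classic",
    wait_time:   (env["QC_LISTEN_TIME"] || 5).to_i,
    queue:       env["QUEUE"] || "default",
    queues:      (env["QUEUES"] && env["QUEUES"].split(",")) || [],
    top_bound:   (env["QC_TOP_BOUND"] || 9).to_i,
    # Any value counts as "set", including the string "false".
    fork_worker: !env["QC_FORK_WORKER"].nil?
  }
end

defaults = resolve_qc_settings({})
custom = resolve_qc_settings(
  "QUEUES" => "mail,reports",
  "QC_TOP_BOUND" => "1",
  "QC_FORK_WORKER" => "false"
)
```

Because FORK_WORKER only checks for presence, setting QC_FORK_WORKER to any string, even "false", turns forking on. Leave the variable unset to disable it.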

Class Method Summary

Class Method Details

.default_conn_adapter ⇒ Object



# File 'lib/queue_classic.rb', line 59

def self.default_conn_adapter
  return @conn_adapter if defined?(@conn_adapter) && @conn_adapter
  if rails_connection_sharing_enabled?
    @conn_adapter = ConnAdapter.new(ActiveRecord::Base.connection.raw_connection)
  else
    @conn_adapter = ConnAdapter.new
  end
  @conn_adapter
end

.default_conn_adapter=(conn) ⇒ Object



# File 'lib/queue_classic.rb', line 69

def self.default_conn_adapter=(conn)
  @conn_adapter = conn
end

.default_queue ⇒ Object



# File 'lib/queue_classic.rb', line 49

def self.default_queue
  @default_queue ||= begin
    Queue.new(QUEUE)
  end
end

.default_queue=(queue) ⇒ Object



# File 'lib/queue_classic.rb', line 45

def self.default_queue=(queue)
  @default_queue = queue
end

.has_connection? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/queue_classic.rb', line 55

def self.has_connection?
  !default_conn_adapter.nil?
end

.log(data) ⇒ Object



# File 'lib/queue_classic.rb', line 86

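def self.log(data)
  result = nil
  data = {:lib => "queue-classic"}.merge(data)
  if block_given?
    # Time the block; t0 must be set before yielding.
    t0 = Time.now
    result = yield
    # Hash#merge returns a new hash, so reassign to keep :elapsed.
    data = data.merge(:elapsed => Integer((Time.now - t0)*1000))
  end
  # Build a "key=value " string from every pair in data.
  data.reduce(out=String.new) do |s, tup|
    s << [tup.first, tup.last].join("=") << " "
  end
  puts(out) if ENV["DEBUG"]
  return result
end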
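
The formatting step in the listing above can be tried on its own. This is a minimal sketch under stated assumptions: `format_log_line` is a hypothetical stand-in that reproduces only the key=value joining, without the DEBUG check or the block timing.

```ruby
# Hypothetical stand-in for the formatting step in QC.log: prefix the
# data with the library name, then join each pair as "key=value ".
def format_log_line(data)
  data = { :lib => "queue-classic" }.merge(data)
  data.reduce(String.new) do |line, (key, value)|
    line << "#{key}=#{value} "
  end
end

line = format_log_line(:at => "enqueue", :queue => "default")
# line is "lib=queue-classic at=enqueue queue=default " (note the trailing space)
```

Since the caller's hash is merged over the default, passing a :lib key replaces the "queue-classic" prefix.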

.log_yield(data) ⇒ Object



# File 'lib/queue_classic.rb', line 73

def self.log_yield(data)
  begin
    t0 = Time.now
    yield
  rescue => e
    log({:at => "error", :error => e.inspect}.merge(data))
    raise
  ensure
    t = Integer((Time.now - t0)*1000)
    log(data.merge(:elapsed => t)) unless e
  end
end

.measure(data) ⇒ Object



# File 'lib/queue_classic.rb', line 100

def self.measure(data)
  if ENV['QC_MEASURE']
    $stdout.puts("measure#qc.#{data}")
  end
end

.method_missing(sym, *args, &block) ⇒ Object

Delegates method calls on the QC module to the default queue. This is what makes calls such as QC.enqueue() possible.



# File 'lib/queue_classic.rb', line 36

def self.method_missing(sym, *args, &block)
  default_queue.send(sym, *args, &block)
end

.respond_to_missing?(method_name, include_private = false) ⇒ Boolean

Ensures QC.respond_to?(:enqueue) returns true (Ruby 1.9 and later)

Returns:

  • (Boolean)


# File 'lib/queue_classic.rb', line 41

def self.respond_to_missing?(method_name, include_private = false)
  default_queue.respond_to?(method_name)
end
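
The pairing of method_missing with respond_to_missing? can be seen in a self-contained sketch. `FakeQueue` and `Facade` are hypothetical stand-ins for QC::Queue and QC, used here only so the example runs without a database.

```ruby
# Hypothetical stand-ins for illustration; not part of queue_classic.
class FakeQueue
  attr_reader :jobs

  def initialize
    @jobs = []
  end

  # Record the call instead of writing to a database table.
  def enqueue(method_name, *args)
    @jobs << [method_name, args]
  end
end

module Facade
  def self.default_queue
    @default_queue ||= FakeQueue.new
  end

  # Forward unknown module-level calls to the default queue.
  def self.method_missing(sym, *args, &block)
    default_queue.send(sym, *args, &block)
  end

  # Keep respond_to? consistent with what method_missing can handle.
  def self.respond_to_missing?(method_name, include_private = false)
    default_queue.respond_to?(method_name)
  end
end

Facade.enqueue("Kernel.puts", "hello")
```

Without respond_to_missing?, the forwarded call would still succeed, but Facade.respond_to?(:enqueue) would report false, which can confuse code that checks before calling.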

.unlock_jobs_of_dead_workers ⇒ Object

Unlocks every job that is locked by a PostgreSQL backend PID that no longer exists, so that jobs do not stay locked forever.



# File 'lib/queue_classic.rb', line 108

def self.unlock_jobs_of_dead_workers
  default_conn_adapter.execute("UPDATE #{QC::TABLE_NAME} SET locked_at = NULL, locked_by = NULL WHERE locked_by NOT IN (SELECT pid FROM pg_stat_activity);")
end
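
The effect of that UPDATE can be modeled in memory. This is a rough sketch with made-up sample data: `unlock_jobs_of_dead_pids` is a hypothetical helper, the job rows are plain hashes, and the list of live PIDs stands in for the pid column of pg_stat_activity.

```ruby
# In-memory model of the UPDATE's effect; the rows and PIDs are
# invented sample data, and no database is involved.
def unlock_jobs_of_dead_pids(jobs, live_pids)
  jobs.map do |job|
    # Rows that are not locked (locked_by is nil) are left alone, as in
    # SQL, where NULL NOT IN (...) never evaluates to true.
    if job[:locked_by] && !live_pids.include?(job[:locked_by])
      job.merge(:locked_at => nil, :locked_by => nil)
    else
      job
    end
  end
end

jobs = [
  { :id => 1, :locked_by => 4101, :locked_at => Time.now },
  { :id => 2, :locked_by => 4102, :locked_at => Time.now },
  { :id => 3, :locked_by => nil,  :locked_at => nil }
]

# Only PID 4101 is still alive, so job 2 is released.
after = unlock_jobs_of_dead_pids(jobs, [4101])
```

Job 1 keeps its lock because its PID is still listed, job 2 is released, and job 3 is unchanged because it was never locked.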