Module: QC

Extended by:
Config
Defined in:
lib/queue_classic.rb,
lib/queue_classic/queue.rb,
lib/queue_classic/setup.rb,
lib/queue_classic/config.rb,
lib/queue_classic/worker.rb,
lib/queue_classic/railtie.rb,
lib/queue_classic/version.rb,
lib/queue_classic/conn_adapter.rb,
lib/generators/queue_classic/install_generator.rb

Defined Under Namespace

Modules: Config, Setup Classes: ConnAdapter, InstallGenerator, Queue, Railtie, Worker

Constant Summary collapse

DEPRECATED_CONSTANTS =

Assign constants for backwards compatibility. They should no longer be used. Prefer the corresponding methods. See QC::Config for more details.

{
  :APP_NAME => :app_name,
  :WAIT_TIME => :wait_time,
  :TABLE_NAME => :table_name,
  :QUEUE => :queue,
  :QUEUES => :queues,
  :TOP_BOUND => :top_bound,
  :FORK_WORKER => :fork_worker?,
}
VERSION =
"3.2.1"

Class Method Summary collapse

Methods included from Config

app_name, default_queue, default_queue=, default_worker_class, default_worker_class=, fork_worker?, queue, queues, reset_config, table_name, top_bound, wait_time

Class Method Details

.const_missing(const_name) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/queue_classic.rb', line 19

def self.const_missing(const_name)
  if DEPRECATED_CONSTANTS.key? const_name
    config_method = DEPRECATED_CONSTANTS[const_name]
    $stderr.puts "The constant QC::\#{const_name} is deprecated and will be removed in the future.\nPlease use the method QC.\#{config_method} instead.\n    MSG\n    QC.public_send config_method\n  else\n    super\n  end\nend\n"

.default_conn_adapterObject



51
52
53
54
55
56
57
58
59
60
61
# File 'lib/queue_classic.rb', line 51

def self.default_conn_adapter
  t = Thread.current
  return t[:qc_conn_adapter] if t[:qc_conn_adapter]
  adapter = if rails_connection_sharing_enabled?
    ConnAdapter.new(ActiveRecord::Base.connection.raw_connection)
  else
    ConnAdapter.new
  end

  t[:qc_conn_adapter] = adapter
end

.default_conn_adapter=(conn) ⇒ Object



63
64
65
# File 'lib/queue_classic.rb', line 63

def self.default_conn_adapter=(conn)
  Thread.current[:qc_conn_adapter] = conn
end

.has_connection?Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/queue_classic.rb', line 47

def self.has_connection?
  !default_conn_adapter.nil?
end

.log(data) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/queue_classic.rb', line 80

def self.log(data)
  result = nil
  data = {:lib => "queue-classic"}.merge(data)
  if block_given?
    result = yield
    data.merge(:elapsed => Integer((Time.now - t0)*1000))
  end
  data.reduce(out=String.new) do |s, tup|
    s << [tup.first, tup.last].join("=") << " "
  end
  puts(out) if ENV["DEBUG"]
  return result
end

.log_yield(data) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/queue_classic.rb', line 67

def self.log_yield(data)
  t0 = Time.now
  begin
    yield
  rescue => e
    log({:at => "error", :error => e.inspect}.merge(data))
    raise
  ensure
    t = Integer((Time.now - t0)*1000)
    log(data.merge(:elapsed => t)) unless e
  end
end

.measure(data) ⇒ Object



94
95
96
97
98
# File 'lib/queue_classic.rb', line 94

def self.measure(data)
  if ENV['QC_MEASURE']
    $stdout.puts("measure#qc.#{data}")
  end
end

.method_missing(sym, *args, &block) ⇒ Object

Defer method calls on the QC module to the default queue. This facilitates QC.enqueue()



34
35
36
37
38
39
40
# File 'lib/queue_classic.rb', line 34

def self.method_missing(sym, *args, &block)
  if default_queue.respond_to? sym
    default_queue.public_send(sym, *args, &block)
  else
    super
  end
end

.respond_to_missing?(method_name, include_private = false) ⇒ Boolean

Ensure QC.respond_to?(:enqueue) equals true (ruby 1.9 only)

Returns:

  • (Boolean)


43
44
45
# File 'lib/queue_classic.rb', line 43

def self.respond_to_missing?(method_name, include_private = false)
  default_queue.respond_to?(method_name)
end

.unlock_jobs_of_dead_workersObject

This will unlock all jobs any postgres’ PID that is not existing anymore to prevent any infinitely locked jobs



102
103
104
105
# File 'lib/queue_classic.rb', line 102

def self.unlock_jobs_of_dead_workers
  pid_column = default_conn_adapter.server_version < 90200 ? "procpid" : "pid"
  default_conn_adapter.execute("UPDATE #{QC.table_name} SET locked_at = NULL, locked_by = NULL WHERE locked_by NOT IN (SELECT #{pid_column} FROM pg_stat_activity);")
end