Class: QueueClassicPlus::Base
Class Method Summary
collapse
inherit_for, inheritable_attr
Class Method Details
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/queue_classic_plus/base.rb', line 102
# Entry point invoked for a dequeued job: runs `perform` with the given
# arguments while reporting the elapsed time under "qu_perform_time".
#
# When `skip_transaction` is truthy, `perform` is called directly.
# Otherwise it is wrapped in `transaction`, after setting a
# transaction-local statement_timeout taken from the
# POSTGRES_STATEMENT_TIMEOUT environment variable (multiplied by 1000).
# An unset variable yields 0.
#
# @param args [Array] arguments forwarded untouched to `perform`
# @return [Object] whatever `Metrics.timing` returns (presumably the
#   block's value - confirm against Metrics)
def self._perform(*args)
  Metrics.timing("qu_perform_time", source: librato_key) do
    # Fast path: the job opted out of the wrapping transaction.
    next perform(*args) if skip_transaction

    transaction do
      timeout_ms = ENV['POSTGRES_STATEMENT_TIMEOUT'].to_i * 1000
      # SET LOCAL only lasts until the end of the current transaction.
      execute "SET LOCAL statement_timeout = #{timeout_ms}"
      perform(*args)
    end
  end
end
|
.can_enqueue?(method, *args) ⇒ Boolean
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
# File 'lib/queue_classic_plus/base.rb', line 55
# Tells whether a job may be enqueued right now.
#
# Job classes that are not locked can always be enqueued. For locked
# classes, queue_classic_jobs is searched for a row with the same queue
# name, method and JSON-encoded arguments that is either not locked yet or
# was locked less than QUEUE_CLASSIC_MAX_LOCK_TIME seconds ago (600 when
# the variable is unset). Enqueueing is allowed only when no such row
# exists.
#
# @param method [String] fully qualified method name stored with the job
# @param args [Array] job arguments; compared as JSON text
# @return [Boolean]
def self.can_enqueue?(method, *args)
  # Unlocked job classes skip the duplicate lookup entirely.
  return true unless locked?

  max_lock_time = ENV.fetch("QUEUE_CLASSIC_MAX_LOCK_TIME", 10 * 60).to_i
  # max_lock_time has been through to_i, so interpolating it is safe; the
  # caller-supplied values travel as bind parameters.
  sql = <<~SQL.chomp
    SELECT COUNT(1) AS count
    FROM
    (
    SELECT 1
    FROM queue_classic_jobs
    WHERE q_name = $1 AND method = $2 AND args::text = $3::text
    AND (locked_at IS NULL OR locked_at > current_timestamp - interval '#{max_lock_time} seconds')
    LIMIT 1
    )
    AS x
  SQL
  row = QC.default_conn_adapter.execute(sql, @queue, method, args.to_json)
  row['count'].to_i.zero?
end
|
.disable_retries! ⇒ Object
31
32
33
34
35
36
37
|
# File 'lib/queue_classic_plus/base.rb', line 31
# Turns retries off for this job class by setting `disable_retries` to true.
#
# @raise [RuntimeError] when `retries_on` is not empty, i.e. retry rules
#   have already been registered
# @return [true]
def self.disable_retries!
  raise 'disable_retries! should not be enabled in conjunction with retry!' unless retries_on.empty?

  self.disable_retries = true
end
|
.do(*args) ⇒ Object
96
97
98
99
100
|
# File 'lib/queue_classic_plus/base.rb', line 96
# Enqueues this job with the given arguments via `enqueue_perform`, timing
# the call under the "qc_enqueue_time" metric.
#
# @param args [Array] arguments the job will later be performed with
# @return [Object] whatever `Metrics.timing` returns (presumably the
#   result of `enqueue_perform` - confirm against Metrics)
def self.do(*args)
  Metrics.timing("qc_enqueue_time", source: librato_key) { enqueue_perform(*args) }
end
|
.enqueue(method, *args) ⇒ Object
77
78
79
80
81
|
# File 'lib/queue_classic_plus/base.rb', line 77
# Pushes a job onto this class's queue, but only when `can_enqueue?`
# allows it.
#
# @param method [String] fully qualified method name to run
# @param args [Array] arguments stored with the job
# @return [Object, nil] the result of `queue.enqueue`, or nil when the job
#   was not enqueued
def self.enqueue(method, *args)
  return unless can_enqueue?(method, *args)

  queue.enqueue(method, *args)
end
|
.enqueue_perform(*args) ⇒ Object
83
84
85
|
# File 'lib/queue_classic_plus/base.rb', line 83
# Enqueues a call to this class's `_perform` with the given arguments.
#
# @param args [Array] arguments to hand to `_perform` when the job runs
# @return [Object, nil] whatever `enqueue` returns
def self.enqueue_perform(*args)
  # The job is stored as "<ClassName>._perform".
  target = "#{self}._perform"
  enqueue(target, *args)
end
|
.enqueue_perform_in(time, *args) ⇒ Object
87
88
89
90
|
# File 'lib/queue_classic_plus/base.rb', line 87
# Schedules a call to this class's `_perform` through `queue.enqueue_in`.
# Unlike `enqueue`, this path does not consult `can_enqueue?`, so locked
# job classes are rejected outright.
#
# @param time [Object] delay or time value passed straight to
#   `queue.enqueue_in`
# @param args [Array] arguments to hand to `_perform` when the job runs
# @raise [RuntimeError] when the job class is locked
# @return [Object] whatever `queue.enqueue_in` returns
def self.enqueue_perform_in(time, *args)
  if locked?
    raise "Can't enqueue in the future for locked jobs"
  end

  target = "#{self}._perform"
  queue.enqueue_in(time, target, *args)
end
|
.librato_key ⇒ Object
117
118
119
|
# File 'lib/queue_classic_plus/base.rb', line 117
# Builds the metric source key for this class: the class name (or an empty
# string when the class has no name) run through `Inflector.underscore`,
# with every "/" replaced by ".".
#
# @return [String]
def self.librato_key
  class_name = self.name || ""
  underscored = Inflector.underscore(class_name)
  underscored.tr("/", ".")
end
|
.list ⇒ Object
140
141
142
143
|
# File 'lib/queue_classic_plus/base.rb', line 140
# Fetches every row of queue_classic_jobs that belongs to this class's
# queue.
#
# The queue name is sent as a bind parameter rather than spliced into the
# SQL text, so a name containing a quote character cannot break or alter
# the statement. This mirrors how `can_enqueue?` issues its query.
#
# NOTE(review): this calls QC.default_conn_adapter directly instead of the
# class-level `execute` helper, whose signature is not visible here;
# confirm both use the same connection.
#
# @return [Object] whatever the connection adapter returns for the query
def self.list
  q = 'SELECT * FROM queue_classic_jobs WHERE q_name = $1'
  # to_s keeps the old interpolation semantics for symbol or nil names.
  QC.default_conn_adapter.execute(q, @queue.to_s)
end
|
.lock! ⇒ Object
39
40
41
|
# File 'lib/queue_classic_plus/base.rb', line 39
# Marks this job class as locked by setting `locked` to true. Locked
# classes go through the duplicate check in `can_enqueue?` and cannot be
# scheduled with `enqueue_perform_in`.
#
# @return [true]
def self.lock!
  self.locked = true
end
|
.locked? ⇒ Boolean
47
48
49
|
# File 'lib/queue_classic_plus/base.rb', line 47
# Reports whether this job class is locked, coercing the `locked`
# attribute to a strict boolean.
#
# @return [Boolean]
def self.locked?
  locked ? true : false
end
|
.queue ⇒ Object
5
6
7
|
# File 'lib/queue_classic_plus/base.rb', line 5
# Builds the queue_classic queue object for this job class from the
# class-level @queue name. A new QC::Queue is created on every call.
#
# @return [QC::Queue]
def self.queue
  queue_name = @queue
  QC::Queue.new(queue_name)
end
|
.restart_in(time, remaining_retries, *args) ⇒ Object
92
93
94
|
# File 'lib/queue_classic_plus/base.rb', line 92
# Re-enqueues a call to this class's `_perform` through
# `queue.enqueue_retry_in`, carrying the number of retries still left.
#
# @param time [Object] delay or time value passed straight to
#   `queue.enqueue_retry_in`
# @param remaining_retries [Object] retry budget passed along with the job
# @param args [Array] arguments to hand to `_perform` when the job runs
# @return [Object] whatever `queue.enqueue_retry_in` returns
def self.restart_in(time, remaining_retries, *args)
  target = "#{self}._perform"
  queue.enqueue_retry_in(time, target, remaining_retries, *args)
end
|
.retries_on?(exception) ⇒ Boolean
27
28
29
|
# File 'lib/queue_classic_plus/base.rb', line 27
# Tells whether the given exception should trigger a retry: either its
# exact class has a truthy entry in `retries_on`, or the exception is an
# instance of any class registered there (subclasses included).
#
# @param exception [Exception] the error raised by the job
# @return [Object] truthy when the job should be retried on this error
def self.retries_on?(exception)
  registered = self.retries_on
  registered[exception.class] || registered.keys.any? { |klass| exception.kind_of?(klass) }
end
|
.retry!(on: RuntimeError, max: 5) ⇒ Object
19
20
21
22
23
24
25
|
# File 'lib/queue_classic_plus/base.rb', line 19
# Registers the exception classes this job should be retried on and sets
# the maximum number of retries.
#
# @param on [Class, Array<Class>] one exception class or a list of them;
#   each is recorded in `retries_on`
# @param max [Integer] value assigned to `max_retries`
# @raise [RuntimeError] when `disable_retries` has already been set
# @return [Object] the value of `max`
def self.retry!(on: RuntimeError, max: 5)
  raise 'retry! should not be used in conjuction with disable_retries!' if disable_retries

  # Array() accepts both a single class and a list of classes.
  Array(on).each do |error_class|
    retries_on[error_class] = true
  end
  self.max_retries = max
end
|
.skip_transaction! ⇒ Object
43
44
45
|
# File 'lib/queue_classic_plus/base.rb', line 43
# Opts this job class out of the wrapping transaction by setting
# `skip_transaction` to true; `_perform` then calls `perform` directly.
#
# @return [true]
def self.skip_transaction!
  self.skip_transaction = true
end
|
.transaction(options = {}, &block) ⇒ Object
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
# File 'lib/queue_classic_plus/base.rb', line 121
# Runs the given block inside a database transaction.
#
# When ActiveRecord is defined, the work is delegated to
# `ActiveRecord::Base.transaction`. The options hash is splatted as
# keyword arguments: on Ruby 3 a positional Hash is no longer converted to
# keywords, so passing `options` positionally raises ArgumentError against
# a keyword-only `transaction` signature - even for the default `{}`.
#
# Without ActiveRecord, the block is wrapped in explicit BEGIN / COMMIT
# statements, and ROLLBACK is issued before re-raising when the block
# raises a StandardError.
#
# @param options [Hash] symbol-keyed options forwarded to
#   `ActiveRecord::Base.transaction`; ignored on the plain-SQL path
# @yield the work to run inside the transaction
# @raise [StandardError] anything raised by the block is re-raised after
#   the rollback
# @return [Object] the result of `ActiveRecord::Base.transaction`, or the
#   result of executing COMMIT on the plain-SQL path
def self.transaction(options = {}, &block)
  if defined?(ActiveRecord)
    ActiveRecord::Base.transaction(**options, &block)
  else
    begin
      execute "BEGIN"
      block.call
    rescue
      execute "ROLLBACK"
      raise
    end
    # COMMIT sits outside the rescue so a failed COMMIT is not followed by
    # a ROLLBACK attempt.
    execute "COMMIT"
  end
end
|