Class: Shoryuken::Polling::StrictPriority
Instance Method Summary
collapse
#==
Methods included from Util
#elapsed, #fire_event, #logger, #unparse_queues, #worker_name
Constructor Details
Returns a new instance of StrictPriority.
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
# File 'lib/shoryuken/polling.rb', line 131
def initialize(queues)
@queues = queues
.group_by { |q| q }
.sort_by { |_, qs| -qs.count }
.map(&:first)
@paused_until = queues
.each_with_object(Hash.new) { |queue, h| h[queue] = Time.at(0) }
reset_next_queue
end
|
Instance Method Details
#active_queues ⇒ Object
159
160
161
162
163
164
165
|
# File 'lib/shoryuken/polling.rb', line 159
def active_queues
@queues
.reverse
.map.with_index(1)
.reject { |q, _| queue_paused?(q) }
.reverse
end
|
#messages_found(queue, messages_found) ⇒ Object
151
152
153
154
155
156
157
|
# File 'lib/shoryuken/polling.rb', line 151
def messages_found(queue, messages_found)
if messages_found == 0
pause(queue)
else
reset_next_queue
end
end
|
#next_queue ⇒ Object
146
147
148
149
|
# File 'lib/shoryuken/polling.rb', line 146
def next_queue
next_queue = next_active_queue
next_queue.nil? ? nil : QueueConfiguration.new(next_queue, {})
end
|