Class: Shoryuken::Polling::StrictPriority
Instance Method Summary
collapse
#==
Methods included from Util
#elapsed, #fire_event, #logger, #unparse_queues, #worker_name
Constructor Details
Returns a new instance of StrictPriority.
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# File 'lib/shoryuken/polling/strict_priority.rb', line 4
def initialize(queues)
@queues = queues
.group_by { |q| q }
.sort_by { |_, qs| -qs.count }
.map(&:first)
@paused_until = queues
.each_with_object(Hash.new) { |queue, h| h[queue] = Time.at(0) }
reset_next_queue
end
|
Instance Method Details
#active_queues ⇒ Object
32
33
34
35
36
37
38
|
# File 'lib/shoryuken/polling/strict_priority.rb', line 32
def active_queues
@queues
.reverse
.map.with_index(1)
.reject { |q, _| queue_paused?(q) }
.reverse
end
|
#messages_found(queue, messages_found) ⇒ Object
24
25
26
27
28
29
30
|
# File 'lib/shoryuken/polling/strict_priority.rb', line 24
def messages_found(queue, messages_found)
if messages_found == 0
pause(queue)
else
reset_next_queue
end
end
|
#next_queue ⇒ Object
19
20
21
22
|
# File 'lib/shoryuken/polling/strict_priority.rb', line 19
def next_queue
next_queue = next_active_queue
next_queue.nil? ? nil : QueueConfiguration.new(next_queue, {})
end
|