Class: Shoryuken::Polling::WeightedRoundRobin
Instance Method Summary
collapse
#==, #delay
Methods included from Util
#elapsed, #fire_event, #logger, #unparse_queues, #worker_name
Constructor Details
Returns a new instance of WeightedRoundRobin.
6
7
8
9
10
11
|
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 6
# Builds the polling strategy state from the configured queue list.
#
# @param queues [Array] queue names as configured; duplicates are allowed here
#   and are kept untouched in @initial_queues
# @param delay [Object, nil] stored as-is in @delay; not interpreted in this method
def initialize(queues, delay = nil)
  @paused_queues = []
  @initial_queues = queues
  # Array#uniq already returns a fresh array, so the caller's list is never
  # mutated and every queue starts out listed exactly once.
  @queues = queues.uniq
  @delay = delay
end
|
Instance Method Details
#active_queues ⇒ Object
36
37
38
|
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 36
# Returns whatever unparse_queues produces for the current @queues list.
#
# NOTE(review): unparse_queues comes from the included Util module and is not
# visible here; presumably it collapses repeated names into a per-queue
# weight -- confirm against Util before relying on the return shape.
def active_queues
unparse_queues(@queues)
end
|
#message_processed(queue) ⇒ Object
40
41
42
43
44
45
|
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 40
# Marks a paused queue as immediately eligible again once one of its messages
# has been processed.
#
# Entries in @paused_queues are treated as [time, name] pairs. Only the first
# entry whose name matches is touched: its time is overwritten with the Unix
# epoch (Time.at(0)). Nothing happens when the queue is not paused.
#
# @param queue [Object] queue name, compared with == against each paused entry
# @return [Time, nil] the epoch time that was stored, or nil if no entry matched
def message_processed(queue)
  entry = @paused_queues.detect { |_resume_at, paused_name| paused_name == queue }
  entry[0] = Time.at(0) if entry
end
|
#messages_found(queue, messages_found) ⇒ Object
22
23
24
25
26
27
28
29
30
31
32
33
34
|
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 22
# Adjusts the polling state for a queue after a fetch attempt.
#
# An empty fetch pauses the queue. A non-empty fetch appends one more
# occurrence of the queue to @queues, provided its current weight is still
# below its maximum weight; otherwise the list is left alone.
#
# @param queue [Object] name of the queue that was just polled
# @param messages_found [Integer] number of messages the poll returned
# @return [Array, nil] @queues when the queue's weight was increased, nil otherwise
def messages_found(queue, messages_found)
  if messages_found == 0
    pause(queue)
    nil
  else
    ceiling = maximum_queue_weight(queue)
    weight = current_queue_weight(queue)
    return unless ceiling > weight

    logger.info { "Increasing #{queue} weight to #{weight + 1}, max: #{ceiling}" }
    # One extra occurrence in the rotation is one extra unit of weight.
    @queues.push(queue)
  end
end
|
#next_queue ⇒ Object
13
14
15
16
17
18
19
20
|
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 13
# Picks the next queue to poll and rotates it to the back of the list.
#
# unpause_queues runs first on every call. The head of @queues is then taken
# off the front; if there is one, it is re-appended at the end and returned
# wrapped in a QueueConfiguration with an empty options hash.
#
# @return [QueueConfiguration, nil] the queue to poll, or nil when the head of
#   @queues is nil (for example when the list is empty)
def next_queue
  unpause_queues

  head = @queues.shift
  unless head.nil?
    @queues.push(head)
    QueueConfiguration.new(head, {})
  end
end
|