Class: Shoryuken::Polling::WeightedRoundRobin
Instance Method Summary
collapse
#==, #delay
Methods included from Util
#elapsed, #fire_event, #logger, #unparse_queues, #worker_name
Constructor Details
Returns a new instance of WeightedRoundRobin.
4
5
6
7
8
9
|
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 4
# Builds the polling strategy state from the configured queue list.
#
# @param queues [Array] queue names; the list is kept as given in
#   @initial_queues and a de-duplicated copy becomes the working rotation.
#   NOTE(review): repeated names presumably encode a queue's maximum
#   weight - confirm against maximum_queue_weight.
# @param delay [Object, nil] stored unchanged in @delay; nil when omitted.
def initialize(queues, delay = nil)
  @delay = delay
  @paused_queues = []
  @initial_queues = queues
  # Array#uniq already returns a new array, so the caller's list is never
  # mutated by later changes to the rotation.
  @queues = queues.uniq
end
|
Instance Method Details
#active_queues ⇒ Object
34
35
36
|
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 34
# Reports the queues currently in the rotation.
#
# @return [Object] whatever unparse_queues (a helper defined outside this
#   block) produces for the current @queues list.
def active_queues
  rotation = @queues
  unparse_queues(rotation)
end
|
#message_processed(queue) ⇒ Object
38
39
40
41
42
43
|
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 38
# Resets the pause timestamp of a paused queue once one of its messages
# has been processed.
#
# Entries of @paused_queues are [time, name] pairs; the first pair whose
# name equals +queue+ gets its time replaced with the epoch.
# NOTE(review): presumably this makes the queue eligible for unpausing on
# the next poll - confirm against unpause_queues.
#
# @param queue [Object] name of the queue the processed message came from.
# @return [Time, nil] Time.at(0) when a paused entry was updated, nil when
#   the queue is not paused.
def message_processed(queue)
  entry = @paused_queues.find { |(_resume_at, paused_name)| paused_name == queue }
  entry[0] = Time.at(0) if entry
end
|
#messages_found(queue, messages_found) ⇒ Object
20
21
22
23
24
25
26
27
28
29
30
31
32
|
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 20
# Adjusts the rotation after a poll of +queue+.
#
# A poll that found nothing pauses the queue. Otherwise, if the queue is
# still below its maximum weight, one more occurrence of it is appended to
# @queues (raising its weight by one) and the change is logged.
#
# @param queue [Object] name of the queue that was polled.
# @param messages_found [Integer] number of messages the poll returned.
# @return [Array, nil] @queues when the weight was increased, nil otherwise.
def messages_found(queue, messages_found)
  if messages_found == 0
    pause(queue)
    return
  end

  ceiling = maximum_queue_weight(queue)
  weight = current_queue_weight(queue)
  # Already at (or above) its cap: leave the rotation as it is.
  return unless ceiling > weight

  logger.info { "Increasing #{queue} weight to #{weight + 1}, max: #{ceiling}" }
  @queues.push(queue)
end
|
#next_queue ⇒ Object
11
12
13
14
15
16
17
18
|
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 11
# Picks the queue to poll next and moves it to the back of the rotation.
#
# unpause_queues (defined outside this block) runs first.
# NOTE(review): presumably it returns expired paused queues to @queues -
# confirm.
#
# @return [QueueConfiguration, nil] a configuration for the queue at the
#   head of @queues with an empty options hash, or nil when the rotation
#   has no queue to offer.
def next_queue
  unpause_queues

  head = @queues.shift
  return if head.nil?

  # Re-append so the same queue comes around again after the others.
  @queues.push(head)
  QueueConfiguration.new(head, {})
end
|