Class: Shoryuken::Polling::WeightedRoundRobin

Inherits:
BaseStrategy
  • Object
show all
Defined in:
lib/shoryuken/polling/weighted_round_robin.rb

Instance Method Summary collapse

Methods inherited from BaseStrategy

#==, #delay

Methods included from Util

#elapsed, #fire_event, #logger, #unparse_queues, #worker_name

Constructor Details

#initialize(queues, delay = nil) ⇒ WeightedRoundRobin

Returns a new instance of WeightedRoundRobin.



4
5
6
7
8
9
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 4

def initialize(queues, delay = nil)
  @initial_queues = queues
  @queues = queues.dup.uniq
  @paused_queues = []
  @delay = delay
end

Instance Method Details

#active_queuesObject



34
35
36
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 34

def active_queues
  unparse_queues(@queues)
end

#message_processed(queue) ⇒ Object



38
39
40
41
42
43
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 38

def message_processed(queue)
  paused_queue = @paused_queues.find { |_time, name| name == queue }
  return unless paused_queue

  paused_queue[0] = Time.at 0
end

#messages_found(queue, messages_found) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 20

def messages_found(queue, messages_found)
  if messages_found == 0
    pause(queue)
    return
  end

  maximum_weight = maximum_queue_weight(queue)
  current_weight = current_queue_weight(queue)
  if maximum_weight > current_weight
    logger.info { "Increasing #{queue} weight to #{current_weight + 1}, max: #{maximum_weight}" }
    @queues << queue
  end
end

#next_queueObject



11
12
13
14
15
16
17
18
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 11

def next_queue
  unpause_queues
  queue = @queues.shift
  return nil if queue.nil?

  @queues << queue
  QueueConfiguration.new(queue, {})
end