Class: Shoryuken::Polling::WeightedRoundRobin

Inherits:
BaseStrategy
  • Object
show all
Defined in:
lib/shoryuken/polling/weighted_round_robin.rb

Instance Method Summary collapse

Methods inherited from BaseStrategy

#==, #delay

Methods included from Util

#elapsed, #fire_event, #logger, #unparse_queues, #worker_name

Constructor Details

#initialize(queues, delay = nil) ⇒ WeightedRoundRobin

Returns a new instance of WeightedRoundRobin.



4
5
6
7
8
9
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 4

# Sets up the polling state for a list of queue names.
#
# @param queues [Array] queue names; kept untouched in @initial_queues,
#   while @queues starts out as a de-duplicated copy of it.
#   NOTE(review): repeated names presumably encode a queue's weight -
#   confirm against the weight helpers defined elsewhere in the class.
# @param delay [Object, nil] stored as-is in @delay (nil when omitted)
def initialize(queues, delay = nil)
  @paused_queues = []
  @initial_queues = queues
  # Array#uniq already returns a fresh array, so the caller's list is never
  # aliased or mutated by later changes to @queues.
  @queues = queues.uniq
  @delay = delay
end

Instance Method Details

#active_queues ⇒ Object



34
35
36
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 34

# Returns whatever unparse_queues produces for the current rotation (@queues).
#
# NOTE(review): unparse_queues is not defined in this view (the page lists it
# as included from Util); presumably it collapses repeated queue names into
# name/weight pairs - confirm against Util#unparse_queues.
def active_queues
  unparse_queues(@queues)
end

#message_processed(queue) ⇒ Object



38
39
40
41
42
43
44
45
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 38

# Called after a message from +queue+ has been processed.
#
# Does nothing (and returns nil) while no queue is paused. Otherwise it drops
# every paused entry whose name equals +queue+, appends +queue+ to the
# rotation and de-duplicates the whole rotation in place.
#
# @param queue [Object] queue name as stored in @queues / @paused_queues
# @return [Array, nil] the result of Array#uniq! on @queues (nil when nothing
#   was removed), or nil when there were no paused queues
def message_processed(queue)
  unless @paused_queues.empty?
    logger.debug "Unpausing #{queue}"
    # Paused entries are two-element arrays; only the second element (the
    # queue name) is compared here.
    @paused_queues.reject! { |_timestamp, paused_name| paused_name == queue }
    # uniq! collapses duplicates of every queue in the rotation, not only of
    # the one just re-added.
    @queues.push(queue).uniq!
  end
end

#messages_found(queue, messages_found) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 20

# Reports how many messages the last poll of +queue+ returned.
#
# A count of 0 hands the queue to #pause and returns nil. Any other count
# appends one more occurrence of +queue+ to the rotation, provided the value
# from maximum_queue_weight is still greater than the value from
# current_queue_weight.
#
# @param queue [Object] queue name
# @param messages_found [Integer] number of messages received from +queue+
# @return [Array, nil] @queues when an occurrence was appended, otherwise nil
def messages_found(queue, messages_found)
  if messages_found == 0
    pause(queue)
    nil
  else
    weight_ceiling = maximum_queue_weight(queue)
    weight_now = current_queue_weight(queue)
    if weight_ceiling > weight_now
      logger.info { "Increasing #{queue} weight to #{weight_now + 1}, max: #{weight_ceiling}" }
      @queues.push(queue)
    end
  end
end

#next_queue ⇒ Object



11
12
13
14
15
16
17
18
# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 11

# Picks the queue to poll next.
#
# Calls unpause_queues first (defined elsewhere in the class), then takes the
# queue at the head of the rotation, moves it to the tail and wraps it in a
# QueueConfiguration with an empty options hash.
#
# @return [QueueConfiguration, nil] nil when the rotation is empty
def next_queue
  unpause_queues
  head = @queues.shift
  unless head.nil?
    # Re-append so the same queue comes around again after the others.
    @queues.push(head)
    QueueConfiguration.new(head, {})
  end
end