Class: Shoryuken::Polling::WeightedRoundRobin

Inherits:
BaseStrategy
  • Object
show all
Defined in:
lib/shoryuken/polling.rb

Instance Method Summary collapse

Methods inherited from BaseStrategy

#==

Methods included from Util

#elapsed, #fire_event, #logger, #unparse_queues, #worker_name

Constructor Details

#initialize(queues) ⇒ WeightedRoundRobin

Returns a new instance of WeightedRoundRobin.



68
69
70
71
72
# File 'lib/shoryuken/polling.rb', line 68

def initialize(queues)
  @initial_queues = queues
  @queues = queues.dup.uniq
  @paused_queues = []
end

Instance Method Details

#active_queuesObject



97
98
99
# File 'lib/shoryuken/polling.rb', line 97

def active_queues
  unparse_queues(@queues)
end

#messages_found(queue, messages_found) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/shoryuken/polling.rb', line 83

def messages_found(queue, messages_found)
  if messages_found == 0
    pause(queue)
    return
  end

  maximum_weight = maximum_queue_weight(queue)
  current_weight = current_queue_weight(queue)
  if maximum_weight > current_weight
    logger.info { "Increasing '#{queue}' weight to #{current_weight + 1}, max: #{maximum_weight}" }
    @queues << queue
  end
end

#next_queueObject



74
75
76
77
78
79
80
81
# File 'lib/shoryuken/polling.rb', line 74

def next_queue
  unpause_queues
  queue = @queues.shift
  return nil if queue.nil?

  @queues << queue
  QueueConfiguration.new(queue, {})
end