Class: Shoryuken::Polling::StrictPriority

Inherits:
BaseStrategy show all
Defined in:
lib/shoryuken/polling.rb

Instance Method Summary collapse

Methods inherited from BaseStrategy

#==

Methods included from Util

#elapsed, #fire_event, #logger, #unparse_queues, #worker_name

Constructor Details

#initialize(queues) ⇒ StrictPriority

Returns a new instance of StrictPriority.



131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/shoryuken/polling.rb', line 131

def initialize(queues)
  # Priority ordering of the queues, highest priority first
  @queues = queues
    .group_by { |q| q }
    .sort_by { |_, qs| -qs.count }
    .map(&:first)

  # Pause status of the queues, default to past time (unpaused)
  @paused_until = queues
    .each_with_object(Hash.new) { |queue, h| h[queue] = Time.at(0) }

  # Start queues at 0
  reset_next_queue
end

Instance Method Details

#active_queuesObject



159
160
161
162
163
164
165
# File 'lib/shoryuken/polling.rb', line 159

def active_queues
  @queues
    .reverse
    .map.with_index(1)
    .reject { |q, _| queue_paused?(q) }
    .reverse
end

#messages_found(queue, messages_found) ⇒ Object



151
152
153
154
155
156
157
# File 'lib/shoryuken/polling.rb', line 151

def messages_found(queue, messages_found)
  if messages_found == 0
    pause(queue)
  else
    reset_next_queue
  end
end

#next_queueObject



146
147
148
149
# File 'lib/shoryuken/polling.rb', line 146

def next_queue
  next_queue = next_active_queue
  next_queue.nil? ? nil : QueueConfiguration.new(next_queue, {})
end