Class: Shoryuken::Polling::StrictPriority

Inherits:
BaseStrategy show all
Defined in:
lib/shoryuken/polling/strict_priority.rb

Instance Method Summary collapse

Methods inherited from BaseStrategy

#==, #delay

Methods included from Util

#elapsed, #fire_event, #logger, #unparse_queues, #worker_name

Constructor Details

#initialize(queues, delay = nil) ⇒ StrictPriority

Returns a new instance of StrictPriority.



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/shoryuken/polling/strict_priority.rb', line 4

def initialize(queues, delay = nil)
  # Priority ordering of the queues, highest priority first
  @queues = queues
            .group_by { |q| q }
            .sort_by { |_, qs| -qs.count }
            .map(&:first)

  # Pause status of the queues, default to past time (unpaused)
  @paused_until = queues
                  .each_with_object({}) { |queue, h| h[queue] = Time.at(0) }

  @delay = delay
  # Start queues at 0
  reset_next_queue
end

Instance Method Details

#active_queuesObject



33
34
35
36
37
38
39
# File 'lib/shoryuken/polling/strict_priority.rb', line 33

def active_queues
  @queues
    .reverse
    .map.with_index(1)
    .reject { |q, _| queue_paused?(q) }
    .reverse
end

#message_processed(queue) ⇒ Object



41
42
43
44
45
46
# File 'lib/shoryuken/polling/strict_priority.rb', line 41

def message_processed(queue)
  if queue_paused?(queue)
    logger.debug "Unpausing #{queue}"
    @paused_until[queue] = Time.at 0
  end
end

#messages_found(queue, messages_found) ⇒ Object



25
26
27
28
29
30
31
# File 'lib/shoryuken/polling/strict_priority.rb', line 25

def messages_found(queue, messages_found)
  if messages_found == 0
    pause(queue)
  else
    reset_next_queue
  end
end

#next_queueObject



20
21
22
23
# File 'lib/shoryuken/polling/strict_priority.rb', line 20

def next_queue
  next_queue = next_active_queue
  next_queue.nil? ? nil : QueueConfiguration.new(next_queue, {})
end