Class: Shoryuken::Queue

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/shoryuken/queue.rb

Constant Summary collapse

FIFO_ATTR =
'FifoQueue'.freeze
MESSAGE_GROUP_ID =
'ShoryukenMessage'.freeze
VISIBILITY_TIMEOUT_ATTR =
'VisibilityTimeout'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#elapsed, #fire_event, #logger, #unparse_queues, #worker_name

Constructor Details

#initialize(client, name_or_url_or_arn) ⇒ Queue

Returns a new instance of Queue.


11
12
13
14
# File 'lib/shoryuken/queue.rb', line 11

def initialize(client, name_or_url_or_arn)
  self.client = client
  set_name_and_url(name_or_url_or_arn)
end

Instance Attribute Details

#clientObject

Returns the value of attribute client


9
10
11
# File 'lib/shoryuken/queue.rb', line 9

def client
  @client
end

#nameObject

Returns the value of attribute name


9
10
11
# File 'lib/shoryuken/queue.rb', line 9

def name
  @name
end

#urlObject

Returns the value of attribute url


9
10
11
# File 'lib/shoryuken/queue.rb', line 9

def url
  @url
end

Instance Method Details

#delete_messages(options) ⇒ Object


23
24
25
26
27
28
29
30
31
# File 'lib/shoryuken/queue.rb', line 23

def delete_messages(options)
  client.delete_message_batch(
    options.merge(queue_url: url)
  ).failed.any? do |failure|
    logger.error do
      "Could not delete #{failure.id}, code: '#{failure.code}', message: '#{failure.message}', sender_fault: #{failure.sender_fault}"
    end
  end
end

#fifo?Boolean

Returns:

  • (Boolean)

49
50
51
52
53
54
55
56
# File 'lib/shoryuken/queue.rb', line 49

def fifo?
  # Make sure the memoization work with boolean to avoid multiple calls to SQS
  # see https://github.com/phstc/shoryuken/pull/529
  return @_fifo if defined?(@_fifo)

  @_fifo = queue_attributes.attributes[FIFO_ATTR] == 'true'
  @_fifo
end

#receive_messages(options) ⇒ Object


45
46
47
# File 'lib/shoryuken/queue.rb', line 45

def receive_messages(options)
  client.receive_message(options.merge(queue_url: url)).messages.map { |m| Message.new(client, self, m) }
end

#send_message(options) ⇒ Object


33
34
35
36
37
38
39
# File 'lib/shoryuken/queue.rb', line 33

def send_message(options)
  options = sanitize_message!(options).merge(queue_url: url)

  Shoryuken.client_middleware.invoke(options) do
    client.send_message(options)
  end
end

#send_messages(options) ⇒ Object


41
42
43
# File 'lib/shoryuken/queue.rb', line 41

def send_messages(options)
  client.send_message_batch(sanitize_messages!(options).merge(queue_url: url))
end

#visibility_timeoutObject


16
17
18
19
20
21
# File 'lib/shoryuken/queue.rb', line 16

def visibility_timeout
  # Always lookup for the latest visibility when cache is disabled
  # setting it to nil, forces re-lookup
  @_visibility_timeout = nil unless Shoryuken.cache_visibility_timeout?
  @_visibility_timeout ||= queue_attributes.attributes[VISIBILITY_TIMEOUT_ATTR].to_i
end