Class: Shoryuken::Queue
- Inherits:
-
Object
show all
- Includes:
- Util
- Defined in:
- lib/shoryuken/queue.rb
Constant Summary
collapse
- FIFO_ATTR =
'FifoQueue'
- MESSAGE_GROUP_ID =
'ShoryukenMessage'
- VISIBILITY_TIMEOUT_ATTR =
'VisibilityTimeout'
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Util
#elapsed, #fire_event, #logger, #unparse_queues, #worker_name
Constructor Details
#initialize(client, name_or_url_or_arn) ⇒ Queue
Returns a new instance of Queue.
13
14
15
16
|
# File 'lib/shoryuken/queue.rb', line 13
def initialize(client, name_or_url_or_arn)
self.client = client
set_name_and_url(name_or_url_or_arn)
end
|
Instance Attribute Details
#client ⇒ Object
Returns the value of attribute client.
11
12
13
|
# File 'lib/shoryuken/queue.rb', line 11
def client
@client
end
|
#name ⇒ Object
Returns the value of attribute name.
11
12
13
|
# File 'lib/shoryuken/queue.rb', line 11
def name
@name
end
|
#url ⇒ Object
Returns the value of attribute url.
11
12
13
|
# File 'lib/shoryuken/queue.rb', line 11
def url
@url
end
|
Instance Method Details
#delete_messages(options) ⇒ Object
25
26
27
28
29
30
31
32
33
34
|
# File 'lib/shoryuken/queue.rb', line 25
def delete_messages(options)
failed_messages = client.delete_message_batch(
options.merge(queue_url: url)
).failed || []
failed_messages.any? do |failure|
logger.error do
"Could not delete #{failure.id}, code: '#{failure.code}', message: '#{failure.message}', sender_fault: #{failure.sender_fault}"
end
end
end
|
#fifo? ⇒ Boolean
53
54
55
56
57
58
59
60
|
# File 'lib/shoryuken/queue.rb', line 53
def fifo?
return @_fifo if defined?(@_fifo)
@_fifo = queue_attributes.attributes[FIFO_ATTR] == 'true'
@_fifo
end
|
#receive_messages(options) ⇒ Object
48
49
50
51
|
# File 'lib/shoryuken/queue.rb', line 48
def receive_messages(options)
messages = client.receive_message(options.merge(queue_url: url)).messages || []
messages.map { |m| Message.new(client, self, m) }
end
|
#send_message(options) ⇒ Object
36
37
38
39
40
41
42
|
# File 'lib/shoryuken/queue.rb', line 36
def send_message(options)
options = sanitize_message!(options).merge(queue_url: url)
Shoryuken.client_middleware.invoke(options) do
client.send_message(options)
end
end
|
#send_messages(options) ⇒ Object
44
45
46
|
# File 'lib/shoryuken/queue.rb', line 44
def send_messages(options)
client.send_message_batch(sanitize_messages!(options).merge(queue_url: url))
end
|
#visibility_timeout ⇒ Object
18
19
20
21
22
23
|
# File 'lib/shoryuken/queue.rb', line 18
def visibility_timeout
@_visibility_timeout = nil unless Shoryuken.cache_visibility_timeout?
@_visibility_timeout ||= queue_attributes.attributes[VISIBILITY_TIMEOUT_ATTR].to_i
end
|