Class: LSQS::Queue
- Inherits:
-
Object
- Object
- LSQS::Queue
- Defined in:
- lib/lsqs/queue.rb
Constant Summary collapse
- DEFAULT_TIMEOUT =
30
Instance Attribute Summary collapse
-
#attributes ⇒ Object
Returns the value of attribute attributes.
-
#in_flight ⇒ Object
Returns the value of attribute in_flight.
-
#messages ⇒ Object
Returns the value of attribute messages.
-
#monitor ⇒ Object
Returns the value of attribute monitor.
-
#name ⇒ Object
Returns the value of attribute name.
Instance Method Summary collapse
-
#change_message_visibility(receipt, seconds) ⇒ Object
Change the visibility of a message that is in flight.
-
#create_message(options = {}) ⇒ Message
Creates a new message in the queue.
-
#delete_message(receipt) ⇒ Object
Deletes a message from the messages that are in-flight.
-
#generate_receipt ⇒ String
Generates a hex receipt for the message.
-
#get_messages(options = {}) ⇒ Hash
Gets a number of messages based on the MaxNumberOfMessages field.
-
#initialize(name, params = {}) ⇒ Queue
constructor
A new instance of Queue.
-
#purge ⇒ Object
Deletes all messages in queue and in flight.
-
#size ⇒ Fixnum
Returns the number of messages waiting in the queue.
-
#timeout_messages ⇒ Object
Checks if in-flight messages need to be put back in the queue.
-
#visibility_timeout ⇒ Fixnum
Returns the visibility timeout of the queue.
Constructor Details
#initialize(name, params = {}) ⇒ Queue
Returns a new instance of Queue.
7 8 9 10 11 12 13 14 15 16 |
# File 'lib/lsqs/queue.rb', line 7
#
# Builds a queue that starts with an empty backlog and nothing in flight.
#
# @param name [Object] stored as the queue's name
# @param params [Hash] creation parameters; the value stored under the
#   'Attributes' key becomes the queue attributes (an empty Hash when the
#   key is absent)
# @return [Queue] a new instance of Queue
def initialize(name, params = {})
  @name       = name
  @attributes = params.fetch('Attributes') { {} }
  @monitor    = Monitor.new
  @messages   = []
  @in_flight  = {}
  @timeout    = true
  # check_timeout is defined outside this listing.
  # NOTE(review): presumably it starts the periodic expiry check that is
  # gated by @timeout - confirm against the private section of the class.
  check_timeout
end
Instance Attribute Details
#attributes ⇒ Object
Returns the value of attribute attributes.
3 4 5 |
# File 'lib/lsqs/queue.rb', line 3
#
# Reader for the queue attributes.
#
# @return [Object] the current value of @attributes
def attributes
  @attributes
end
#in_flight ⇒ Object
Returns the value of attribute in_flight.
3 4 5 |
# File 'lib/lsqs/queue.rb', line 3
#
# Reader for the collection of messages that are currently in flight.
#
# @return [Object] the current value of @in_flight
def in_flight
  @in_flight
end
#messages ⇒ Object
Returns the value of attribute messages.
3 4 5 |
# File 'lib/lsqs/queue.rb', line 3
#
# Reader for the messages that are waiting in the queue.
#
# @return [Object] the current value of @messages
def messages
  @messages
end
#monitor ⇒ Object
Returns the value of attribute monitor.
3 4 5 |
# File 'lib/lsqs/queue.rb', line 3
#
# Reader for the monitor object created in the constructor.
#
# @return [Object] the current value of @monitor
def monitor
  @monitor
end
#name ⇒ Object
Returns the value of attribute name.
3 4 5 |
# File 'lib/lsqs/queue.rb', line 3
#
# Reader for the queue name.
#
# @return [Object] the current value of @name
def name
  @name
end
Instance Method Details
#change_message_visibility(receipt, seconds) ⇒ Object
Change the visibility of a message that is in flight. If visibility is set to 0, put back in the queue.
119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/lsqs/queue.rb', line 119
#
# Changes the visibility of a message that is in flight. When +seconds+ is
# 0 the message is expired and moved back to the queue; otherwise its expiry
# is reset to +seconds+ from now via Message#expire_in.
#
# @param receipt [String] receipt under which the message is stored in
#   @in_flight
# @param seconds [Integer] new visibility window; 0 means "return to the
#   queue immediately"
# @raise [RuntimeError] 'MessageNotInflight' when no in-flight message is
#   stored under +receipt+
def change_message_visibility(receipt, seconds)
  # `lock` is defined outside this listing.
  # NOTE(review): presumably it synchronizes on the queue's monitor - confirm.
  lock do
    message = @in_flight[receipt]
    raise 'MessageNotInflight' unless message

    if seconds == 0
      message.expire
      # delete_message removes the entry from the in-flight set; its return
      # value is what gets re-queued.
      @messages << delete_message(receipt)
    else
      message.expire_in(seconds)
    end
  end
end
#create_message(options = {}) ⇒ Message
Creates a new message in the queue.
36 37 38 39 40 41 42 |
# File 'lib/lsqs/queue.rb', line 36
#
# Creates a new message and appends it to the queue.
#
# @param options [Hash] passed straight to Message.new
#   NOTE(review): the constructor argument was lost in extraction and is
#   restored here from the method signature - confirm against the source.
# @return [Message] the message that was appended to @messages
def create_message(options = {})
  # `lock` is defined outside this listing.
  # NOTE(review): presumably it synchronizes on the queue's monitor - confirm.
  lock do
    message = Message.new(options)
    @messages << message
    # Explicit return so the result does not depend on what `lock` returns.
    return message
  end
end
#delete_message(receipt) ⇒ Object
Deletes a message from the messages that are in-flight.
78 79 80 81 82 |
# File 'lib/lsqs/queue.rb', line 78
#
# Removes the entry stored under +receipt+ from the in-flight messages.
# Nothing is raised when the receipt is unknown (Hash#delete semantics).
#
# @param receipt [String] receipt of the in-flight message to drop
# @return [Object] whatever `lock` returns; callers in this class append it
#   to @messages, so it is presumably the removed message - TODO confirm
#   that `lock` passes the block value through
def delete_message(receipt)
  lock do
    in_flight.delete(receipt)
  end
end
#generate_receipt ⇒ String
Generates a hex receipt for the message.
108 109 110 |
# File 'lib/lsqs/queue.rb', line 108
#
# Generates a random hex receipt for a message.
#
# @return [String] 32 hexadecimal characters (16 random bytes)
def generate_receipt
  SecureRandom.hex(16)
end
#get_messages(options = {}) ⇒ Hash
Gets a number of messages based on the MaxNumberOfMessages field.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/lsqs/queue.rb', line 51
#
# Takes up to 'MaxNumberOfMessages' messages (default 1) out of the queue,
# picked at random positions, and marks them as in flight. Each message
# taken gets its expiry set to the queue's visibility timeout and a freshly
# generated receipt.
#
# @param options [Hash] request parameters; only 'MaxNumberOfMessages' is
#   read here and it is converted with #to_i
# @return [Hash] receipt => message for every message handed out; empty
#   when the queue has no messages
# @raise [RuntimeError] 'ReadCountOutOfRange' when more than 10 messages
#   are requested
def get_messages(options = {})
  requested = options.fetch('MaxNumberOfMessages') { 1 }.to_i
  raise 'ReadCountOutOfRange' if requested > 10

  result = {}

  # `lock` is defined outside this listing.
  # NOTE(review): presumably it synchronizes on the queue's monitor - confirm.
  lock do
    # Never try to take more than the queue currently holds.
    amount = requested > size ? size : requested

    amount.times do
      # size shrinks with every delete_at, so rand(size) stays in range.
      message = messages.delete_at(rand(size))
      message.expire_in(visibility_timeout)

      receipt = generate_receipt
      @in_flight[receipt] = message
      result[receipt]     = message
    end
  end

  result
end
#purge ⇒ Object
Deletes all messages in queue and in flight.
87 88 89 90 91 92 |
# File 'lib/lsqs/queue.rb', line 87
#
# Deletes all messages, both queued and in flight, by replacing the two
# collections with fresh empty ones.
def purge
  # `lock` is defined outside this listing.
  # NOTE(review): presumably it synchronizes on the queue's monitor - confirm.
  lock do
    @messages  = []
    @in_flight = {}
  end
end
#size ⇒ Fixnum
Returns the amount of messages in the queue.
99 100 101 |
# File 'lib/lsqs/queue.rb', line 99
#
# Number of messages currently waiting in the queue.
#
# @return [Integer] size of the messages collection
def size
  messages.size
end
#timeout_messages ⇒ Object
Checks if in-flight messages need to be put back in the queue.
137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/lsqs/queue.rb', line 137
#
# Moves every in-flight message that reports expired? back to the queue.
# Messages that have not expired are left in flight.
def timeout_messages
  # `lock` is defined outside this listing.
  # NOTE(review): presumably it synchronizes on the queue's monitor - confirm.
  lock do
    in_flight.each do |key, message|
      if message.expired?
        message.expire
        # delete_message drops the receipt from the in-flight set; its
        # return value is what gets re-queued.
        @messages << delete_message(key)
      end
    end
  end
end
#visibility_timeout ⇒ Fixnum
Returns the visibility timeout of the queue. It takes the value from the attributes, if it is set; otherwise it uses the `DEFAULT_TIMEOUT` constant.
25 26 27 |
# File 'lib/lsqs/queue.rb', line 25
#
# Visibility timeout used for messages handed out by this queue.
#
# @return [Object] the 'VisibilityTimeout' attribute when it is set,
#   otherwise DEFAULT_TIMEOUT; the attribute is returned as stored, no
#   conversion happens here
def visibility_timeout
  attributes['VisibilityTimeout'] || DEFAULT_TIMEOUT
end