Class: SQS::QueueService
- Inherits:
-
Object
- Object
- SQS::QueueService
- Defined in:
- lib/openwfe/extras/util/sqs.rb
Constant Summary collapse
- AWS_VERSION =
"2006-04-01"
- DEFAULT_QUEUE_HOST =
"queue.amazonaws.com"
Instance Method Summary collapse
-
#create_queue(queue_name) ⇒ Object
Creates a queue.
-
#delete_message(queue, message_id) ⇒ Object
Deletes a given message.
-
#delete_queue(queue, force = false) ⇒ Object
Deletes the queue.
-
#flush_queue(queue) ⇒ Object
Use with care !.
-
#get_message(queue, message_id) ⇒ Object
Retrieves a single message from a queue.
-
#get_messages(queue, params = {}) ⇒ Object
Retrieves a bunch of messages from a queue.
-
#get_queue(queue_name) ⇒ Object
Given a queue name, a Queue instance is returned.
-
#initialize(queue_host = nil) ⇒ QueueService
constructor
A new instance of QueueService.
-
#list_queues(prefix = nil) ⇒ Object
Lists the queues for the active AWS account.
-
#put_message(queue, content) ⇒ Object
(also: #send_message)
Given some content (‘text/plain’ content), send it as a message to a queue.
Constructor Details
#initialize(queue_host = nil) ⇒ QueueService
Returns a new instance of QueueService.
99 100 101 102 103 |
# File 'lib/openwfe/extras/util/sqs.rb', line 99 def initialize (queue_host=nil) @queue_host = queue_host @queue_host = DEFAULT_QUEUE_HOST unless @queue_host end |
Instance Method Details
#create_queue(queue_name) ⇒ Object
Creates a queue.
If the queue name doesn’t comply with SQS requirements for it, an error will be raised.
132 133 134 135 136 137 138 139 |
# File 'lib/openwfe/extras/util/sqs.rb', line 132 def create_queue (queue_name) doc = do_action :post, @queue_host, "/?QueueName=#{queue_name}" doc.elements.each("//QueueUrl") do |e| return e.text.to_s end end |
#delete_message(queue, message_id) ⇒ Object
Deletes a given message.
The queue might be a queue name (String) or a Queue instance.
233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/openwfe/extras/util/sqs.rb', line 233 def (queue, ) queue = resolve_queue(queue) path = "#{queue.path}/#{}" #path = "#{queue.path}/#{CGI::escape(message_id)}" doc = do_action :delete, queue.host, path SQS::get_element_text(doc, "//StatusCode") == "Success" end |
#delete_queue(queue, force = false) ⇒ Object
Deletes the queue. Returns true if the delete was successful. You can empty a queue by called the method #flush_queue
If ‘force’ is set to true, a flush will be performed on the queue before the actual delete operation. It should ensure a successful removal of the queue.
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 |
# File 'lib/openwfe/extras/util/sqs.rb', line 283 def delete_queue (queue, force=false) queue = resolve_queue(queue) flush_queue(queue) if force begin doc = do_action :delete, @queue_host, queue.path rescue Exception => e return false if e..match "^400 .*$" end SQS::get_element_text(doc, "//StatusCode") == "Success" end |
#flush_queue(queue) ⇒ Object
Use with care !
Attempts at deleting all the messages in a queue. Returns the total count of messages deleted.
A call on this method might take a certain time, as it has to delete each message individually. AWS will perhaps add a proper ‘flush_queue’ method later.
The queue might be a queue name (String) or a Queue instance.
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/openwfe/extras/util/sqs.rb', line 257 def flush_queue (queue) count = 0 while true l = (queue, :timeout => 0, :count => 255) break if l.length < 1 l.each do |m| m.delete count += 1 end end return count end |
#get_message(queue, message_id) ⇒ Object
Retrieves a single message from a queue. Returns an instance of Message.
The queue might be a queue name (String) or a Queue instance.
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/openwfe/extras/util/sqs.rb', line 212 def (queue, ) queue = resolve_queue(queue) path = "#{queue.path}/#{}" begin doc = do_action :get, queue.host, path Message.new(queue, doc.root.elements[1]) rescue Exception => e #puts e.message return nil if e..match "^404 .*$" raise e end end |
#get_messages(queue, params = {}) ⇒ Object
Retrieves a bunch of messages from a queue. Returns a list of Message instances.
There are actually two optional params that this method understands :
-
:timeout the duration in seconds of the message visibility in the
queue
-
:count the max number of message to be returned by this call
The queue might be a queue name (String) or a Queue instance.
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/openwfe/extras/util/sqs.rb', line 180 def (queue, params={}) queue = resolve_queue(queue) path = "#{queue.path}/front" path += "?" if params.size > 0 timeout = params[:timeout] count = params[:count] path += "VisibilityTimeout=#{timeout}" if timeout path += "&" if timeout and count path += "NumberOfMessages=#{count}" if count doc = do_action :get, queue.host, path = [] doc.elements.each("//Message") do |me| << Message.new(queue, me) end end |
#get_queue(queue_name) ⇒ Object
Given a queue name, a Queue instance is returned.
304 305 306 307 308 309 310 311 312 313 314 |
# File 'lib/openwfe/extras/util/sqs.rb', line 304 def get_queue (queue_name) l = list_queues(queue_name) l.each do |q| return q if q.name == queue_name end #return nil raise "found no queue named '#{queue_name}'" end |
#list_queues(prefix = nil) ⇒ Object
Lists the queues for the active AWS account. If ‘prefix’ is given, only queues whose name begin with that prefix will be returned.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/openwfe/extras/util/sqs.rb', line 110 def list_queues (prefix=nil) queues = [] path = "/" path = "#{path}?QueueNamePrefix=#{prefix}" if prefix doc = do_action :get, @queue_host, path doc.elements.each("//QueueUrl") do |e| queues << Queue.new(self, e) end return queues end |
#put_message(queue, content) ⇒ Object Also known as: send_message
Given some content (‘text/plain’ content), send it as a message to a queue. Returns the SQS message id (a String).
The queue might be a queue name (String) or a Queue instance.
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/openwfe/extras/util/sqs.rb', line 148 def (queue, content) queue = resolve_queue(queue) doc = do_action :put, queue.host, "#{queue.path}/back", content #puts doc.to_s #status_code = SQS::get_element_text(doc, '//StatusCode') #message_id = SQS::get_element_text(doc, '//MessageId') #request_id = SQS::get_element_text(doc, '//RequestId') #{ :status_code => status_code, # :message_id => message_id, # :request_id => request_id } SQS::get_element_text(doc, '//MessageId') end |