Class: SQS::QueueService

Inherits:
Object
Defined in:
lib/openwfe/extras/util/sqs.rb

Constant Summary

AWS_VERSION =
"2006-04-01"
DEFAULT_QUEUE_HOST =
"queue.amazonaws.com"

Constructor Details

#initialize(queue_host = nil) ⇒ QueueService

Returns a new instance of QueueService.



# File 'lib/openwfe/extras/util/sqs.rb', line 99

def initialize (queue_host=nil)

    # fall back to the default host when none is given
    @queue_host = queue_host || DEFAULT_QUEUE_HOST
end
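
The fallback to DEFAULT_QUEUE_HOST can be sketched with a stand-in class. This is only an illustration: `QueueServiceSketch`, the `queue_host` reader and the `sqs.example.test` host are assumptions made for the example and are not part of the library.

```ruby
# Stand-in that mirrors the constructor above; not the real SQS::QueueService.
class QueueServiceSketch
  DEFAULT_QUEUE_HOST = "queue.amazonaws.com"

  # Reader added only so the sketch can show the resolved host (assumption).
  attr_reader :queue_host

  def initialize(queue_host = nil)
    # Use the default host when no host is given.
    @queue_host = queue_host || DEFAULT_QUEUE_HOST
  end
end

# No argument: the default host is used.
puts QueueServiceSketch.new.queue_host

# Explicit argument (hypothetical host name): it is kept as given.
puts QueueServiceSketch.new("sqs.example.test").queue_host
```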

Instance Method Details

#create_queue(queue_name) ⇒ Object

Creates a queue.

Returns the URL of the new queue (a String). An error is raised if the queue name does not comply with the SQS naming requirements.



# File 'lib/openwfe/extras/util/sqs.rb', line 132

def create_queue (queue_name)

    doc = do_action :post, @queue_host, "/?QueueName=#{queue_name}"

    doc.elements.each("//QueueUrl") do |e|
        return e.text.to_s
    end
end
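
The way the queue URL is read out of the response can be tried in isolation with REXML, the XML library whose `elements.each` call appears in the listing above. The response body below is a simplified, hypothetical sample (no namespace, made-up URL), and `first_queue_url` is a helper name invented for this sketch. Unlike the listing, it returns nil explicitly when no QueueUrl element is found.

```ruby
require "rexml/document"

# Hypothetical, simplified response body; a real SQS response may differ.
SAMPLE_CREATE_QUEUE_RESPONSE = <<~XML
  <CreateQueueResponse>
    <QueueUrl>http://queue.example.test/demo_queue</QueueUrl>
    <ResponseStatus><StatusCode>Success</StatusCode></ResponseStatus>
  </CreateQueueResponse>
XML

# Returns the text of the first QueueUrl element, or nil when there is none.
def first_queue_url(xml)
  doc = REXML::Document.new(xml)
  doc.elements.each("//QueueUrl") do |e|
    return e.text.to_s
  end
  nil
end

puts first_queue_url(SAMPLE_CREATE_QUEUE_RESPONSE)
```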

#delete_message(queue, message_id) ⇒ Object

Deletes a given message. Returns true if the delete was successful.

The queue may be given as a queue name (String) or a Queue instance.



# File 'lib/openwfe/extras/util/sqs.rb', line 233

def delete_message (queue, message_id)

    queue = resolve_queue(queue)

    path = "#{queue.path}/#{message_id}"
    #path = "#{queue.path}/#{CGI::escape(message_id)}"

    doc = do_action :delete, queue.host, path

    SQS::get_element_text(doc, "//StatusCode") == "Success"
end

#delete_queue(queue, force = false) ⇒ Object

Deletes the queue. Returns true if the delete was successful. You can empty a queue beforehand by calling #flush_queue.

If ‘force’ is set to true, the queue is flushed before the actual delete operation, which should ensure that the queue is removed successfully.



# File 'lib/openwfe/extras/util/sqs.rb', line 283

def delete_queue (queue, force=false)

    queue = resolve_queue(queue)

    flush_queue(queue) if force
    
    begin

        doc = do_action :delete, @queue_host, queue.path

    rescue Exception => e

        return false if e.message.match "^400 .*$"
        raise e # re-raise anything else; otherwise doc would be nil below
    end

    SQS::get_element_text(doc, "//StatusCode") == "Success"
end
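
The effect of ‘force’ can be illustrated with an in-memory stand-in. This sketch assumes that the service rejects the deletion of a non-empty queue with an error whose message starts with "400", which is what the rescue clause above suggests; `FakeQueueStore` and all of its methods are hypothetical and do not talk to SQS.

```ruby
# In-memory stand-in for the service; every name here is hypothetical.
class FakeQueueStore
  def initialize
    @queues = {}
  end

  def create(name, messages = [])
    @queues[name] = messages.dup
  end

  def exists?(name)
    @queues.key?(name)
  end

  # Removes all messages and returns how many were removed.
  def flush(name)
    count = @queues.fetch(name).size
    @queues[name].clear
    count
  end

  # Mirrors delete_queue: flush first when forced, false on a "400" error.
  def delete(name, force = false)
    flush(name) if force
    begin
      raw_delete(name)
    rescue RuntimeError => e
      return false if e.message.match?(/\A400 /)
      raise
    end
    true
  end

  private

  # Assumed service behaviour: a non-empty queue cannot be deleted.
  def raw_delete(name)
    raise "400 Bad Request" unless @queues.fetch(name).empty?
    @queues.delete(name)
  end
end

store = FakeQueueStore.new
store.create("jobs", ["a", "b"])
puts store.delete("jobs")        # queue still holds messages
puts store.delete("jobs", true)  # flushed first, then deleted
```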

#flush_queue(queue) ⇒ Object

Use with care!

Attempts to delete all the messages in a queue. Returns the total count of messages deleted.

A call to this method may take some time, as it has to delete each message individually. AWS may add a proper ‘flush_queue’ operation later.

The queue may be given as a queue name (String) or a Queue instance.



# File 'lib/openwfe/extras/util/sqs.rb', line 257

def flush_queue (queue)

    count = 0

    while true

        l = get_messages(queue, :timeout => 0, :count => 255)
        break if l.length < 1

        l.each do |m|
            m.delete
            count += 1
        end
    end

    return count
end
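
The fetch-and-delete loop above can be exercised without a network connection. The following is a minimal sketch with in-memory stand-ins: `FakeMessage`, `FakeQueue` and `flush` are hypothetical names, and the fake queue simply returns messages without hiding them, roughly what a visibility timeout of 0 is meant to achieve.

```ruby
# In-memory stand-in for a message; deleting removes its id from the store.
class FakeMessage
  def initialize(store, id)
    @store = store
    @id = id
  end

  def delete
    @store.delete(@id)
  end
end

# In-memory stand-in for a queue holding `size` messages.
class FakeQueue
  def initialize(size)
    @ids = (1..size).to_a
  end

  # Returns up to `count` messages without removing them from the queue.
  def get_messages(count)
    @ids.first(count).map { |id| FakeMessage.new(@ids, id) }
  end

  def size
    @ids.size
  end
end

# Same loop shape as flush_queue: fetch a batch, delete each message,
# stop when a fetch comes back empty, return the number of deletions.
def flush(queue, batch_size = 255)
  deleted = 0
  loop do
    batch = queue.get_messages(batch_size)
    break if batch.empty?

    batch.each do |message|
      message.delete
      deleted += 1
    end
  end
  deleted
end

queue = FakeQueue.new(600)
puts flush(queue)
puts queue.size
```

With 600 messages and a batch size of 255, the loop needs three non-empty fetches plus one final empty fetch before it stops, which is why a flush on a large queue can be slow.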

#get_message(queue, message_id) ⇒ Object

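Retrieves a single message from a queue. Returns an instance of Message, or nil if the request fails with a 404 error (message not found). Any other error is re-raised.

The queue may be given as a queue name (String) or a Queue instance.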



# File 'lib/openwfe/extras/util/sqs.rb', line 212

def get_message (queue, message_id)

    queue = resolve_queue(queue)

    path = "#{queue.path}/#{message_id}"

    begin
        doc = do_action :get, queue.host, path
        Message.new(queue, doc.root.elements[1])
    rescue Exception => e
        #puts e.message
        return nil if e.message.match "^404 .*$"
        raise e
    end
end
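
The nil-on-404 behaviour can be sketched on its own. The example assumes, as the regular expression in the listing implies, that a failed request raises an error whose message starts with the HTTP status code; `fetch_or_nil` is a hypothetical helper and the error messages are made up.

```ruby
# Runs the block and returns its value. An error whose message starts with
# "404 " is reported as nil; any other error is re-raised unchanged.
def fetch_or_nil
  yield
rescue StandardError => e
  return nil if e.message.match?(/\A404 /)
  raise
end

# Successful request: the block's value is returned.
p fetch_or_nil { "message body" }

# Missing message: nil instead of an exception.
p fetch_or_nil { raise "404 Not Found" }

# Any other failure still surfaces as an exception.
begin
  fetch_or_nil { raise "500 Internal Server Error" }
rescue RuntimeError => e
  puts "re-raised: #{e.message}"
end
```

The sketch rescues StandardError rather than Exception, so that interrupts and similar low-level errors are not intercepted; that is a design choice of the example, not a description of the library.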

#get_messages(queue, params = {}) ⇒ Object

Retrieves a batch of messages from a queue. Returns a list of Message instances.

This method understands two optional params:

  • :timeout the duration in seconds of the message visibility in the queue

  • :count the maximum number of messages to be returned by this call

The queue may be given as a queue name (String) or a Queue instance.



# File 'lib/openwfe/extras/util/sqs.rb', line 180

def get_messages (queue, params={})

    queue = resolve_queue(queue)

    path = "#{queue.path}/front"

    path += "?" if params.size > 0

    timeout = params[:timeout]
    count = params[:count]

    path += "VisibilityTimeout=#{timeout}" if timeout
    path += "&" if timeout and count
    path += "NumberOfMessages=#{count}" if count

    doc = do_action :get, queue.host, path

    messages = []

    doc.elements.each("//Message") do |me|
        messages << Message.new(queue, me)
    end

    messages
end
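
How the two options end up in the request path can be sketched as a small pure function. `front_path` is a hypothetical helper written for this example; it produces the same paths as the listing above for :timeout and :count, but collects the pairs first so that no dangling "?" is left when neither option is set.

```ruby
# Builds the "front" request path for a queue from the two documented options.
def front_path(queue_path, params = {})
  pairs = []
  # A timeout of 0 is a valid value; in Ruby, 0 is truthy, so it is kept.
  pairs << "VisibilityTimeout=#{params[:timeout]}" if params[:timeout]
  pairs << "NumberOfMessages=#{params[:count]}" if params[:count]

  path = "#{queue_path}/front"
  pairs.empty? ? path : "#{path}?#{pairs.join('&')}"
end

puts front_path("/demo_queue")
puts front_path("/demo_queue", :count => 5)
puts front_path("/demo_queue", :timeout => 0, :count => 255)
```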

#get_queue(queue_name) ⇒ Object

Returns the Queue instance with the given name. An error is raised if no queue with exactly that name is found.



# File 'lib/openwfe/extras/util/sqs.rb', line 304

def get_queue (queue_name)

    l = list_queues(queue_name)

    l.each do |q|
        return q if q.name == queue_name
    end

    #return nil
    raise "found no queue named '#{queue_name}'"
end
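
Because the lookup goes through a prefix listing, several queues can come back and only an exact name match is accepted. A minimal sketch of that selection, using a hypothetical `FakeNamedQueue` struct and helper names invented for the example:

```ruby
# Stand-in for Queue; only the name matters for this sketch.
FakeNamedQueue = Struct.new(:name)

# Stand-in for list_queues: keeps queues whose name starts with the prefix.
def list_by_prefix(queues, prefix)
  queues.select { |q| q.name.start_with?(prefix) }
end

# Same selection as get_queue: exact name match, or an error.
def find_queue(queues, queue_name)
  match = list_by_prefix(queues, queue_name).find { |q| q.name == queue_name }
  raise "found no queue named '#{queue_name}'" unless match

  match
end

queues = %w[jobs jobs_failed mail].map { |n| FakeNamedQueue.new(n) }

# "jobs" is a prefix of two queue names, but only the exact match is returned.
puts find_queue(queues, "jobs").name

# "job" is a prefix of existing names but not a queue name itself.
begin
  find_queue(queues, "job")
rescue RuntimeError => e
  puts e.message
end
```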

#list_queues(prefix = nil) ⇒ Object

Lists the queues for the active AWS account. If ‘prefix’ is given, only queues whose names begin with that prefix will be returned.



# File 'lib/openwfe/extras/util/sqs.rb', line 110

def list_queues (prefix=nil)

    queues = []

    path = "/"
    path = "#{path}?QueueNamePrefix=#{prefix}" if prefix

    doc = do_action :get, @queue_host, path

    doc.elements.each("//QueueUrl") do |e|
        queues << Queue.new(self, e)
    end

    return queues
end

#put_message(queue, content) ⇒ Object Also known as: send_message

Sends the given content (‘text/plain’ content) as a message to a queue. Returns the SQS message id (a String).

The queue may be given as a queue name (String) or a Queue instance.



# File 'lib/openwfe/extras/util/sqs.rb', line 148

def put_message (queue, content)

    queue = resolve_queue(queue)

    doc = do_action :put, queue.host, "#{queue.path}/back", content

    #puts doc.to_s

    #status_code = SQS::get_element_text(doc, '//StatusCode')
    #message_id = SQS::get_element_text(doc, '//MessageId')
    #request_id = SQS::get_element_text(doc, '//RequestId')
    #{ :status_code => status_code, 
    #  :message_id => message_id, 
    #  :request_id => request_id }

    SQS::get_element_text(doc, '//MessageId')
end