Class: RightAws::SqsInterface
- Inherits:
-
Object
- Object
- RightAws::SqsInterface
- Defined in:
- lib/sqs/right_sqs_interface.rb
Defined Under Namespace
Classes: SqsCreateQueueParser, SqsGetQueueAttributesParser, SqsGetVisibilityTimeoutParser, SqsListGrantsParser, SqsListQueuesParser, SqsReceiveMessagesParser, SqsSendMessagesParser, SqsStatusParser
Constant Summary collapse
- SIGNATURE_VERSION =
"1"
- API_VERSION =
"2007-05-01"
- DEFAULT_HOST =
"queue.amazonaws.com"
- DEFAULT_PORT =
443
- DEFAULT_PROTOCOL =
'https'
- REQUEST_TTL =
30
- DEFAULT_VISIBILITY_TIMEOUT =
30
- @@amazon_problems =
A list of Amazon problems we can handle via AWSErrorHandler.
RightAws::AMAZON_PROBLEMS
- @@bench_sqs =
Benchmark::Tms.new()
- @@bench_xml =
Benchmark::Tms.new()
Instance Attribute Summary collapse
-
#aws_access_key_id ⇒ Object
readonly
Current aws_access_key_id.
-
#last_errors ⇒ Object
Last AWS errors list (used by AWSErrorHandler).
-
#last_request ⇒ Object
readonly
Last HTTP request object.
-
#last_request_id ⇒ Object
Last AWS request id (used by AWSErrorHandler).
-
#last_response ⇒ Object
readonly
Last HTTP response object.
-
#logger ⇒ Object
Logger object.
-
#params ⇒ Object
Initial params hash.
Class Method Summary collapse
-
.amazon_problems ⇒ Object
Returns a list of Amazon service responses which are known to be transient problems.
-
.amazon_problems=(problems_list) ⇒ Object
Sets the list of Amazon side problems.
-
.bench_sqs ⇒ Object
Benchmark::Tms instance for SQS access benchmarking.
-
.bench_xml ⇒ Object
Benchmark::Tms instance for XML parsing benchmarking.
-
.queue_name_by_url(queue_url) ⇒ Object
Returns short queue name by url.
Instance Method Summary collapse
-
#add_grant(queue_url, grantee_email_address, permission = nil) ⇒ Object
Adds grants for user (identified by email he registered at Amazon).
-
#change_message_visibility(queue_url, message_id, visibility_timeout = 0) ⇒ Object
Changes message visibility timeout.
-
#clear_queue(queue_url) ⇒ Object
Removes all visible messages from queue.
-
#create_queue(queue_name, default_visibility_timeout = nil) ⇒ Object
Creates new queue.
-
#delete_message(queue_url, message_id) ⇒ Object
Deletes message from queue.
-
#delete_queue(queue_url, force_deletion = false) ⇒ Object
Deletes queue (queue must be empty or
force_deletion
must be set to true). -
#force_clear_queue(queue_url) ⇒ Object
Deletes queue then re-creates it (restores attributes also).
-
#force_delete_queue(queue_url) ⇒ Object
Deletes queue even if it has messages.
-
#generate_request(action, param = {}) ⇒ Object
Generates a request hash for the query API.
-
#generate_rest_request(method, param) ⇒ Object
Generates a request hash for the REST API.
-
#get_queue_attributes(queue_url, attribute = 'All') ⇒ Object
Retrieves the queue attribute(s).
-
#get_queue_length(queue_url) ⇒ Object
Returns approximate number of messages in queue.
-
#get_visibility_timeout(queue_url) ⇒ Object
Retrieves visibility timeout.
-
#initialize(aws_access_key_id, aws_secret_access_key, params = {}) ⇒ SqsInterface
constructor
Creates a new SqsInterface instance.
-
#list_grants(queue_url, grantee_email_address = nil, permission = nil) ⇒ Object
Retrieves hash of
grantee_id
=>perms
for this queue:. -
#list_queues(queue_name_prefix = nil) ⇒ Object
Lists all queues owned by this user that have names beginning with
queue_name_prefix
. -
#multi_thread ⇒ Object
Return
true
if this RightS3 instance is running in multi_thread state andfalse
otherwise. -
#on_exception(options = {:raise=>true, :log=>true}) ⇒ Object
:nodoc:.
-
#peek_message(queue_url, message_id) ⇒ Object
Peeks message from queue by message id.
-
#pop_message(queue_url) ⇒ Object
Pops (retrieves and deletes) first accessible message from queue.
-
#pop_messages(queue_url, number_of_messages = 1) ⇒ Object
Pops (retrieves and deletes) up to ‘number_of_messages’ from queue.
-
#queue_name_by_url(queue_url) ⇒ Object
Returns short queue name by url.
-
#queue_url_by_name(queue_name) ⇒ Object
Returns queue url by queue short name or
nil
if queue is not found. -
#receive_message(queue_url, visibility_timeout = nil) ⇒ Object
Reads first accessible message from queue.
-
#receive_messages(queue_url, number_of_messages = 1, visibility_timeout = nil) ⇒ Object
Retrieves a list of messages from queue.
-
#remove_grant(queue_url, grantee_email_address_or_id, permission = nil) ⇒ Object
Revokes permission from user.
-
#request_info(request, parser) ⇒ Object
Sends request to Amazon and parses the response Raises AwsError if any banana happened.
-
#send_message(queue_url, message) ⇒ Object
(also: #push_message)
Sends new message to queue.Returns ‘message_id’ or raises an exception.
-
#set_queue_attributes(queue_url, attribute, value) ⇒ Object
Sets queue attribute.
-
#set_visibility_timeout(queue_url, visibility_timeout = nil) ⇒ Object
Sets visibility timeout.
Constructor Details
#initialize(aws_access_key_id, aws_secret_access_key, params = {}) ⇒ SqsInterface
Creates a new SqsInterface instance.
sqs = RightAws::SqsInterface.new('1E3GDYEOGFJPIT75KDT40','hgTHt68JY07JKUY08ftHYtERkjgtfERn57DFE379', {:multi_thread => true, :logger => Logger.new('/tmp/x.log')}) #=> <RightSqs:0xb7af6264>
Params is a hash:
{:server => 'queue.amazonaws.com' # Amazon service host: 'queue.amazonaws.com'(default)
:port => 443 # Amazon service port: 80 or 443(default)
:multi_thread => true|false # Multi-threaded (connection per each thread): true or false(default)
:logger => Logger Object} # Logger instance: logs to STDOUT if omitted }
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/sqs/right_sqs_interface.rb', line 85 def initialize(aws_access_key_id, aws_secret_access_key, params={}) @params = params raise AwsError.new("AWS access keys are required to operate on SQS") \ if aws_access_key_id.blank? || aws_secret_access_key.blank? @aws_access_key_id = aws_access_key_id @aws_secret_access_key = aws_secret_access_key # params @params[:server] ||= DEFAULT_HOST @params[:port] ||= DEFAULT_PORT @params[:protocol] ||= DEFAULT_PROTOCOL @params[:multi_thread] ||= defined?(AWS_DAEMON) # set logger @logger = @params[:logger] @logger = RAILS_DEFAULT_LOGGER if !@logger && defined?(RAILS_DEFAULT_LOGGER) @logger = Logger.new(STDOUT) if !@logger @logger.info "New #{self.class.name} using #{@params[:multi_thread] ? 'multi' : 'single'}-threaded mode" end |
Instance Attribute Details
#aws_access_key_id ⇒ Object (readonly)
Current aws_access_key_id
39 40 41 |
# File 'lib/sqs/right_sqs_interface.rb', line 39 def aws_access_key_id @aws_access_key_id end |
#last_errors ⇒ Object
Last AWS errors list (used by AWSErrorHandler)
45 46 47 |
# File 'lib/sqs/right_sqs_interface.rb', line 45 def last_errors @last_errors end |
#last_request ⇒ Object (readonly)
Last HTTP request object
41 42 43 |
# File 'lib/sqs/right_sqs_interface.rb', line 41 def last_request @last_request end |
#last_request_id ⇒ Object
Last AWS request id (used by AWSErrorHandler)
47 48 49 |
# File 'lib/sqs/right_sqs_interface.rb', line 47 def last_request_id @last_request_id end |
#last_response ⇒ Object (readonly)
Last HTTP response object
43 44 45 |
# File 'lib/sqs/right_sqs_interface.rb', line 43 def last_response @last_response end |
#logger ⇒ Object
Logger object
49 50 51 |
# File 'lib/sqs/right_sqs_interface.rb', line 49 def logger @logger end |
#params ⇒ Object
Initial params hash
51 52 53 |
# File 'lib/sqs/right_sqs_interface.rb', line 51 def params @params end |
Class Method Details
.amazon_problems ⇒ Object
Returns a list of Amazon service responses which are known to be transient problems. We have to re-request if we get any of them, because the problem will probably disappear. By default this method returns the same value as the AMAZON_PROBLEMS const.
64 65 66 |
# File 'lib/sqs/right_sqs_interface.rb', line 64 def self.amazon_problems @@amazon_problems end |
.amazon_problems=(problems_list) ⇒ Object
Sets the list of Amazon side problems. Use in conjunction with the getter to append problems.
70 71 72 |
# File 'lib/sqs/right_sqs_interface.rb', line 70 def self.amazon_problems=(problems_list) @@amazon_problems = problems_list end |
.bench_sqs ⇒ Object
Benchmark::Tms instance for SQS access benchmarking.
57 |
# File 'lib/sqs/right_sqs_interface.rb', line 57 def self.bench_sqs; @@bench_sqs; end |
.bench_xml ⇒ Object
Benchmark::Tms instance for XML parsing benchmarking.
59 |
# File 'lib/sqs/right_sqs_interface.rb', line 59 def self.bench_xml; @@bench_xml; end |
.queue_name_by_url(queue_url) ⇒ Object
Returns short queue name by url.
RightSqs.queue_name_by_url('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 'my_awesome_queue'
446 447 448 449 450 |
# File 'lib/sqs/right_sqs_interface.rb', line 446 def self.queue_name_by_url(queue_url) queue_url[/[^\/]*$/] rescue on_exception end |
Instance Method Details
#add_grant(queue_url, grantee_email_address, permission = nil) ⇒ Object
Adds grants for user (identified by email he registered at Amazon). Returns true
or an exception. Permission = ‘FULLCONTROL’ | ‘RECEIVEMESSAGE’ | ‘SENDMESSAGE’.
sqs.add_grant('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '[email protected]', 'FULLCONTROL') #=> true
304 305 306 307 308 309 310 311 312 |
# File 'lib/sqs/right_sqs_interface.rb', line 304 def add_grant(queue_url, grantee_email_address, = nil) req_hash = generate_request('AddGrant', 'Grantee.EmailAddress' => grantee_email_address, 'Permission' => , :queue_url => queue_url) request_info(req_hash, SqsStatusParser.new(:logger => @logger)) rescue on_exception end |
#change_message_visibility(queue_url, message_id, visibility_timeout = 0) ⇒ Object
Changes message visibility timeout. Returns true
or an exception.
sqs.('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '1234567890...0987654321', 10) #=> true
417 418 419 420 421 422 423 424 425 |
# File 'lib/sqs/right_sqs_interface.rb', line 417 def (queue_url, , visibility_timeout=0) req_hash = generate_request('ChangeMessageVisibility', 'MessageId' => , 'VisibilityTimeout' => visibility_timeout.to_s, :queue_url => queue_url) request_info(req_hash, SqsStatusParser.new(:logger => @logger)) rescue on_exception end |
#clear_queue(queue_url) ⇒ Object
Removes all visible messages from queue. Return true
or an exception.
sqs.clear_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> true
476 477 478 479 480 481 |
# File 'lib/sqs/right_sqs_interface.rb', line 476 def clear_queue(queue_url) while (m = (queue_url)) ; end # delete all messages in queue true rescue on_exception end |
#create_queue(queue_name, default_visibility_timeout = nil) ⇒ Object
Creates new queue. Returns new queue link.
sqs.create_queue('my_awesome_queue') #=> 'http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue'
PS Some queue based requests may not become available until a couple of minutes after queue creation (permission grant and removal for example)
209 210 211 212 213 214 |
# File 'lib/sqs/right_sqs_interface.rb', line 209 def create_queue(queue_name, default_visibility_timeout=nil) req_hash = generate_request('CreateQueue', 'QueueName' => queue_name, 'DefaultVisibilityTimeout' => default_visibility_timeout || DEFAULT_VISIBILITY_TIMEOUT ) request_info(req_hash, SqsCreateQueueParser.new(:logger => @logger)) end |
#delete_message(queue_url, message_id) ⇒ Object
Deletes message from queue. Returns true
or an exception. Amazon returns true
on deletion of non-existent messages.
sqs.('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '12345678904...0987654321') #=> true
404 405 406 407 408 409 410 411 |
# File 'lib/sqs/right_sqs_interface.rb', line 404 def (queue_url, ) req_hash = generate_request('DeleteMessage', 'MessageId' => , :queue_url => queue_url) request_info(req_hash, SqsStatusParser.new(:logger => @logger)) rescue on_exception end |
#delete_queue(queue_url, force_deletion = false) ⇒ Object
Deletes queue (queue must be empty or force_deletion
must be set to true). Queue is identified by url. Returns true
or an exception.
sqs.delete_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue_2') #=> true
233 234 235 236 237 238 239 240 |
# File 'lib/sqs/right_sqs_interface.rb', line 233 def delete_queue(queue_url, force_deletion = false) req_hash = generate_request('DeleteQueue', 'ForceDeletion' => force_deletion.to_s, :queue_url => queue_url) request_info(req_hash, SqsStatusParser.new(:logger => @logger)) rescue on_exception end |
#force_clear_queue(queue_url) ⇒ Object
Deletes queue then re-creates it (restores attributes also). The fastest method to clear big queue or queue with invisible messages. Return true
or an exception.
sqs.force_clear_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> true
PS This function is no longer supported. Amazon has changed the SQS semantics to require at least 60 seconds between queue deletion and creation. Hence this method will fail with an exception.
490 491 492 493 494 495 496 497 498 499 500 501 502 |
# File 'lib/sqs/right_sqs_interface.rb', line 490 def force_clear_queue(queue_url) queue_name = queue_name_by_url(queue_url) queue_attributes = get_queue_attributes(queue_url) force_delete_queue(queue_url) create_queue(queue_name) # hmmm... The next line is a trick. Amazon do not want change attributes immediately after queue creation # So we do 'empty' get_queue_attributes. Probably they need some time to allow attributes change. get_queue_attributes(queue_url) queue_attributes.each{ |attribute, value| set_queue_attributes(queue_url, attribute, value) } true rescue on_exception end |
#force_delete_queue(queue_url) ⇒ Object
Deletes queue even if it has messages. Return true
or an exception.
force_delete_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> true
P.S. same as delete_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', true)
509 510 511 512 513 |
# File 'lib/sqs/right_sqs_interface.rb', line 509 def force_delete_queue(queue_url) delete_queue(queue_url, true) rescue on_exception end |
#generate_request(action, param = {}) ⇒ Object
Generates a request hash for the query API
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/sqs/right_sqs_interface.rb', line 117 def generate_request(action, param={}) # :nodoc: # Sometimes we need to use queue uri (delete queue etc) # In that case we will use Symbol key: 'param[:queue_url]' queue_uri = param[:queue_url] ? URI(param[:queue_url]).path : '/' # remove unset(=optional) and symbolyc keys param.each{ |key, value| param.delete(key) if (value.nil? || key.is_a?(Symbol)) } # prepare output hash request_hash = { "Action" => action, # "Expires" => Time.now.utc.since(REQUEST_TTL).strftime("%Y-%m-%dT%H:%M:%SZ"), "Expires" => (Time.now + REQUEST_TTL).utc.strftime("%Y-%m-%dT%H:%M:%SZ"), "AWSAccessKeyId" => @aws_access_key_id, "Version" => API_VERSION, "SignatureVersion" => SIGNATURE_VERSION } request_hash.update(param) request_data = request_hash.sort{|a,b| (a[0].to_s.downcase)<=>(b[0].to_s.downcase)}.to_s request_hash['Signature'] = Base64.encode64( OpenSSL::HMAC.digest( OpenSSL::Digest::Digest.new( "sha1" ), @aws_secret_access_key, request_data)).strip request_params = request_hash.to_a.collect{|key,val| key.to_s + "=" + CGI::escape(val.to_s) }.join("&") request = Net::HTTP::Get.new("#{queue_uri}?#{request_params}") # prepare output hash { :request => request, :server => @params[:server], :port => @params[:port], :protocol => @params[:protocol] } end |
#generate_rest_request(method, param) ⇒ Object
Generates a request hash for the REST API
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/sqs/right_sqs_interface.rb', line 143 def generate_rest_request(method, param) # :nodoc: queue_uri = param[:queue_url] ? URI(param[:queue_url]).path : '/' = param[:message] # extract message body if nesessary # remove unset(=optional) and symbolyc keys param.each{ |key, value| param.delete(key) if (value.nil? || key.is_a?(Symbol)) } # created request param_to_str = param.to_a.collect{|key,val| key.to_s + "=" + CGI::escape(val.to_s) }.join("&") param_to_str = "?#{param_to_str}" unless param_to_str.blank? request = "Net::HTTP::#{method.capitalize}".constantize.new("#{queue_uri}#{param_to_str}") request.body = if # set main headers request['content-md5'] = '' request['Content-Type'] = 'text/plain' request['Date'] = Time.now.httpdate # generate authorization string auth_string = "#{method.upcase}\n#{request['content-md5']}\n#{request['Content-Type']}\n#{request['Date']}\n#{CGI::unescape(queue_uri)}" signature = Base64.encode64(OpenSSL::HMAC.digest(OpenSSL::Digest::Digest.new("sha1"), @aws_secret_access_key, auth_string)).strip # set other headers request['Authorization'] = "AWS #{@aws_access_key_id}:#{signature}" request['AWS-Version'] = API_VERSION # prepare output hash { :request => request, :server => @params[:server], :port => @params[:port], :protocol => @params[:protocol] } end |
#get_queue_attributes(queue_url, attribute = 'All') ⇒ Object
Retrieves the queue attribute(s). Returns a hash of attribute(s) or an exception.
sqs.get_queue_attributes('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> {"ApproximateNumberOfMessages"=>"0", "VisibilityTimeout"=>"30"}
246 247 248 249 250 251 252 253 |
# File 'lib/sqs/right_sqs_interface.rb', line 246 def get_queue_attributes(queue_url, attribute='All') req_hash = generate_request('GetQueueAttributes', 'Attribute' => attribute, :queue_url => queue_url) request_info(req_hash, SqsGetQueueAttributesParser.new(:logger => @logger)) rescue on_exception end |
#get_queue_length(queue_url) ⇒ Object
Returns approximate number of messages in queue.
sqs.get_queue_length('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 3
466 467 468 469 470 |
# File 'lib/sqs/right_sqs_interface.rb', line 466 def get_queue_length(queue_url) get_queue_attributes(queue_url)['ApproximateNumberOfMessages'].to_i rescue on_exception end |
#get_visibility_timeout(queue_url) ⇒ Object
Retrieves visibility timeout.
sqs.get_visibility_timeout('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 15
See also: get_queue_attributes
293 294 295 296 297 298 |
# File 'lib/sqs/right_sqs_interface.rb', line 293 def get_visibility_timeout(queue_url) req_hash = generate_request('GetVisibilityTimeout', :queue_url => queue_url ) request_info(req_hash, SqsGetVisibilityTimeoutParser.new(:logger => @logger)) rescue on_exception end |
#list_grants(queue_url, grantee_email_address = nil, permission = nil) ⇒ Object
Retrieves hash of grantee_id
=> perms
for this queue:
sqs.list_grants('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=>
{"000000000000000000000001111111111117476c7fea6efb2c3347ac3ab2792a"=>{:name=>"root", :perms=>["FULLCONTROL"]},
"00000000000000000000000111111111111e5828344600fc9e4a784a09e97041"=>{:name=>"myawesomefriend", :perms=>["FULLCONTROL"]}
320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 |
# File 'lib/sqs/right_sqs_interface.rb', line 320 def list_grants(queue_url, grantee_email_address=nil, = nil) req_hash = generate_request('ListGrants', 'Grantee.EmailAddress' => grantee_email_address, 'Permission' => , :queue_url => queue_url) response = request_info(req_hash, SqsListGrantsParser.new(:logger => @logger)) # One user may have up to 3 permission records for every queue. # We will join these records to one. result = {} response.each do |perm| id = perm[:id] # create hash for new user if unexisit result[id] = {:perms=>[]} unless result[id] # fill current grantee params result[id][:perms] << perm[:permission] result[id][:name] = perm[:name] end result end |
#list_queues(queue_name_prefix = nil) ⇒ Object
Lists all queues owned by this user that have names beginning with queue_name_prefix
. If queue_name_prefix
is omitted then retrieves a list of all queues.
sqs.create_queue('my_awesome_queue')
sqs.create_queue('my_awesome_queue_2')
sqs.list_queues('my_awesome') #=> ['http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue','http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue_2']
222 223 224 225 226 227 |
# File 'lib/sqs/right_sqs_interface.rb', line 222 def list_queues(queue_name_prefix=nil) req_hash = generate_request('ListQueues', 'QueueNamePrefix' => queue_name_prefix) request_info(req_hash, SqsListQueuesParser.new(:logger => @logger)) rescue on_exception end |
#multi_thread ⇒ Object
Return true
if this RightS3 instance is running in multi_thread state and false
otherwise.
108 109 110 |
# File 'lib/sqs/right_sqs_interface.rb', line 108 def multi_thread @params[:multi_thread] end |
#on_exception(options = {:raise=>true, :log=>true}) ⇒ Object
:nodoc:
103 104 105 |
# File 'lib/sqs/right_sqs_interface.rb', line 103 def on_exception(={:raise=>true, :log=>true}) # :nodoc: AwsError::on_aws_exception(self, ) end |
#peek_message(queue_url, message_id) ⇒ Object
Peeks message from queue by message id. Returns message in format of {:id=>'message_id', :body=>'message_body'}
or nil
.
sqs.('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '1234567890...0987654321') #=>
{:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}
378 379 380 381 382 383 384 |
# File 'lib/sqs/right_sqs_interface.rb', line 378 def (queue_url, ) req_hash = generate_rest_request('GET', :queue_url => "#{queue_url}/#{CGI::escape }" ) = request_info(req_hash, SqsReceiveMessagesParser.new(:logger => @logger)) .blank? ? nil : [0] rescue on_exception end |
#pop_message(queue_url) ⇒ Object
Pops (retrieves and deletes) first accessible message from queue. Returns the message in format {:id=>'message_id', :body=>'message_body'}
or nil
.
sqs.('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=>
{:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}
550 551 552 553 554 555 |
# File 'lib/sqs/right_sqs_interface.rb', line 550 def (queue_url) = (queue_url) .blank? ? nil : [0] rescue on_exception end |
#pop_messages(queue_url, number_of_messages = 1) ⇒ Object
Pops (retrieves and deletes) up to ‘number_of_messages’ from queue. Returns an array of retrieved messages in format: [{:id=>'message_id', :body=>'message_body'}]
.
sqs.pop_messages('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 3) #=>
[{:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}, ..., {}]
535 536 537 538 539 540 541 542 543 |
# File 'lib/sqs/right_sqs_interface.rb', line 535 def (queue_url, =1) = (queue_url, ) .each do || (queue_url, [:id]) end rescue on_exception end |
#queue_name_by_url(queue_url) ⇒ Object
Returns short queue name by url.
sqs.queue_name_by_url('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 'my_awesome_queue'
456 457 458 459 460 |
# File 'lib/sqs/right_sqs_interface.rb', line 456 def queue_name_by_url(queue_url) self.class.queue_name_by_url(queue_url) rescue on_exception end |
#queue_url_by_name(queue_name) ⇒ Object
Returns queue url by queue short name or nil
if queue is not found
sqs.queue_url_by_name('my_awesome_queue') #=> 'http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue'
431 432 433 434 435 436 437 438 439 440 |
# File 'lib/sqs/right_sqs_interface.rb', line 431 def queue_url_by_name(queue_name) return queue_name if queue_name.include?('/') queue_urls = list_queues(queue_name) queue_urls.each do |queue_url| return queue_url if queue_name_by_url(queue_url) == queue_name end nil rescue on_exception end |
#receive_message(queue_url, visibility_timeout = nil) ⇒ Object
Reads first accessible message from queue. Returns message as a hash: {:id=>'message_id', :body=>'message_body'}
or nil
.
sqs.('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 10) #=>
{:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}
520 521 522 523 524 525 |
# File 'lib/sqs/right_sqs_interface.rb', line 520 def (queue_url, visibility_timeout=nil) result = (queue_url, 1, visibility_timeout) result.blank? ? nil : result[0] rescue on_exception end |
#receive_messages(queue_url, number_of_messages = 1, visibility_timeout = nil) ⇒ Object
Retrieves a list of messages from queue. Returns an array of hashes in format: {:id=>'message_id', body=>'message_body'}
sqs.receive_messages('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue',10, 5) #=>
[{:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}, ..., {}]
P.S. Usually returns fewer messages than requested even if they are available.
362 363 364 365 366 367 368 369 370 371 |
# File 'lib/sqs/right_sqs_interface.rb', line 362 def (queue_url, =1, visibility_timeout=nil) return [] if == 0 req_hash = generate_rest_request('GET', 'NumberOfMessages' => , 'VisibilityTimeout' => visibility_timeout, :queue_url => "#{queue_url}/front" ) request_info(req_hash, SqsReceiveMessagesParser.new(:logger => @logger)) rescue on_exception end |
#remove_grant(queue_url, grantee_email_address_or_id, permission = nil) ⇒ Object
Revokes permission from user. Returns true
or an exception.
sqs.remove_grant('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '[email protected]', 'FULLCONTROL') #=> true
344 345 346 347 348 349 350 351 352 353 |
# File 'lib/sqs/right_sqs_interface.rb', line 344 def remove_grant(queue_url, grantee_email_address_or_id, = nil) grantee_key = grantee_email_address_or_id.include?('@') ? 'Grantee.EmailAddress' : 'Grantee.ID' req_hash = generate_request('RemoveGrant', grantee_key => grantee_email_address_or_id, 'Permission' => , :queue_url => queue_url) request_info(req_hash, SqsStatusParser.new(:logger => @logger)) rescue on_exception end |
#request_info(request, parser) ⇒ Object
Sends request to Amazon and parses the response Raises AwsError if any banana happened
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/sqs/right_sqs_interface.rb', line 173 def request_info(request, parser) # :nodoc: thread = @params[:multi_thread] ? Thread.current : Thread.main thread[:sqs_connection] ||= Rightscale::HttpConnection.new(:exception => AwsError) @last_request = request[:request] @last_response = nil response=nil @@bench_sqs.add!{ response = thread[:sqs_connection].request(request) } # check response for errors... @last_response = response if response.is_a?(Net::HTTPSuccess) @error_handler = nil @@bench_xml.add! { parser.parse(response) } return parser.result else @error_handler = AWSErrorHandler.new(self, parser, @@amazon_problems) unless @error_handler check_result = @error_handler.check(request) if check_result @error_handler = nil return check_result end raise AwsError.new(@last_errors, @last_response.code, @last_request_id) end rescue @error_handler = nil raise end |
#send_message(queue_url, message) ⇒ Object Also known as: push_message
Sends new message to queue.Returns ‘message_id’ or raises an exception.
sqs.('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 'message_1') #=> "1234567890...0987654321"
390 391 392 393 394 395 396 397 |
# File 'lib/sqs/right_sqs_interface.rb', line 390 def (queue_url, ) req_hash = generate_rest_request('PUT', :message => , :queue_url => "#{queue_url}/back") request_info(req_hash, SqsSendMessagesParser.new(:logger => @logger)) rescue on_exception end |
#set_queue_attributes(queue_url, attribute, value) ⇒ Object
Sets queue attribute. Returns true
or an exception.
sqs.set_queue_attributes('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', "VisibilityTimeout", 10) #=> true
P.S. Amazon returns success even if the attribute does not exist. Also, attribute values may not be immediately available to other queries for some time after an update (see the SQS documentation for semantics).
262 263 264 265 266 267 268 269 270 |
# File 'lib/sqs/right_sqs_interface.rb', line 262 def set_queue_attributes(queue_url, attribute, value) req_hash = generate_request('SetQueueAttributes', 'Attribute' => attribute, 'Value' => value, :queue_url => queue_url) request_info(req_hash, SqsStatusParser.new(:logger => @logger)) rescue on_exception end |
#set_visibility_timeout(queue_url, visibility_timeout = nil) ⇒ Object
Sets visibility timeout. Returns true
or an exception.
sqs.set_visibility_timeout('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 15) #=> true
See also: set_queue_attributes
278 279 280 281 282 283 284 285 |
# File 'lib/sqs/right_sqs_interface.rb', line 278 def set_visibility_timeout(queue_url, visibility_timeout=nil) req_hash = generate_request('SetVisibilityTimeout', 'VisibilityTimeout' => visibility_timeout || DEFAULT_VISIBILITY_TIMEOUT, :queue_url => queue_url ) request_info(req_hash, SqsStatusParser.new(:logger => @logger)) rescue on_exception end |