Class: RightAws::SnsInterface

Inherits:
RightAwsBase show all
Includes:
RightAwsBaseInterface
Defined in:
lib/sns/right_sns_interface.rb

Overview

Right::Aws::SnsInterface - RightScale’s low-level Amazon SNS interface

Defined Under Namespace

Classes: SnsCreateTopicParser, SnsGetTopicAttributesParser, SnsListSubscriptionsByTopicParser, SnsListTopicsParser, SnsStatusParser, SnsSubscribeParser

Constant Summary collapse

API_VERSION =
"2010-03-31"
DEFAULT_HOST =
"sns.us-east-1.amazonaws.com"
DEFAULT_PORT =
443
DEFAULT_PROTOCOL =
'https'
REQUEST_TTL =
30
DEFAULT_VISIBILITY_TIMEOUT =
30
@@bench =
AwsBenchmarkingBlock.new
@@api =
API_VERSION

Constants included from RightAwsBaseInterface

RightAwsBaseInterface::BLOCK_DEVICE_KEY_MAPPING, RightAwsBaseInterface::DEFAULT_SIGNATURE_VERSION

Constants inherited from RightAwsBase

RightAwsBase::AMAZON_PROBLEMS, RightAwsBase::RAISE_ON_TIMEOUT_ON_ACTIONS

Instance Attribute Summary

Attributes included from RightAwsBaseInterface

#aws_access_key_id, #aws_secret_access_key, #cache, #connection, #last_errors, #last_request, #last_request_id, #last_response, #logger, #params, #signature_version

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RightAwsBaseInterface

#amazonize_block_device_mappings, #amazonize_list, #cache_hits?, caching, caching=, #caching?, #destroy_connection, #generate_request_impl, #get_connection, #get_connections_storage, #get_server_url, #incrementally_list_items, #init, #on_exception, #request_cache_or_info, #request_info_impl, #signed_service_params, #update_cache, #with_connection_options

Methods inherited from RightAwsBase

amazon_problems, amazon_problems=, raise_on_timeout_on_actions, raise_on_timeout_on_actions=

Constructor Details

#initialize(aws_access_key_id = nil, aws_secret_access_key = nil, params = {}) ⇒ SnsInterface

Creates a new SnsInterface instance. This instance is limited to operations on SNS objects created with Amazon’s 2008-01-01 API version. This interface will not work on objects created with prior API versions. See Amazon’s article “Migrating to Amazon SNS API version 2008-01-01” at: developer.amazonwebservices.com/connect/entry.jspa?externalID=1148

sqs = RightAws::SnsInterface.new('1E3GDYEOGFJPIT75KDT40','hgTHt68JY07JKUY08ftHYtERkjgtfERn57DFE379', {:multi_thread => true, :logger => Logger.new('/tmp/x.log')})

Params is a hash:

{:server       => 'sns.us-east-1.amazonaws.com' # Amazon service host: 'sns.us-east-1.amazonaws.com' (default)
 :port         => 443                   # Amazon service port: 80 or 443 (default)
 :multi_thread => true|false            # Multi-threaded (connection per each thread): true or false (default)
 :signature_version => '0'              # The signature version : '0', '1' or '2'(default)
 :logger       => Logger Object}        # Logger instance: logs to STDOUT if omitted }


70
71
72
73
74
75
76
77
78
# File 'lib/sns/right_sns_interface.rb', line 70

def initialize(aws_access_key_id=nil, aws_secret_access_key=nil, params={})
  init({ :name             => 'SNS', 
         :default_host     => ENV['SNS_URL'] ? URI.parse(ENV['SNS_URL']).host   : DEFAULT_HOST, 
         :default_port     => ENV['SNS_URL'] ? URI.parse(ENV['SNS_URL']).port   : DEFAULT_PORT, 
         :default_protocol => ENV['SNS_URL'] ? URI.parse(ENV['SNS_URL']).scheme : DEFAULT_PROTOCOL }, 
       aws_access_key_id     || ENV['AWS_ACCESS_KEY_ID'], 
       aws_secret_access_key || ENV['AWS_SECRET_ACCESS_KEY'], 
       params)
end

Class Method Details

.apiObject



50
51
52
# File 'lib/sns/right_sns_interface.rb', line 50

def self.api 
  @@api
end

.bench_snsObject



45
46
47
# File 'lib/sns/right_sns_interface.rb', line 45

def self.bench_sns
  @@bench.service
end

.bench_xmlObject



42
43
44
# File 'lib/sns/right_sns_interface.rb', line 42

def self.bench_xml
  @@bench.xml
end

Instance Method Details

#add_permissions(topic_arn, label, grantees, actions) ⇒ Object

Add permissions to a topic.

sns.add_permissions('arn:aws:sns:us-east-1:464646271962:test',
                   'testLabel', ['125074342641','125074342642'],
                   ['Publish','Subscribe']) #=> true

+permissions+ is a hash of: AccountId => ActionName
(valid ActionNames: * | Publish | Subscribe | Unsubscribe | GetTopicAttributes | SetTopicAttributes | ConfirmSubscription )


235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/sns/right_sns_interface.rb', line 235

def add_permissions(topic_arn, label, grantees, actions)
  params = {}
  # add each member
  grantees.each_with_index { |awsid,i| params.merge!("AWSAccountId.member.#{i + 1}" => awsid) }
  # add each action
  actions.each_with_index  { |action,i| params.merge!("ActionName.member.#{i + 1}" => action) }
  # standard params
  params.merge!('Label'    => label,
                'TopicArn' => topic_arn )
  req_hash = generate_request('AddPermission', params)
  request_info(req_hash, SnsStatusParser.new(:logger => @logger))
rescue
  on_exception
end

#create_topic(sns, topic_name) ⇒ Object

Creates a new topic, returning the new Topic object.

sqs.create_topic('my_awesome_queue') #=> 'https://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue'


142
143
144
145
146
147
148
# File 'lib/sns/right_sns_interface.rb', line 142

def create_topic(sns, topic_name)
  req_hash = generate_request('CreateTopic', 'Name' => topic_name)
  arn = request_info(req_hash, SnsCreateTopicParser.new(:logger => @logger))
  Sns::Topic.new(sns, arn)
rescue
  on_exception
end

#delete_topic(topic_arn) ⇒ Object

Deletes topic. Any messages in the topic are permanently lost. Returns true or an exception. Deletion is eventual.

sqs.delete_queue('arn:aws:sns:us-east-1:464646271962:test') #=> true


168
169
170
171
172
173
# File 'lib/sns/right_sns_interface.rb', line 168

def delete_topic(topic_arn)
  req_hash = generate_request('DeleteTopic', 'TopicArn' => topic_arn)
  request_info(req_hash, SnsStatusParser.new(:logger => @logger))
rescue
  on_exception
end

#generate_post_request(action, param = {}) ⇒ Object

:nodoc:



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/sns/right_sns_interface.rb', line 109

def generate_post_request(action, param={})  # :nodoc:
  service = param[:queue_url] ? URI(param[:queue_url]).path : '/'
  message   = param[:message]                # extract message body if nesessary
  param.each{ |key, value| param.delete(key) if (value.nil? || key.is_a?(Symbol)) }
  service_hash = { "Action"           => action,
                   "Expires"          => (Time.now + REQUEST_TTL).utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
                   "AWSAccessKeyId"   => @aws_access_key_id,
                   "MessageBody"      => message,
                   "Version"          => API_VERSION }
  service_hash.update(param)
  #
  service_params = signed_service_params(@aws_secret_access_key, service_hash, :post, @params[:server], service)
  request        = Net::HTTP::Post.new(AwsUtils::URLencode(service))
  request['Content-Type'] = 'application/x-www-form-urlencoded' 
  request.body = service_params
    # prepare output hash
  { :request  => request, 
    :server   => @params[:server],
    :port     => @params[:port],
    :protocol => @params[:protocol] }
end

#generate_request(action, param = {}) ⇒ Object

Generates a request hash for the query API



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/sns/right_sns_interface.rb', line 86

def generate_request(action, param={})  # :nodoc:
  # For operation requests on a topic, the queue URI will be a parameter,
  # so we first extract it from the call parameters.  Next we remove any
  # parameters with no value or with symbolic keys.  We add the header
  # fields required in all requests, and then the headers passed in as
  # params.  We sort the header fields alphabetically and then generate the
  # signature before URL escaping the resulting query and sending it.
  service = param[:queue_url] ? URI(param[:queue_url]).path : '/'
  param.each{ |key, value| param.delete(key) if (value.nil? || key.is_a?(Symbol)) }
  service_hash = { "Action"           => action,
                   "Expires"          => (Time.now + REQUEST_TTL).utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
                   "AWSAccessKeyId"   => @aws_access_key_id,
                   "Version"          => API_VERSION }
  service_hash.update(param)
  service_params = signed_service_params(@aws_secret_access_key, service_hash, :get, @params[:server], service)
  request        = Net::HTTP::Get.new("#{AwsUtils.URLencode(service)}?#{service_params}")
    # prepare output hash
  { :request  => request, 
    :server   => @params[:server],
    :port     => @params[:port],
    :protocol => @params[:protocol] }
end

#get_topic_attributes(topic_arn) ⇒ Object

Retrieves the topic attribute(s). Returns a hash of attribute(s) or an exception.



195
196
197
198
199
200
# File 'lib/sns/right_sns_interface.rb', line 195

def get_topic_attributes(topic_arn)
  req_hash = generate_request('GetTopicAttributes', 'TopicArn' => topic_arn)
  request_info(req_hash, SnsGetTopicAttributesParser.new(:logger => @logger))
rescue
  on_exception
end

#list_subscriptions_by_topic(topic_arn, next_token = nil) ⇒ Object

Retrieves the topic subscribers(s). Returns a hash containing a :set of members and the :next token



203
204
205
206
207
208
209
210
# File 'lib/sns/right_sns_interface.rb', line 203

def list_subscriptions_by_topic(topic_arn, next_token = nil)
  params = { 'TopicArn' => topic_arn }
  params.merge!({ 'NextToken' => next_token }) unless !next_token
  req_hash = generate_request('ListSubscriptionsByTopic', params)
  request_info(req_hash, SnsListSubscriptionsByTopicParser.new(:logger => @logger))
rescue
  on_exception
end

#list_topicsObject

Lists all topics owned by this user Topic creation is an eventual operation and created topics may not show up in immediately subsequent list_topic calls.

sns.list_queues() #=> ['ZZ7XXXYYYBINS','ZZ7XXXYYYBINS']


155
156
157
158
159
160
# File 'lib/sns/right_sns_interface.rb', line 155

def list_topics
  req_hash = generate_request('ListTopics')
  request_info(req_hash, SnsListTopicsParser.new(:logger => @logger))
rescue
  on_exception
end

#remove_permissions(topic_arn, label) ⇒ Object

Revoke any permissions in the topic policy that matches the label parameter.

sns.remove_permissions('arn:aws:sns:us-east-1:464646271962:test',
                      'testLabel') # => true


255
256
257
258
259
260
261
262
# File 'lib/sns/right_sns_interface.rb', line 255

def remove_permissions(topic_arn, label)
  req_hash = generate_request('RemovePermission',
                              'Label'    => label,
                              'TopicArn' => topic_arn )
  request_info(req_hash, SnsStatusParser.new(:logger => @logger))
rescue
  on_exception
end

#request_info(request, parser) ⇒ Object

Sends request to Amazon and parses the response Raises AwsError if any banana happened



134
135
136
# File 'lib/sns/right_sns_interface.rb', line 134

def request_info(request, parser) # :nodoc:
  request_info_impl(:sqs_connection, @@bench, request, parser)
end

#send_message(topic_arn, body, subject = nil) ⇒ Object Also known as: push_message

Sends a new message to a topic. Body size is limited to 8 KB. If successful, this call returns true

sns.send_message('arn:aws:sns:us-east-1:464646271962:test', 'body', 'message 1') #=> true

On failure, send_message raises an exception.



182
183
184
185
186
187
188
189
# File 'lib/sns/right_sns_interface.rb', line 182

def send_message(topic_arn, body, subject = nil)
  params = { 'TopicArn' => topic_arn, 'Message' => body }
  params.merge!({ 'Subject' => subject }) unless !subject || subject.length == 0
  req_hash = generate_post_request('Publish', params)
  request_info(req_hash, SnsStatusParser.new(:logger => @logger))
rescue
  on_exception
end

#set_topic_attributes(topic_arn, attribute, value) ⇒ Object

Sets a topic attribute. Returns true or an exception.

sns.set_queue_attributes('arn:aws:sns:us-east-1:464646271962:test', "DisplayName", "Wendy's Widgets") #=> true


268
269
270
271
272
273
274
275
276
# File 'lib/sns/right_sns_interface.rb', line 268

def set_topic_attributes(topic_arn, attribute, value)
  req_hash = generate_request('SetTopicAttributes', 
                              'AttributeName'  => attribute,
                              'AttributeValue' => value,
                              'TopicArn'       => topic_arn)
  request_info(req_hash, SnsStatusParser.new(:logger => @logger))
rescue
  on_exception
end

#subscribe_to_topic(topic_arn, protocol, end_point) ⇒ Object

Subscribe a new endpoint to a topic



213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/sns/right_sns_interface.rb', line 213

def subscribe_to_topic(topic_arn, protocol, end_point)
  req_hash = generate_request('Subscribe', 'TopicArn' => topic_arn, 'Protocol' => protocol, 'Endpoint' => end_point)
  token = request_info(req_hash, SnsSubscribeParser.new(:logger => @logger))
  if token == 'pending confirmation'
    true
  else
    member               = Sns::Member.new
    member.topic_arn     = topic_arn
    member.token         = token
    member.sns_interface = self
    member
  end
end