Class: Gcloud::Pubsub::Connection
- Inherits:
-
Object
- Object
- Gcloud::Pubsub::Connection
- Defined in:
- lib/gcloud/pubsub/connection.rb
Overview
Represents the connection to Pub/Sub, as well as expose the API calls.
Constant Summary collapse
- API_VERSION =
:nodoc:
"v1"
Instance Attribute Summary collapse
-
#credentials ⇒ Object
:nodoc:.
-
#project ⇒ Object
Returns the value of attribute project.
Instance Method Summary collapse
-
#acknowledge(subscription, *ack_ids) ⇒ Object
Acknowledges receipt of a message.
-
#create_subscription(topic, subscription_name, options = {}) ⇒ Object
Creates a subscription on a given topic for a given subscriber.
-
#create_topic(topic_name, options = {}) ⇒ Object
Creates the given topic with the given name.
-
#delete_subscription(subscription) ⇒ Object
Deletes an existing subscription.
-
#delete_topic(topic) ⇒ Object
Deletes the topic with the given name.
-
#get_subscription(subscription_name, options = {}) ⇒ Object
Gets the details of a subscription.
- #get_subscription_policy(subscription_name, options = {}) ⇒ Object
-
#get_topic(topic_name, options = {}) ⇒ Object
Gets the configuration of a topic.
- #get_topic_policy(topic_name, options = {}) ⇒ Object
-
#initialize(project, credentials) ⇒ Connection
constructor
Creates a new Connection instance.
-
#list_subscriptions(options = {}) ⇒ Object
Lists matching subscriptions by project.
-
#list_topics(options = {}) ⇒ Object
Lists matching topics.
-
#list_topics_subscriptions(topic, options = {}) ⇒ Object
Lists matching subscriptions by project and topic.
-
#modify_ack_deadline(subscription, ids, deadline) ⇒ Object
Modifies the ack deadline for a specific message.
-
#modify_push_config(subscription, endpoint, attributes) ⇒ Object
Modifies the PushConfig for a specified subscription.
- #project_path(options = {}) ⇒ Object
-
#publish(topic, messages) ⇒ Object
Adds one or more messages to the topic.
-
#pull(subscription, options = {}) ⇒ Object
Pulls a single message from the server.
- #set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object
- #set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object
- #subscription_path(subscription_name, options = {}) ⇒ Object
- #topic_path(topic_name, options = {}) ⇒ Object
Constructor Details
#initialize(project, credentials) ⇒ Connection
Creates a new Connection instance.
32 33 34 35 36 37 38 39 |
# File 'lib/gcloud/pubsub/connection.rb', line 32 def initialize project, credentials @project = project @credentials = credentials @client = Google::APIClient.new application_name: "gcloud-ruby", application_version: Gcloud::VERSION @client. = @credentials.client @pubsub = @client.discovered_api "pubsub", API_VERSION end |
Instance Attribute Details
#credentials ⇒ Object
:nodoc:
28 29 30 |
# File 'lib/gcloud/pubsub/connection.rb', line 28 def credentials @credentials end |
#project ⇒ Object
Returns the value of attribute project.
27 28 29 |
# File 'lib/gcloud/pubsub/connection.rb', line 27 def project @project end |
Instance Method Details
#acknowledge(subscription, *ack_ids) ⇒ Object
Acknowledges receipt of a message.
211 212 213 214 215 216 217 |
# File 'lib/gcloud/pubsub/connection.rb', line 211 def acknowledge subscription, *ack_ids @client.execute( api_method: @pubsub.projects.subscriptions.acknowledge, parameters: { subscription: subscription }, body_object: { ackIds: ack_ids } ) end |
#create_subscription(topic, subscription_name, options = {}) ⇒ Object
Creates a subscription on a given topic for a given subscriber.
106 107 108 109 110 111 112 113 |
# File 'lib/gcloud/pubsub/connection.rb', line 106 def create_subscription topic, subscription_name, = {} data = subscription_data topic, @client.execute( api_method: @pubsub.projects.subscriptions.create, parameters: { name: subscription_path(subscription_name, ) }, body_object: data ) end |
#create_topic(topic_name, options = {}) ⇒ Object
Creates the given topic with the given name.
56 57 58 59 60 61 |
# File 'lib/gcloud/pubsub/connection.rb', line 56 def create_topic topic_name, = {} @client.execute( api_method: @pubsub.projects.topics.create, parameters: { name: topic_path(topic_name, ) } ) end |
#delete_subscription(subscription) ⇒ Object
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
156 157 158 159 160 161 |
# File 'lib/gcloud/pubsub/connection.rb', line 156 def delete_subscription subscription @client.execute( api_method: @pubsub.projects.subscriptions.delete, parameters: { subscription: subscription } ) end |
#delete_topic(topic) ⇒ Object
Deletes the topic with the given name. All subscriptions to this topic are also deleted. Returns NOT_FOUND if the topic does not exist. After a topic is deleted, a new topic may be created with the same name.
82 83 84 85 86 87 |
# File 'lib/gcloud/pubsub/connection.rb', line 82 def delete_topic topic @client.execute( api_method: @pubsub.projects.topics.delete, parameters: { topic: topic } ) end |
#get_subscription(subscription_name, options = {}) ⇒ Object
Gets the details of a subscription.
117 118 119 120 121 122 123 |
# File 'lib/gcloud/pubsub/connection.rb', line 117 def get_subscription subscription_name, = {} @client.execute( api_method: @pubsub.projects.subscriptions.get, parameters: { subscription: subscription_path(subscription_name, ) } ) end |
#get_subscription_policy(subscription_name, options = {}) ⇒ Object
163 164 165 166 167 168 169 |
# File 'lib/gcloud/pubsub/connection.rb', line 163 def get_subscription_policy subscription_name, = {} @client.execute( api_method: @pubsub.projects.subscriptions.get_iam_policy, parameters: { resource: subscription_path(subscription_name, ) } ) end |
#get_topic(topic_name, options = {}) ⇒ Object
Gets the configuration of a topic. Since the topic only has the name attribute, this method is only useful to check the existence of a topic. If other attributes are added in the future, they will be returned here.
47 48 49 50 51 52 |
# File 'lib/gcloud/pubsub/connection.rb', line 47 def get_topic topic_name, = {} @client.execute( api_method: @pubsub.projects.topics.get, parameters: { topic: topic_path(topic_name, ) } ) end |
#get_topic_policy(topic_name, options = {}) ⇒ Object
89 90 91 92 93 94 |
# File 'lib/gcloud/pubsub/connection.rb', line 89 def get_topic_policy topic_name, = {} @client.execute( api_method: @pubsub.projects.topics.get_iam_policy, parameters: { resource: topic_path(topic_name, ) } ) end |
#list_subscriptions(options = {}) ⇒ Object
Lists matching subscriptions by project.
127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/gcloud/pubsub/connection.rb', line 127 def list_subscriptions = {} params = { project: project_path(), pageToken: .delete(:token), pageSize: .delete(:max) }.delete_if { |_, v| v.nil? } @client.execute( api_method: @pubsub.projects.subscriptions.list, parameters: params ) end |
#list_topics(options = {}) ⇒ Object
Lists matching topics.
65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/gcloud/pubsub/connection.rb', line 65 def list_topics = {} params = { project: project_path(), pageToken: .delete(:token), pageSize: .delete(:max) }.delete_if { |_, v| v.nil? } @client.execute( api_method: @pubsub.projects.topics.list, parameters: params ) end |
#list_topics_subscriptions(topic, options = {}) ⇒ Object
Lists matching subscriptions by project and topic.
141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/gcloud/pubsub/connection.rb', line 141 def list_topics_subscriptions topic, = {} params = { topic: topic, pageToken: .delete(:token), pageSize: .delete(:max) }.delete_if { |_, v| v.nil? } @client.execute( api_method: @pubsub.projects.topics.subscriptions.list, parameters: params ) end |
#modify_ack_deadline(subscription, ids, deadline) ⇒ Object
Modifies the ack deadline for a specific message.
232 233 234 235 236 237 238 239 |
# File 'lib/gcloud/pubsub/connection.rb', line 232 def modify_ack_deadline subscription, ids, deadline ids = Array ids @client.execute( api_method: @pubsub.projects.subscriptions.modify_ack_deadline, parameters: { subscription: subscription }, body_object: { ackIds: ids, ackDeadlineSeconds: deadline } ) end |
#modify_push_config(subscription, endpoint, attributes) ⇒ Object
Modifies the PushConfig for a specified subscription.
221 222 223 224 225 226 227 228 |
# File 'lib/gcloud/pubsub/connection.rb', line 221 def modify_push_config subscription, endpoint, attributes @client.execute( api_method: @pubsub.projects.subscriptions.modify_push_config, parameters: { subscription: subscription }, body_object: { pushConfig: { pushEndpoint: endpoint, attributes: attributes } } ) end |
#project_path(options = {}) ⇒ Object
241 242 243 244 |
# File 'lib/gcloud/pubsub/connection.rb', line 241 def project_path = {} project_name = [:project] || project "projects/#{project_name}" end |
#publish(topic, messages) ⇒ Object
Adds one or more messages to the topic. Returns NOT_FOUND if the topic does not exist. The messages parameter is an array of arrays. The first element is the data, second is attributes hash.
185 186 187 188 189 190 191 192 193 194 |
# File 'lib/gcloud/pubsub/connection.rb', line 185 def publish topic, gapi_msgs = .map do |data, attributes| { data: [data].pack("m"), attributes: attributes } end @client.execute( api_method: @pubsub.projects.topics.publish, parameters: { topic: topic }, body_object: { messages: gapi_msgs } ) end |
#pull(subscription, options = {}) ⇒ Object
Pulls a single message from the server.
198 199 200 201 202 203 204 205 206 207 |
# File 'lib/gcloud/pubsub/connection.rb', line 198 def pull subscription, = {} body = { returnImmediately: !(!.fetch(:immediate, true)), maxMessages: .fetch(:max, 100).to_i } @client.execute( api_method: @pubsub.projects.subscriptions.pull, parameters: { subscription: subscription }, body_object: body ) end |
#set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object
171 172 173 174 175 176 177 178 |
# File 'lib/gcloud/pubsub/connection.rb', line 171 def set_subscription_policy subscription_name, new_policy, = {} @client.execute( api_method: @pubsub.projects.subscriptions.set_iam_policy, parameters: { resource: subscription_path(subscription_name, ) }, body_object: { policy: new_policy } ) end |
#set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object
96 97 98 99 100 101 102 |
# File 'lib/gcloud/pubsub/connection.rb', line 96 def set_topic_policy topic_name, new_policy, = {} @client.execute( api_method: @pubsub.projects.topics.set_iam_policy, parameters: { resource: topic_path(topic_name, ) }, body_object: { policy: new_policy } ) end |
#subscription_path(subscription_name, options = {}) ⇒ Object
251 252 253 254 |
# File 'lib/gcloud/pubsub/connection.rb', line 251 def subscription_path subscription_name, = {} return subscription_name if subscription_name.to_s.include? "/" "#{project_path()}/subscriptions/#{subscription_name}" end |
#topic_path(topic_name, options = {}) ⇒ Object
246 247 248 249 |
# File 'lib/gcloud/pubsub/connection.rb', line 246 def topic_path topic_name, = {} return topic_name if topic_name.to_s.include? "/" "#{project_path()}/topics/#{topic_name}" end |