Class: Gcloud::Pubsub::Service
- Inherits:
-
Object
- Object
- Gcloud::Pubsub::Service
- Defined in:
- lib/gcloud/pubsub/service.rb
Overview
methods.
Instance Attribute Summary collapse
-
#credentials ⇒ Object
Returns the value of attribute credentials.
-
#host ⇒ Object
Returns the value of attribute host.
-
#mocked_iam ⇒ Object
Returns the value of attribute mocked_iam.
-
#mocked_publisher ⇒ Object
Returns the value of attribute mocked_publisher.
-
#mocked_subscriber ⇒ Object
Returns the value of attribute mocked_subscriber.
-
#project ⇒ Object
Returns the value of attribute project.
-
#retries ⇒ Object
Returns the value of attribute retries.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
Instance Method Summary collapse
-
#acknowledge(subscription, *ack_ids) ⇒ Object
Acknowledges receipt of a message.
-
#create_subscription(topic, subscription_name, options = {}) ⇒ Object
Creates a subscription on a given topic for a given subscriber.
-
#create_topic(topic_name, options = {}) ⇒ Object
Creates the given topic with the given name.
- #creds ⇒ Object
-
#delete_subscription(subscription) ⇒ Object
Deletes an existing subscription.
-
#delete_topic(topic_name) ⇒ Object
Deletes the topic with the given name.
-
#get_subscription(subscription_name, options = {}) ⇒ Object
Gets the details of a subscription.
- #get_subscription_policy(subscription_name, options = {}) ⇒ Object
-
#get_topic(topic_name, options = {}) ⇒ Object
Gets the configuration of a topic.
- #get_topic_policy(topic_name, options = {}) ⇒ Object
- #iam ⇒ Object
-
#initialize(project, credentials, host: nil, retries: nil, timeout: nil) ⇒ Service
constructor
Creates a new Service instance.
- #insecure? ⇒ Boolean
- #inspect ⇒ Object
-
#list_subscriptions(options = {}) ⇒ Object
Lists matching subscriptions by project.
-
#list_topics(options = {}) ⇒ Object
Lists matching topics.
-
#list_topics_subscriptions(topic, options = {}) ⇒ Object
Lists matching subscriptions by project and topic.
-
#modify_ack_deadline(subscription, ids, deadline) ⇒ Object
Modifies the ack deadline for a specific message.
-
#modify_push_config(subscription, endpoint, attributes) ⇒ Object
Modifies the PushConfig for a specified subscription.
- #project_path(options = {}) ⇒ Object
-
#publish(topic, messages) ⇒ Object
Adds one or more messages to the topic.
- #publisher ⇒ Object
-
#pull(subscription, options = {}) ⇒ Object
Pulls a single message from the server.
- #set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object
- #set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object
- #subscriber ⇒ Object
- #subscription_path(subscription_name, options = {}) ⇒ Object
- #test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object
- #test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object
- #topic_path(topic_name, options = {}) ⇒ Object
Constructor Details
#initialize(project, credentials, host: nil, retries: nil, timeout: nil) ⇒ Service
Creates a new Service instance.
33 34 35 36 37 38 39 |
# File 'lib/gcloud/pubsub/service.rb', line 33 def initialize project, credentials, host: nil, retries: nil, timeout: nil @project = project @credentials = credentials @host = host || "pubsub.googleapis.com" @retries = retries @timeout = timeout end |
Instance Attribute Details
#credentials ⇒ Object
Returns the value of attribute credentials.
29 30 31 |
# File 'lib/gcloud/pubsub/service.rb', line 29 def credentials @credentials end |
#host ⇒ Object
Returns the value of attribute host.
29 30 31 |
# File 'lib/gcloud/pubsub/service.rb', line 29 def host @host end |
#mocked_iam ⇒ Object
Returns the value of attribute mocked_iam.
66 67 68 |
# File 'lib/gcloud/pubsub/service.rb', line 66 def mocked_iam @mocked_iam end |
#mocked_publisher ⇒ Object
Returns the value of attribute mocked_publisher.
59 60 61 |
# File 'lib/gcloud/pubsub/service.rb', line 59 def mocked_publisher @mocked_publisher end |
#mocked_subscriber ⇒ Object
Returns the value of attribute mocked_subscriber.
52 53 54 |
# File 'lib/gcloud/pubsub/service.rb', line 52 def mocked_subscriber @mocked_subscriber end |
#project ⇒ Object
Returns the value of attribute project.
29 30 31 |
# File 'lib/gcloud/pubsub/service.rb', line 29 def project @project end |
#retries ⇒ Object
Returns the value of attribute retries.
29 30 31 |
# File 'lib/gcloud/pubsub/service.rb', line 29 def retries @retries end |
#timeout ⇒ Object
Returns the value of attribute timeout.
29 30 31 |
# File 'lib/gcloud/pubsub/service.rb', line 29 def timeout @timeout end |
Instance Method Details
#acknowledge(subscription, *ack_ids) ⇒ Object
Acknowledges receipt of a message.
213 214 215 216 217 218 219 220 |
# File 'lib/gcloud/pubsub/service.rb', line 213 def acknowledge subscription, *ack_ids ack_req = Google::Pubsub::V1::AcknowledgeRequest.new( subscription: subscription_path(subscription), ack_ids: ack_ids ) execute { subscriber.acknowledge ack_req } end |
#create_subscription(topic, subscription_name, options = {}) ⇒ Object
Creates a subscription on a given topic for a given subscriber.
173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/gcloud/pubsub/service.rb', line 173 def create_subscription topic, subscription_name, = {} sub_params = { name: subscription_path(subscription_name, ), topic: topic_path(topic), ack_deadline_seconds: [:deadline] }.delete_if { |_, v| v.nil? } sub_req = Google::Pubsub::V1::Subscription.new sub_params if [:endpoint] sub_req.push_config = Google::Pubsub::V1::PushConfig.new( push_endpoint: [:endpoint], attributes: ([:attributes] || {}).to_h) end execute { subscriber.create_subscription sub_req } end |
#create_topic(topic_name, options = {}) ⇒ Object
Creates the given topic with the given name.
100 101 102 103 104 105 106 |
# File 'lib/gcloud/pubsub/service.rb', line 100 def create_topic topic_name, = {} topic_req = Google::Pubsub::V1::Topic.new.tap do |r| r.name = topic_path(topic_name, ) end execute { publisher.create_topic topic_req } end |
#creds ⇒ Object
41 42 43 44 45 |
# File 'lib/gcloud/pubsub/service.rb', line 41 def creds return credentials if insecure? GRPC::Core::ChannelCredentials.new.compose \ GRPC::Core::CallCredentials.new credentials.client.updater_proc end |
#delete_subscription(subscription) ⇒ Object
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
191 192 193 194 195 196 197 |
# File 'lib/gcloud/pubsub/service.rb', line 191 def delete_subscription subscription del_req = Google::Pubsub::V1::DeleteSubscriptionRequest.new( subscription: subscription_path(subscription) ) execute { subscriber.delete_subscription del_req } end |
#delete_topic(topic_name) ⇒ Object
Deletes the topic with the given name. All subscriptions to this topic are also deleted. Raises GRPC status code 5 if the topic does not exist. After a topic is deleted, a new topic may be created with the same name.
113 114 115 116 117 118 119 |
# File 'lib/gcloud/pubsub/service.rb', line 113 def delete_topic topic_name topic_req = Google::Pubsub::V1::DeleteTopicRequest.new.tap do |r| r.topic = topic_path(topic_name) end execute { publisher.delete_topic topic_req } end |
#get_subscription(subscription_name, options = {}) ⇒ Object
Gets the details of a subscription.
140 141 142 143 144 145 146 |
# File 'lib/gcloud/pubsub/service.rb', line 140 def get_subscription subscription_name, = {} sub_req = Google::Pubsub::V1::GetSubscriptionRequest.new( subscription: subscription_path(subscription_name, ) ) execute { subscriber.get_subscription sub_req } end |
#get_subscription_policy(subscription_name, options = {}) ⇒ Object
277 278 279 280 281 282 283 |
# File 'lib/gcloud/pubsub/service.rb', line 277 def get_subscription_policy subscription_name, = {} get_req = Google::Iam::V1::GetIamPolicyRequest.new( resource: subscription_path(subscription_name, ) ) execute { iam.get_iam_policy get_req } end |
#get_topic(topic_name, options = {}) ⇒ Object
Gets the configuration of a topic. Since the topic only has the name attribute, this method is only useful to check the existence of a topic. If other attributes are added in the future, they will be returned here.
78 79 80 81 82 83 84 |
# File 'lib/gcloud/pubsub/service.rb', line 78 def get_topic topic_name, = {} topic_req = Google::Pubsub::V1::GetTopicRequest.new.tap do |r| r.topic = topic_path(topic_name, ) end execute { publisher.get_topic topic_req } end |
#get_topic_policy(topic_name, options = {}) ⇒ Object
251 252 253 254 255 256 257 |
# File 'lib/gcloud/pubsub/service.rb', line 251 def get_topic_policy topic_name, = {} get_req = Google::Iam::V1::GetIamPolicyRequest.new( resource: topic_path(topic_name, ) ) execute { iam.get_iam_policy get_req } end |
#iam ⇒ Object
61 62 63 64 65 |
# File 'lib/gcloud/pubsub/service.rb', line 61 def iam return mocked_iam if mocked_iam @iam ||= Google::Iam::V1::IAMPolicy::Stub.new( host, creds, timeout: timeout) end |
#insecure? ⇒ Boolean
68 69 70 |
# File 'lib/gcloud/pubsub/service.rb', line 68 def insecure? credentials == :this_channel_is_insecure end |
#inspect ⇒ Object
319 320 321 |
# File 'lib/gcloud/pubsub/service.rb', line 319 def inspect "#{self.class}(#{@project})" end |
#list_subscriptions(options = {}) ⇒ Object
Lists matching subscriptions by project.
162 163 164 165 166 167 168 169 |
# File 'lib/gcloud/pubsub/service.rb', line 162 def list_subscriptions = {} list_params = { project: project_path(), page_token: [:token], page_size: [:max] }.delete_if { |_, v| v.nil? } list_req = Google::Pubsub::V1::ListSubscriptionsRequest.new list_params execute { subscriber.list_subscriptions list_req } end |
#list_topics(options = {}) ⇒ Object
Lists matching topics.
88 89 90 91 92 93 94 95 96 |
# File 'lib/gcloud/pubsub/service.rb', line 88 def list_topics = {} topics_req = Google::Pubsub::V1::ListTopicsRequest.new.tap do |r| r.project = project_path() r.page_token = [:token] if [:token] r.page_size = [:max] if [:max] end execute { publisher.list_topics topics_req } end |
#list_topics_subscriptions(topic, options = {}) ⇒ Object
Lists matching subscriptions by project and topic.
150 151 152 153 154 155 156 157 158 |
# File 'lib/gcloud/pubsub/service.rb', line 150 def list_topics_subscriptions topic, = {} list_params = { topic: topic_path(topic, ), page_token: [:token], page_size: [:max] }.delete_if { |_, v| v.nil? } list_req = Google::Pubsub::V1::ListTopicSubscriptionsRequest.new \ list_params execute { publisher.list_topic_subscriptions list_req } end |
#modify_ack_deadline(subscription, ids, deadline) ⇒ Object
Modifies the ack deadline for a specific message.
241 242 243 244 245 246 247 248 249 |
# File 'lib/gcloud/pubsub/service.rb', line 241 def modify_ack_deadline subscription, ids, deadline mad_req = Google::Pubsub::V1::ModifyAckDeadlineRequest.new( subscription: subscription_path(subscription), ack_ids: Array(ids), ack_deadline_seconds: deadline ) execute { subscriber.modify_ack_deadline mad_req } end |
#modify_push_config(subscription, endpoint, attributes) ⇒ Object
Modifies the PushConfig for a specified subscription.
224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/gcloud/pubsub/service.rb', line 224 def modify_push_config subscription, endpoint, attributes # Convert attributes to strings to match the protobuf definition attributes = Hash[attributes.map { |k, v| [String(k), String(v)] }] mpc_req = Google::Pubsub::V1::ModifyPushConfigRequest.new( subscription: subscription_path(subscription), push_config: Google::Pubsub::V1::PushConfig.new( push_endpoint: endpoint, attributes: attributes ) ) execute { subscriber.modify_push_config mpc_req } end |
#project_path(options = {}) ⇒ Object
304 305 306 307 |
# File 'lib/gcloud/pubsub/service.rb', line 304 def project_path = {} project_name = [:project] || project "projects/#{project_name}" end |
#publish(topic, messages) ⇒ Object
Adds one or more messages to the topic. Raises GRPC status code 5 if the topic does not exist. The messages parameter is an array of arrays. The first element is the data, second is attributes hash.
126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/gcloud/pubsub/service.rb', line 126 def publish topic, publish_req = Google::Pubsub::V1::PublishRequest.new( topic: topic_path(topic), messages: .map do |data, attributes| Google::Pubsub::V1::PubsubMessage.new( data: data, attributes: attributes) end ) execute { publisher.publish publish_req } end |
#publisher ⇒ Object
54 55 56 57 58 |
# File 'lib/gcloud/pubsub/service.rb', line 54 def publisher return mocked_publisher if mocked_publisher @publisher ||= Google::Pubsub::V1::Publisher::Stub.new( host, creds, timeout: timeout) end |
#pull(subscription, options = {}) ⇒ Object
Pulls a single message from the server.
201 202 203 204 205 206 207 208 209 |
# File 'lib/gcloud/pubsub/service.rb', line 201 def pull subscription, = {} pull_req = Google::Pubsub::V1::PullRequest.new( subscription: subscription_path(subscription, ), return_immediately: !(!.fetch(:immediate, true)), max_messages: .fetch(:max, 100).to_i ) execute { subscriber.pull pull_req } end |
#set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object
285 286 287 288 289 290 291 292 |
# File 'lib/gcloud/pubsub/service.rb', line 285 def set_subscription_policy subscription_name, new_policy, = {} set_req = Google::Iam::V1::SetIamPolicyRequest.new( resource: subscription_path(subscription_name, ), policy: new_policy ) execute { iam.set_iam_policy set_req } end |
#set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object
259 260 261 262 263 264 265 266 |
# File 'lib/gcloud/pubsub/service.rb', line 259 def set_topic_policy topic_name, new_policy, = {} set_req = Google::Iam::V1::SetIamPolicyRequest.new( resource: topic_path(topic_name, ), policy: new_policy ) execute { iam.set_iam_policy set_req } end |
#subscriber ⇒ Object
47 48 49 50 51 |
# File 'lib/gcloud/pubsub/service.rb', line 47 def subscriber return mocked_subscriber if mocked_subscriber @subscriber ||= Google::Pubsub::V1::Subscriber::Stub.new( host, creds, timeout: timeout) end |
#subscription_path(subscription_name, options = {}) ⇒ Object
314 315 316 317 |
# File 'lib/gcloud/pubsub/service.rb', line 314 def subscription_path subscription_name, = {} return subscription_name if subscription_name.to_s.include? "/" "#{project_path(options)}/subscriptions/#{subscription_name}" end |
#test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object
294 295 296 297 298 299 300 301 302 |
# File 'lib/gcloud/pubsub/service.rb', line 294 def subscription_name, , = {} test_req = Google::Iam::V1::TestIamPermissionsRequest.new( resource: subscription_path(subscription_name, ), permissions: ) execute { iam. test_req } end |
#test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object
268 269 270 271 272 273 274 275 |
# File 'lib/gcloud/pubsub/service.rb', line 268 def topic_name, , = {} test_req = Google::Iam::V1::TestIamPermissionsRequest.new( resource: topic_path(topic_name, ), permissions: ) execute { iam. test_req } end |
#topic_path(topic_name, options = {}) ⇒ Object
309 310 311 312 |
# File 'lib/gcloud/pubsub/service.rb', line 309 def topic_path topic_name, = {} return topic_name if topic_name.to_s.include? "/" "#{project_path(options)}/topics/#{topic_name}" end |