Class: Google::Cloud::Pubsub::Service
- Inherits:
-
Object
- Object
- Google::Cloud::Pubsub::Service
- Defined in:
- lib/google/cloud/pubsub/service.rb
Overview
Represents the Pub/Sub service, including all of its API methods.
Instance Attribute Summary collapse
-
#credentials ⇒ Object
Returns the value of attribute credentials.
-
#host ⇒ Object
Returns the value of attribute host.
-
#mocked_iam ⇒ Object
Returns the value of attribute mocked_iam.
-
#mocked_publisher ⇒ Object
Returns the value of attribute mocked_publisher.
-
#mocked_subscriber ⇒ Object
Returns the value of attribute mocked_subscriber.
-
#project ⇒ Object
Returns the value of attribute project.
-
#retries ⇒ Object
Returns the value of attribute retries.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
Instance Method Summary collapse
-
#acknowledge(subscription, *ack_ids) ⇒ Object
Acknowledges receipt of a message.
-
#create_subscription(topic, subscription_name, options = {}) ⇒ Object
Creates a subscription on a given topic for a given subscriber.
-
#create_topic(topic_name, options = {}) ⇒ Object
Creates the given topic with the given name.
- #creds ⇒ Object
-
#delete_subscription(subscription) ⇒ Object
Deletes an existing subscription.
-
#delete_topic(topic_name) ⇒ Object
Deletes the topic with the given name.
-
#get_subscription(subscription_name, options = {}) ⇒ Object
Gets the details of a subscription.
- #get_subscription_policy(subscription_name, options = {}) ⇒ Object
-
#get_topic(topic_name, options = {}) ⇒ Object
Gets the configuration of a topic.
- #get_topic_policy(topic_name, options = {}) ⇒ Object
- #iam ⇒ Object
-
#initialize(project, credentials, host: nil, retries: nil, timeout: nil) ⇒ Service
constructor
Creates a new Service instance.
- #insecure? ⇒ Boolean
- #inspect ⇒ Object
-
#list_subscriptions(options = {}) ⇒ Object
Lists matching subscriptions by project.
-
#list_topics(options = {}) ⇒ Object
Lists matching topics.
-
#list_topics_subscriptions(topic, options = {}) ⇒ Object
Lists matching subscriptions by project and topic.
-
#modify_ack_deadline(subscription, ids, deadline) ⇒ Object
Modifies the ack deadline for a specific message.
-
#modify_push_config(subscription, endpoint, attributes) ⇒ Object
Modifies the PushConfig for a specified subscription.
- #project_path(options = {}) ⇒ Object
-
#publish(topic, messages) ⇒ Object
Adds one or more messages to the topic.
- #publisher ⇒ Object
-
#pull(subscription, options = {}) ⇒ Object
Pulls a single message from the server.
- #set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object
- #set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object
- #subscriber ⇒ Object
- #subscription_path(subscription_name, options = {}) ⇒ Object
- #test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object
- #test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object
- #topic_path(topic_name, options = {}) ⇒ Object
Constructor Details
#initialize(project, credentials, host: nil, retries: nil, timeout: nil) ⇒ Service
Creates a new Service instance.
34 35 36 37 38 39 40 41 |
# File 'lib/google/cloud/pubsub/service.rb', line 34 def initialize project, credentials, host: nil, retries: nil, timeout: nil @project = project @credentials = credentials @host = host || "pubsub.googleapis.com" @retries = retries @timeout = timeout end |
Instance Attribute Details
#credentials ⇒ Object
Returns the value of attribute credentials.
30 31 32 |
# File 'lib/google/cloud/pubsub/service.rb', line 30 def credentials @credentials end |
#host ⇒ Object
Returns the value of attribute host.
30 31 32 |
# File 'lib/google/cloud/pubsub/service.rb', line 30 def host @host end |
#mocked_iam ⇒ Object
Returns the value of attribute mocked_iam.
68 69 70 |
# File 'lib/google/cloud/pubsub/service.rb', line 68 def mocked_iam @mocked_iam end |
#mocked_publisher ⇒ Object
Returns the value of attribute mocked_publisher.
61 62 63 |
# File 'lib/google/cloud/pubsub/service.rb', line 61 def mocked_publisher @mocked_publisher end |
#mocked_subscriber ⇒ Object
Returns the value of attribute mocked_subscriber.
54 55 56 |
# File 'lib/google/cloud/pubsub/service.rb', line 54 def mocked_subscriber @mocked_subscriber end |
#project ⇒ Object
Returns the value of attribute project.
30 31 32 |
# File 'lib/google/cloud/pubsub/service.rb', line 30 def project @project end |
#retries ⇒ Object
Returns the value of attribute retries.
30 31 32 |
# File 'lib/google/cloud/pubsub/service.rb', line 30 def retries @retries end |
#timeout ⇒ Object
Returns the value of attribute timeout.
30 31 32 |
# File 'lib/google/cloud/pubsub/service.rb', line 30 def timeout @timeout end |
Instance Method Details
#acknowledge(subscription, *ack_ids) ⇒ Object
Acknowledges receipt of a message.
218 219 220 221 222 223 224 225 |
# File 'lib/google/cloud/pubsub/service.rb', line 218 def acknowledge subscription, *ack_ids ack_req = Google::Pubsub::V1::AcknowledgeRequest.new( subscription: subscription_path(subscription), ack_ids: ack_ids ) execute { subscriber.acknowledge ack_req } end |
#create_subscription(topic, subscription_name, options = {}) ⇒ Object
Creates a subscription on a given topic for a given subscriber.
178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/google/cloud/pubsub/service.rb', line 178 def create_subscription topic, subscription_name, options = {} sub_params = { name: subscription_path(subscription_name, options), topic: topic_path(topic), ack_deadline_seconds: options[:deadline] }.delete_if { |_, v| v.nil? } sub_req = Google::Pubsub::V1::Subscription.new sub_params if options[:endpoint] sub_req.push_config = Google::Pubsub::V1::PushConfig.new( push_endpoint: options[:endpoint], attributes: (options[:attributes] || {}).to_h) end execute { subscriber.create_subscription sub_req } end |
#create_topic(topic_name, options = {}) ⇒ Object
Creates the given topic with the given name.
102 103 104 105 106 107 108 |
# File 'lib/google/cloud/pubsub/service.rb', line 102 def create_topic topic_name, options = {} topic_req = Google::Pubsub::V1::Topic.new.tap do |r| r.name = topic_path(topic_name, options) end execute { publisher.create_topic topic_req } end |
#creds ⇒ Object
43 44 45 46 47 |
# File 'lib/google/cloud/pubsub/service.rb', line 43 def creds return credentials if insecure? GRPC::Core::ChannelCredentials.new.compose \ GRPC::Core::CallCredentials.new credentials.client.updater_proc end |
#delete_subscription(subscription) ⇒ Object
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
196 197 198 199 200 201 202 |
# File 'lib/google/cloud/pubsub/service.rb', line 196 def delete_subscription subscription del_req = Google::Pubsub::V1::DeleteSubscriptionRequest.new( subscription: subscription_path(subscription) ) execute { subscriber.delete_subscription del_req } end |
#delete_topic(topic_name) ⇒ Object
Deletes the topic with the given name. All subscriptions to this topic are also deleted. Raises GRPC status code 5 if the topic does not exist. After a topic is deleted, a new topic may be created with the same name.
115 116 117 118 119 120 121 |
# File 'lib/google/cloud/pubsub/service.rb', line 115 def delete_topic topic_name topic_req = Google::Pubsub::V1::DeleteTopicRequest.new.tap do |r| r.topic = topic_path(topic_name) end execute { publisher.delete_topic topic_req } end |
#get_subscription(subscription_name, options = {}) ⇒ Object
Gets the details of a subscription.
142 143 144 145 146 147 148 |
# File 'lib/google/cloud/pubsub/service.rb', line 142 def get_subscription subscription_name, options = {} sub_req = Google::Pubsub::V1::GetSubscriptionRequest.new( subscription: subscription_path(subscription_name, options) ) execute { subscriber.get_subscription sub_req } end |
#get_subscription_policy(subscription_name, options = {}) ⇒ Object
282 283 284 285 286 287 288 |
# File 'lib/google/cloud/pubsub/service.rb', line 282 def get_subscription_policy subscription_name, options = {} get_req = Google::Iam::V1::GetIamPolicyRequest.new( resource: subscription_path(subscription_name, options) ) execute { iam.get_iam_policy get_req } end |
#get_topic(topic_name, options = {}) ⇒ Object
Gets the configuration of a topic. Since the topic only has the name attribute, this method is only useful to check the existence of a topic. If other attributes are added in the future, they will be returned here.
80 81 82 83 84 85 86 |
# File 'lib/google/cloud/pubsub/service.rb', line 80 def get_topic topic_name, options = {} topic_req = Google::Pubsub::V1::GetTopicRequest.new.tap do |r| r.topic = topic_path(topic_name, options) end execute { publisher.get_topic topic_req } end |
#get_topic_policy(topic_name, options = {}) ⇒ Object
256 257 258 259 260 261 262 |
# File 'lib/google/cloud/pubsub/service.rb', line 256 def get_topic_policy topic_name, options = {} get_req = Google::Iam::V1::GetIamPolicyRequest.new( resource: topic_path(topic_name, options) ) execute { iam.get_iam_policy get_req } end |
#iam ⇒ Object
63 64 65 66 67 |
# File 'lib/google/cloud/pubsub/service.rb', line 63 def iam return mocked_iam if mocked_iam @iam ||= Google::Iam::V1::IAMPolicy::Stub.new( host, creds, timeout: timeout) end |
#insecure? ⇒ Boolean
70 71 72 |
# File 'lib/google/cloud/pubsub/service.rb', line 70 def insecure? credentials == :this_channel_is_insecure end |
#inspect ⇒ Object
324 325 326 |
# File 'lib/google/cloud/pubsub/service.rb', line 324 def inspect "#{self.class}(#{@project})" end |
#list_subscriptions(options = {}) ⇒ Object
Lists matching subscriptions by project.
165 166 167 168 169 170 171 172 173 174 |
# File 'lib/google/cloud/pubsub/service.rb', line 165 def list_subscriptions options = {} list_params = { project: project_path(options), page_token: options[:token], page_size: options[:max] }.delete_if { |_, v| v.nil? } list_req = Google::Pubsub::V1::ListSubscriptionsRequest.new( list_params) execute { subscriber.list_subscriptions list_req } end |
#list_topics(options = {}) ⇒ Object
Lists matching topics.
90 91 92 93 94 95 96 97 98 |
# File 'lib/google/cloud/pubsub/service.rb', line 90 def list_topics options = {} topics_req = Google::Pubsub::V1::ListTopicsRequest.new.tap do |r| r.project = project_path(options) r.page_token = options[:token] if options[:token] r.page_size = options[:max] if options[:max] end execute { publisher.list_topics topics_req } end |
#list_topics_subscriptions(topic, options = {}) ⇒ Object
Lists matching subscriptions by project and topic.
152 153 154 155 156 157 158 159 160 161 |
# File 'lib/google/cloud/pubsub/service.rb', line 152 def list_topics_subscriptions topic, options = {} list_params = { topic: topic_path(topic, options), page_token: options[:token], page_size: options[:max] }.delete_if { |_, v| v.nil? } list_req = Google::Pubsub::V1::ListTopicSubscriptionsRequest.new \ list_params execute { publisher.list_topic_subscriptions list_req } end |
#modify_ack_deadline(subscription, ids, deadline) ⇒ Object
Modifies the ack deadline for a specific message.
246 247 248 249 250 251 252 253 254 |
# File 'lib/google/cloud/pubsub/service.rb', line 246 def modify_ack_deadline subscription, ids, deadline mad_req = Google::Pubsub::V1::ModifyAckDeadlineRequest.new( subscription: subscription_path(subscription), ack_ids: Array(ids), ack_deadline_seconds: deadline ) execute { subscriber.modify_ack_deadline mad_req } end |
#modify_push_config(subscription, endpoint, attributes) ⇒ Object
Modifies the PushConfig for a specified subscription.
229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/google/cloud/pubsub/service.rb', line 229 def modify_push_config subscription, endpoint, attributes # Convert attributes to strings to match the protobuf definition attributes = Hash[attributes.map { |k, v| [String(k), String(v)] }] mpc_req = Google::Pubsub::V1::ModifyPushConfigRequest.new( subscription: subscription_path(subscription), push_config: Google::Pubsub::V1::PushConfig.new( push_endpoint: endpoint, attributes: attributes ) ) execute { subscriber.modify_push_config mpc_req } end |
#project_path(options = {}) ⇒ Object
309 310 311 312 |
# File 'lib/google/cloud/pubsub/service.rb', line 309 def project_path options = {} project_name = options[:project] || project "projects/#{project_name}" end |
#publish(topic, messages) ⇒ Object
Adds one or more messages to the topic. Raises GRPC status code 5 if the topic does not exist. The messages parameter is an array of arrays. The first element is the data, second is attributes hash.
128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/google/cloud/pubsub/service.rb', line 128 def publish topic, messages publish_req = Google::Pubsub::V1::PublishRequest.new( topic: topic_path(topic), messages: messages.map do |data, attributes| Google::Pubsub::V1::PubsubMessage.new( data: data, attributes: attributes) end ) execute { publisher.publish publish_req } end |
#publisher ⇒ Object
56 57 58 59 60 |
# File 'lib/google/cloud/pubsub/service.rb', line 56 def publisher return mocked_publisher if mocked_publisher @publisher ||= Google::Pubsub::V1::Publisher::Stub.new( host, creds, timeout: timeout) end |
#pull(subscription, options = {}) ⇒ Object
Pulls a single message from the server.
206 207 208 209 210 211 212 213 214 |
# File 'lib/google/cloud/pubsub/service.rb', line 206 def pull subscription, options = {} pull_req = Google::Pubsub::V1::PullRequest.new( subscription: subscription_path(subscription, options), return_immediately: !(!options.fetch(:immediate, true)), max_messages: options.fetch(:max, 100).to_i ) execute { subscriber.pull pull_req } end |
#set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object
290 291 292 293 294 295 296 297 |
# File 'lib/google/cloud/pubsub/service.rb', line 290 def set_subscription_policy subscription_name, new_policy, options = {} set_req = Google::Iam::V1::SetIamPolicyRequest.new( resource: subscription_path(subscription_name, options), policy: new_policy ) execute { iam.set_iam_policy set_req } end |
#set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object
264 265 266 267 268 269 270 271 |
# File 'lib/google/cloud/pubsub/service.rb', line 264 def set_topic_policy topic_name, new_policy, options = {} set_req = Google::Iam::V1::SetIamPolicyRequest.new( resource: topic_path(topic_name, options), policy: new_policy ) execute { iam.set_iam_policy set_req } end |
#subscriber ⇒ Object
49 50 51 52 53 |
# File 'lib/google/cloud/pubsub/service.rb', line 49 def subscriber return mocked_subscriber if mocked_subscriber @subscriber ||= Google::Pubsub::V1::Subscriber::Stub.new( host, creds, timeout: timeout) end |
#subscription_path(subscription_name, options = {}) ⇒ Object
319 320 321 322 |
# File 'lib/google/cloud/pubsub/service.rb', line 319 def subscription_path subscription_name, options = {} return subscription_name if subscription_name.to_s.include? "/" "#{project_path(options)}/subscriptions/#{subscription_name}" end |
#test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object
299 300 301 302 303 304 305 306 307 |
# File 'lib/google/cloud/pubsub/service.rb', line 299 def test_subscription_permissions subscription_name, permissions, options = {} test_req = Google::Iam::V1::TestIamPermissionsRequest.new( resource: subscription_path(subscription_name, options), permissions: permissions ) execute { iam.test_iam_permissions test_req } end |
#test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object
273 274 275 276 277 278 279 280 |
# File 'lib/google/cloud/pubsub/service.rb', line 273 def test_topic_permissions topic_name, permissions, options = {} test_req = Google::Iam::V1::TestIamPermissionsRequest.new( resource: topic_path(topic_name, options), permissions: permissions ) execute { iam.test_iam_permissions test_req } end |
#topic_path(topic_name, options = {}) ⇒ Object
314 315 316 317 |
# File 'lib/google/cloud/pubsub/service.rb', line 314 def topic_path topic_name, options = {} return topic_name if topic_name.to_s.include? "/" "#{project_path(options)}/topics/#{topic_name}" end |