Class: Google::Cloud::PubSub::Topic
- Inherits: Object
- Inheritance: Object, Google::Cloud::PubSub::Topic
- Defined in: lib/google/cloud/pubsub/topic.rb, lib/google/cloud/pubsub/topic/list.rb
Overview
Defined Under Namespace
Classes: List
Instance Method Summary
-
#async_publisher ⇒ AsyncPublisher
AsyncPublisher object used to publish multiple messages in batches.
-
#delete ⇒ Boolean
Permanently deletes the topic.
-
#enable_message_ordering! ⇒ Object
Enables message ordering for messages with ordering keys on the #async_publisher.
-
#exists? ⇒ Boolean
Determines whether the topic exists in the Pub/Sub service.
-
#kms_key ⇒ String
The Cloud KMS encryption key that will be used to protect access to messages published on this topic.
-
#kms_key=(new_kms_key_name) ⇒ Object
Set the Cloud KMS encryption key that will be used to protect access to messages published on this topic.
-
#labels ⇒ Hash
A hash of user-provided labels associated with this topic.
-
#labels=(new_labels) ⇒ Object
Sets the hash of user-provided labels associated with this topic.
-
#message_ordering? ⇒ Boolean
Whether message ordering for messages with ordering keys has been enabled on the #async_publisher.
-
#name ⇒ String
The name of the topic in the form of "/projects/project-identifier/topics/topic-name".
-
#persistence_regions ⇒ Array<String>
The list of GCP region IDs where messages that are published to the topic may be persisted in storage.
-
#persistence_regions=(new_persistence_regions) ⇒ Object
Sets the list of GCP region IDs where messages that are published to the topic may be persisted in storage.
-
#policy {|policy| ... } ⇒ Policy
Gets the Cloud IAM access control policy for this topic.
-
#publish(data = nil, attributes = {}) {|batch| ... } ⇒ Message+
Publishes one or more messages to the topic.
-
#publish_async(data = nil, attributes = nil, ordering_key: nil, **extra_attrs) {|result| ... } ⇒ Object
Publishes a message asynchronously to the topic using #async_publisher.
-
#reference? ⇒ Boolean
Determines whether the topic object was created without retrieving the resource representation from the Pub/Sub service.
-
#reload! ⇒ Google::Cloud::PubSub::Topic
(also: #refresh!)
Reloads the topic with current data from the Pub/Sub service.
-
#resource? ⇒ Boolean
Determines whether the topic object was created with a resource representation from the Pub/Sub service.
-
#resume_publish(ordering_key) ⇒ Boolean
Resume publishing ordered messages for the provided ordering key.
-
#subscribe(subscription_name, deadline: nil, retain_acked: false, retention: nil, endpoint: nil, push_config: nil, labels: nil, message_ordering: nil, filter: nil, dead_letter_topic: nil, dead_letter_max_delivery_attempts: nil, retry_policy: nil) ⇒ Google::Cloud::PubSub::Subscription
(also: #create_subscription, #new_subscription)
Creates a new Subscription object on the current Topic.
-
#subscription(subscription_name, skip_lookup: nil) ⇒ Google::Cloud::PubSub::Subscription?
(also: #get_subscription, #find_subscription)
Retrieves subscription by name.
-
#subscriptions(token: nil, max: nil) ⇒ Array<Subscription>
(also: #find_subscriptions, #list_subscriptions)
Retrieves a list of the subscriptions on this topic.
-
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the Cloud IAM access control policy.
-
#update_policy(new_policy) ⇒ Policy
(also: #policy=)
Updates the Cloud IAM access control policy for this topic.
Instance Method Details
#async_publisher ⇒ AsyncPublisher
AsyncPublisher object used to publish multiple messages in batches.
# File 'lib/google/cloud/pubsub/topic.rb', line 83
def async_publisher
  @async_publisher
end
#delete ⇒ Boolean
Permanently deletes the topic.
# File 'lib/google/cloud/pubsub/topic.rb', line 249
def delete
  ensure_service!
  service.delete_topic name
  true
end
#enable_message_ordering! ⇒ Object
At the time of this release, ordering keys are not yet publicly enabled and require special project enablement.
Enables message ordering for messages with ordering keys on the
#async_publisher. When enabled, messages published with the same
ordering_key will be delivered in the order they were published.
See #message_ordering?. See #publish_async, Subscription#listen, and Message#ordering_key.
# File 'lib/google/cloud/pubsub/topic.rb', line 685
def enable_message_ordering!
  @async_publisher ||= AsyncPublisher.new name, service, @async_opts
  @async_publisher.enable_message_ordering!
end
#exists? ⇒ Boolean
Determines whether the topic exists in the Pub/Sub service.
# File 'lib/google/cloud/pubsub/topic.rb', line 850
def exists?
  # Always true if the object is not set as reference
  return true unless reference?
  # If we have a value, return it
  return @exists unless @exists.nil?
  ensure_grpc!
  @exists = true
rescue Google::Cloud::NotFoundError
  @exists = false
end
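The lookup-and-memoize flow of #exists? can be sketched in isolation. This is only an illustration of the pattern: `LazyTopic`, `NotFoundError`, the `known_topics` array, and `load_resource!` are hypothetical stand-ins for the real class, error, service, and `ensure_grpc!`.

```ruby
# Stand-in for the "reference vs. resource" pattern; not the library's class.
class NotFoundError < StandardError; end

class LazyTopic
  attr_reader :lookups

  def initialize name, known_topics
    @name = name
    @known_topics = known_topics # hypothetical stand-in for the Pub/Sub service
    @grpc = nil                  # nil means this is a reference object
    @exists = nil
    @lookups = 0
  end

  def reference?
    @grpc.nil?
  end

  def exists?
    # A loaded resource always exists.
    return true unless reference?
    # Reuse a previous answer instead of calling the service again.
    return @exists unless @exists.nil?
    load_resource!
    @exists = true
  rescue NotFoundError
    @exists = false
  end

  private

  # Plays the role of ensure_grpc!: one counted "API call" per invocation.
  def load_resource!
    @lookups += 1
    raise NotFoundError, @name unless @known_topics.include? @name
    @grpc = { name: @name }
  end
end

found = LazyTopic.new "my-topic", ["my-topic"]
missing = LazyTopic.new "other-topic", ["my-topic"]
puts found.exists?
puts missing.exists?
```

Calling `missing.exists?` a second time returns the memoized `false` without another lookup, which is why a topic created after the first check is not noticed until a fresh object is used.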
#kms_key ⇒ String
The Cloud KMS encryption key that will be used to protect access
to messages published on this topic.
For example: projects/a/locations/b/keyRings/c/cryptoKeys/d
The default value is nil, which means default encryption is used.
Makes an API call to retrieve the KMS encryption key when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/topic.rb', line 155
def kms_key
  ensure_grpc!
  @grpc.kms_key_name
end
#kms_key=(new_kms_key_name) ⇒ Object
Set the Cloud KMS encryption key that will be used to protect access
to messages published on this topic.
For example: projects/a/locations/b/keyRings/c/cryptoKeys/d
The default value is nil, which means default encryption is used.
# File 'lib/google/cloud/pubsub/topic.rb', line 178
def kms_key= new_kms_key_name
  update_grpc = Google::Cloud::PubSub::V1::Topic.new name: name, kms_key_name: new_kms_key_name
  @grpc = service.update_topic update_grpc, :kms_key_name
  @resource_name = nil
end
#labels ⇒ Hash
A hash of user-provided labels associated with this topic. Labels can be used to organize and group topics. See Creating and Managing Labels.
The returned hash is frozen and changes are not allowed. Use #labels= to update the labels for this topic.
Makes an API call to retrieve the label values when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/topic.rb', line 111
def labels
  ensure_grpc!
  @grpc.labels.to_h.freeze
end
#labels=(new_labels) ⇒ Object
Sets the hash of user-provided labels associated with this topic. Labels can be used to organize and group topics. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.
# File 'lib/google/cloud/pubsub/topic.rb', line 128
def labels= new_labels
  raise ArgumentError, "Value must be a Hash" if new_labels.nil?
  update_grpc = Google::Cloud::PubSub::V1::Topic.new name: name, labels: new_labels
  @grpc = service.update_topic update_grpc, :labels
  @resource_name = nil
end
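The label rules above can be checked on the client before calling #labels=. The sketch below is not part of the library: `invalid_labels` and both regular expressions are hypothetical, and treating "international characters" as Unicode lowercase or caseless letters is an assumption. The service remains the authority on what it accepts.

```ruby
# Hypothetical client-side check; the service performs the authoritative validation.
# Keys: start with a letter, at most 63 characters in total.
LABEL_KEY   = /\A\p{Ll}[\p{Ll}\p{Lo}\p{N}_-]{0,62}\z/
# Values: optional (may be empty), at most 63 characters.
LABEL_VALUE = /\A[\p{Ll}\p{Lo}\p{N}_-]{0,63}\z/

# Returns the subset of labels that break the documented rules.
def invalid_labels labels
  raise ArgumentError, "Value must be a Hash" unless labels.is_a? Hash
  labels.reject do |key, value|
    key.to_s.match?(LABEL_KEY) && value.to_s.match?(LABEL_VALUE)
  end
end

candidate = { "env" => "prod", "Team" => "a", "1x" => "b", "ok" => "" }
rejected = invalid_labels candidate
puts rejected.keys.inspect
```

With a real topic, one might call `topic.labels = candidate` only when `invalid_labels(candidate)` is empty.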
#message_ordering? ⇒ Boolean
Whether message ordering for messages with ordering keys has been
enabled on the #async_publisher. When enabled, messages published
with the same ordering_key will be delivered in the order they were
published. When disabled, messages may be delivered in any order.
See #enable_message_ordering!. See #publish_async, Subscription#listen, and Message#ordering_key.
# File 'lib/google/cloud/pubsub/topic.rb', line 701
def message_ordering?
  @async_publisher ||= AsyncPublisher.new name, service, @async_opts
  @async_publisher.message_ordering?
end
#name ⇒ String
The name of the topic in the form of "/projects/project-identifier/topics/topic-name".
# File 'lib/google/cloud/pubsub/topic.rb', line 93
def name
  return @resource_name if reference?
  @grpc.name
end
#persistence_regions ⇒ Array<String>
The list of GCP region IDs where messages that are published to the topic may be persisted in storage.
Messages published by publishers running in non-allowed GCP regions (or running outside of GCP altogether) will be routed for storage in one of the allowed regions. An empty list indicates a misconfiguration at the project or organization level, which will result in all publish operations failing.
Makes an API call to retrieve the list of GCP region IDs when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/topic.rb', line 208
def persistence_regions
  ensure_grpc!
  return [] if @grpc.message_storage_policy.nil?
  Array @grpc.message_storage_policy.allowed_persistence_regions
end
#persistence_regions=(new_persistence_regions) ⇒ Object
Sets the list of GCP region IDs where messages that are published to the topic may be persisted in storage.
# File 'lib/google/cloud/pubsub/topic.rb', line 229
def persistence_regions= new_persistence_regions
  update_grpc = Google::Cloud::PubSub::V1::Topic.new \
    name: name, message_storage_policy: { allowed_persistence_regions: Array(new_persistence_regions) }
  @grpc = service.update_topic update_grpc, :message_storage_policy
  @resource_name = nil
end
#policy {|policy| ... } ⇒ Policy
Gets the Cloud IAM access control policy for this topic.
# File 'lib/google/cloud/pubsub/topic.rb', line 753
def policy
  ensure_service!
  grpc = service.get_topic_policy name
  policy = Policy.from_grpc grpc
  return policy unless block_given?
  yield policy
  update_policy policy
end
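Because #policy yields the policy and then calls update_policy, a read-modify-write can be written as a single block. The helper below is a minimal sketch: `grant_subscriber` is a hypothetical name, the role string is an illustrative value, and `topic` is assumed to behave like a Google::Cloud::PubSub::Topic whose yielded policy responds to `add(role, member)`.

```ruby
# `topic` is expected to respond to #policy like Google::Cloud::PubSub::Topic.
# The role name is an illustrative value, not a recommendation.
def grant_subscriber topic, member
  # The block form fetches the policy, yields it for modification,
  # and then saves it via update_policy once the block returns.
  topic.policy do |policy|
    policy.add "roles/pubsub.subscriber", member
  end
end
```

A call such as `grant_subscriber topic, "user:owner@example.com"` would then fetch, modify, and save the policy in one step.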
#publish(data = nil, attributes = {}) {|batch| ... } ⇒ Message+
Publishes one or more messages to the topic.
The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.
# File 'lib/google/cloud/pubsub/topic.rb', line 557
def publish data = nil, attributes = {}
  ensure_service!
  batch = BatchPublisher.new data, attributes
  yield batch if block_given?
  return nil if batch.messages.count.zero?
  publish_batch_messages batch
end
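The block form of #publish collects several messages into one batch. The helper below is a sketch under the assumption that `topic` behaves like a Google::Cloud::PubSub::Topic and that the yielded batch responds to `publish(data, attributes)`; `publish_batch` itself is a hypothetical name.

```ruby
# Publishes every payload in a single batch request.
# Returns whatever #publish returns, which is nil when no messages were added.
def publish_batch topic, payloads, attributes = {}
  topic.publish do |batch|
    payloads.each { |payload| batch.publish payload, attributes }
  end
end
```

With a real topic this might be called as `publish_batch topic, ["task 1", "task 2"], "origin" => "importer"`. Passing an empty list yields nil, matching the early return in the source above.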
#publish_async(data = nil, attributes = nil, ordering_key: nil, **extra_attrs) {|result| ... } ⇒ Object
At the time of this release, ordering keys are not yet publicly enabled and require special project enablement.
Publishes a message asynchronously to the topic using #async_publisher.
The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.
Google Cloud Pub/Sub ordering keys provide the ability to ensure related messages are sent to subscribers in the order in which they were published. Messages can be tagged with an ordering key, a string that identifies related messages for which publish order should be respected. The service guarantees that, for a given ordering key and publisher, messages are sent to subscribers in the order in which they were published. Ordering does not require sacrificing high throughput or scalability, as the service automatically distributes messages for different ordering keys across subscribers.
To use ordering keys, specify ordering_key. Before specifying
ordering_key on a message a call to #enable_message_ordering! must
be made or an error will be raised.
# File 'lib/google/cloud/pubsub/topic.rb', line 667
def publish_async data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback
  ensure_service!

  @async_publisher ||= AsyncPublisher.new name, service, @async_opts
  @async_publisher.publish data, attributes, ordering_key: ordering_key, **extra_attrs, &callback
end
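The requirement to call #enable_message_ordering! before using ordering_key can be wrapped in a small helper. This is a sketch, not library code: `publish_in_order` is a hypothetical name, and `topic` is assumed to behave like a Google::Cloud::PubSub::Topic.

```ruby
# Publishes payloads under one ordering key, enabling ordering first if needed.
# An optional block receives each payload together with its publish result.
def publish_in_order topic, ordering_key, payloads
  topic.enable_message_ordering! unless topic.message_ordering?
  payloads.each do |payload|
    topic.publish_async payload, ordering_key: ordering_key do |result|
      yield payload, result if block_given?
    end
  end
end
```

With a real topic the results arrive asynchronously, so the block may run some time after `publish_in_order` returns.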
#reference? ⇒ Boolean
Determines whether the topic object was created without retrieving the resource representation from the Pub/Sub service.
# File 'lib/google/cloud/pubsub/topic.rb', line 876
def reference?
  @grpc.nil?
end
#reload! ⇒ Google::Cloud::PubSub::Topic Also known as: refresh!
Reloads the topic with current data from the Pub/Sub service.
# File 'lib/google/cloud/pubsub/topic.rb', line 912
def reload!
  ensure_service!
  @grpc = service.get_topic name
  @resource_name = nil
  self
end
#resource? ⇒ Boolean
Determines whether the topic object was created with a resource representation from the Pub/Sub service.
# File 'lib/google/cloud/pubsub/topic.rb', line 895
def resource?
  !@grpc.nil?
end
#resume_publish(ordering_key) ⇒ Boolean
Resume publishing ordered messages for the provided ordering key.
# File 'lib/google/cloud/pubsub/topic.rb', line 714
def resume_publish ordering_key
  @async_publisher ||= AsyncPublisher.new name, service, @async_opts
  @async_publisher.resume_publish ordering_key
end
#subscribe(subscription_name, deadline: nil, retain_acked: false, retention: nil, endpoint: nil, push_config: nil, labels: nil, message_ordering: nil, filter: nil, dead_letter_topic: nil, dead_letter_max_delivery_attempts: nil, retry_policy: nil) ⇒ Google::Cloud::PubSub::Subscription Also known as: create_subscription, new_subscription
Creates a new Subscription object on the current Topic.
# File 'lib/google/cloud/pubsub/topic.rb', line 374
def subscribe subscription_name, deadline: nil, retain_acked: false, retention: nil, endpoint: nil, push_config: nil,
              labels: nil, message_ordering: nil, filter: nil, dead_letter_topic: nil,
              dead_letter_max_delivery_attempts: nil, retry_policy: nil
  ensure_service!
  if push_config && endpoint
    raise ArgumentError, "endpoint and push_config were both provided. Please provide only one."
  end
  push_config = Google::Cloud::PubSub::Subscription::PushConfig.new endpoint: endpoint if endpoint

  options = { deadline: deadline, retain_acked: retain_acked, retention: retention, labels: labels,
              message_ordering: message_ordering, filter: filter,
              dead_letter_max_delivery_attempts: dead_letter_max_delivery_attempts }

  options[:dead_letter_topic_name] = dead_letter_topic.name if dead_letter_topic
  if options[:dead_letter_max_delivery_attempts] && !options[:dead_letter_topic_name]
    # Service error message "3:Invalid resource name given (name=)." does not identify param.
    raise ArgumentError, "dead_letter_topic is required with dead_letter_max_delivery_attempts"
  end
  options[:push_config] = push_config.to_grpc if push_config
  options[:retry_policy] = retry_policy.to_grpc if retry_policy
  grpc = service.create_subscription name, subscription_name, options
  Subscription.from_grpc grpc, service
end
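The source above raises an ArgumentError when dead_letter_max_delivery_attempts is given without dead_letter_topic, so the two options are best passed together. The helper below is a sketch: `subscribe_with_dead_letter` and its default of 10 attempts are hypothetical, and `topic` is assumed to behave like a Google::Cloud::PubSub::Topic.

```ruby
# Creates a subscription whose undeliverable messages are forwarded to
# `dead_letter_topic` after `max_attempts` delivery attempts.
# The default of 10 is an illustrative value only.
def subscribe_with_dead_letter topic, subscription_name, dead_letter_topic, max_attempts: 10
  topic.subscribe subscription_name,
                  dead_letter_topic: dead_letter_topic,
                  dead_letter_max_delivery_attempts: max_attempts
end
```

Keeping both keyword arguments in one helper avoids the case where only the attempt limit is supplied.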
#subscription(subscription_name, skip_lookup: nil) ⇒ Google::Cloud::PubSub::Subscription? Also known as: get_subscription, find_subscription
Retrieves subscription by name.
# File 'lib/google/cloud/pubsub/topic.rb', line 448
def subscription subscription_name, skip_lookup: nil
  ensure_service!
  return Subscription.from_name subscription_name, service if skip_lookup
  grpc = service.get_subscription subscription_name
  Subscription.from_grpc grpc, service
rescue Google::Cloud::NotFoundError
  nil
end
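Since #subscription returns nil when the subscription is not found, it combines naturally with #subscribe into a find-or-create step. This is a minimal sketch with a hypothetical helper name, assuming `topic` behaves like a Google::Cloud::PubSub::Topic. It is not atomic: another process could create the subscription between the two calls.

```ruby
# Returns the existing subscription, or creates it when the lookup returns nil.
# Extra keyword options are only used when a new subscription is created.
def find_or_create_subscription topic, subscription_name, **options
  topic.subscription(subscription_name) || topic.subscribe(subscription_name, **options)
end
```

The lookup must not use `skip_lookup: true` here, because a reference object is returned in that case whether or not the subscription exists.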
#subscriptions(token: nil, max: nil) ⇒ Array<Subscription> Also known as: find_subscriptions, list_subscriptions
Retrieves a list of the subscriptions on this topic.
# File 'lib/google/cloud/pubsub/topic.rb', line 491
def subscriptions token: nil, max: nil
  ensure_service!
  options = { token: token, max: max }
  grpc = service.list_topics_subscriptions name, options
  Subscription::List.from_topic_grpc grpc, service, name, max
end
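The token and max parameters allow paging through results manually. The sketch below assumes that each returned page is array-like and exposes the next page token through a `token` reader, and that a nil or empty token marks the last page; `all_subscriptions` and the default page size are hypothetical.

```ruby
# Collects every subscription on the topic by following page tokens.
# Assumes each page responds to #to_a and #token (nil or empty on the last page).
def all_subscriptions topic, page_size: 100
  found = []
  token = nil
  loop do
    page = topic.subscriptions token: token, max: page_size
    found.concat page.to_a
    token = page.token
    break if token.nil? || token.empty?
  end
  found
end
```

For topics with many subscriptions, processing each page as it arrives would use less memory than collecting everything first.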
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the Cloud IAM access control policy.
# File 'lib/google/cloud/pubsub/topic.rb', line 831
def test_permissions *permissions
  permissions = Array(permissions).flatten
  permissions = Array(permissions).flatten
  ensure_service!
  grpc = service.test_topic_permissions name, permissions
  grpc.permissions
end
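Since #test_permissions returns the subset of requested permissions that the caller holds, the missing ones can be found by set difference. This is a sketch with a hypothetical helper name, assuming `topic` behaves like a Google::Cloud::PubSub::Topic.

```ruby
# Returns the requested permissions that the caller does NOT hold on the topic.
def missing_permissions topic, *required
  required = required.flatten
  required - topic.test_permissions(*required)
end
```

A call such as `missing_permissions topic, "pubsub.topics.get", "pubsub.topics.publish"` returns an empty array when both permissions are granted.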
#update_policy(new_policy) ⇒ Policy Also known as: policy=
Updates the Cloud IAM access control
policy for this topic. The policy should be read from #policy. See
Policy for an explanation of the policy
etag property and how to modify policies.
You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.
# File 'lib/google/cloud/pubsub/topic.rb', line 791
def update_policy new_policy
  ensure_service!
  grpc = service.set_topic_policy name, new_policy.to_grpc
  @policy = Policy.from_grpc grpc
end