Class: Gcloud::Pubsub::Subscription
- Inherits: Object (Object > Gcloud::Pubsub::Subscription)
- Defined in: lib/gcloud/pubsub/subscription.rb, lib/gcloud/pubsub/subscription/list.rb
Overview
# Subscription
A named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.
Defined Under Namespace
Classes: List
Instance Method Summary
-
#acknowledge(*messages) ⇒ Object
(also: #ack)
Acknowledges receipt of a message.
-
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
-
#delay(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
-
#delete ⇒ Boolean
Deletes an existing subscription.
-
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
-
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
-
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
-
#initialize ⇒ Subscription
constructor
A new instance of Subscription.
-
#lazy? ⇒ Boolean
Determines whether the subscription object was created lazily, that is, without an HTTP call to retrieve the resource.
-
#listen(max: 100, autoack: false, delay: 1) {|msg| ... } ⇒ Object
Poll the backend for new messages.
-
#name ⇒ Object
The name of the subscription.
-
#policy(force: nil) {|policy| ... } ⇒ Policy
Gets the [Cloud IAM](cloud.google.com/iam/) access control policy for this subscription.
-
#policy=(new_policy) ⇒ Object
Updates the [Cloud IAM](cloud.google.com/iam/) access control policy for this subscription.
-
#pull(immediate: true, max: 100, autoack: false) ⇒ Array<Gcloud::Pubsub::ReceivedMessage>
Pulls messages from the server.
-
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the [Cloud IAM](cloud.google.com/iam/) access control policy.
-
#topic ⇒ Topic
The Topic from which this subscription receives messages.
-
#wait_for_messages(max: 100, autoack: false) ⇒ Array<Gcloud::Pubsub::ReceivedMessage>
Pulls from the server while waiting for messages to become available.
Constructor Details
#initialize ⇒ Subscription
Returns a new instance of Subscription.
    # File 'lib/gcloud/pubsub/subscription.rb', line 49
    def initialize
      @service = nil
      @grpc = Google::Pubsub::V1::Subscription.new
      @name = nil
      @exists = nil
    end
Instance Attribute Details
#grpc ⇒ Object
    # File 'lib/gcloud/pubsub/subscription.rb', line 45
    def grpc
      @grpc
    end
#service ⇒ Object
    # File 'lib/gcloud/pubsub/subscription.rb', line 41
    def service
      @service
    end
Class Method Details
.from_grpc(grpc, service) ⇒ Object
Creates a new Subscription from a Google::Pubsub::V1::Subscription object.
    # File 'lib/gcloud/pubsub/subscription.rb', line 531
    def self.from_grpc grpc, service
      new.tap do |f|
        f.grpc = grpc
        f.service = service
      end
    end
.new_lazy(name, service, options = {}) ⇒ Object
    # File 'lib/gcloud/pubsub/subscription.rb', line 58
    def self.new_lazy name, service, options = {}
      new.tap do |s|
        s.grpc = nil
        s.service = service
        s.instance_variable_set "@name",
                                service.subscription_path(name, options)
      end
    end
Instance Method Details
#acknowledge(*messages) ⇒ Object Also known as: ack
Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.
    # File 'lib/gcloud/pubsub/subscription.rb', line 349
    def acknowledge *messages
      ack_ids = coerce_ack_ids messages
      return true if ack_ids.empty?
      ensure_service!
      service.acknowledge name, *ack_ids
      true
    end
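The acknowledgement rules described above (an empty call is a no-op, and a repeated ack is not an error) can be illustrated with a small in-memory sketch. `FakeSubscription` and `pending_ids` are hypothetical names invented for this illustration; the sketch does not call the Pub/Sub service and is not the library's implementation.

```ruby
# Hypothetical in-memory stand-in; NOT Gcloud::Pubsub::Subscription.
class FakeSubscription
  def initialize(pending)
    @pending = pending.dup # ack_id => payload
  end

  # Mirrors the documented behaviour: returns true for an empty call,
  # and acknowledging the same id twice does not raise.
  def acknowledge(*ack_ids)
    ack_ids = ack_ids.flatten
    return true if ack_ids.empty?
    ack_ids.each { |id| @pending.delete id }
    true
  end
  alias ack acknowledge

  # Helper for the illustration only.
  def pending_ids
    @pending.keys
  end
end

sub = FakeSubscription.new("a1" => "hello", "a2" => "world")
sub.acknowledge "a1"
sub.ack "a1" # repeated ack of the same id is harmless
remaining = sub.pending_ids
empty_call = sub.acknowledge
```

Against the real service, an ack may still succeed after the deadline has expired, but the message may already have been redelivered, so handlers should tolerate duplicates.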
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
    # File 'lib/gcloud/pubsub/subscription.rb', line 95
    def deadline
      ensure_grpc!
      @grpc.ack_deadline_seconds
    end
#delay(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
This indicates that more time is needed to process the messages, or to make the messages available for redelivery if the processing was interrupted.
    # File 'lib/gcloud/pubsub/subscription.rb', line 383
    def delay new_deadline, *messages
      ack_ids = coerce_ack_ids messages
      ensure_service!
      service.modify_ack_deadline name, ack_ids, new_deadline
      true
    end
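As a rough illustration of the argument order (new deadline first, then the messages), the following sketch tracks one deadline per ack id in memory. `FakeDeadlineTracker` and `deadline_for` are hypothetical names, and the values 60 and 120 are arbitrary; nothing here talks to the Pub/Sub service.

```ruby
# Hypothetical stand-in that tracks one deadline per ack id;
# NOT the gcloud class.
class FakeDeadlineTracker
  def initialize(default_deadline, ack_ids)
    @deadlines = {}
    ack_ids.each { |id| @deadlines[id] = default_deadline }
  end

  # Same argument order as the documented #delay.
  def delay(new_deadline, *ack_ids)
    ack_ids.flatten.each do |id|
      @deadlines[id] = new_deadline if @deadlines.key?(id)
    end
    true
  end

  # Helper for the illustration only.
  def deadline_for(ack_id)
    @deadlines[ack_id]
  end
end

tracker = FakeDeadlineTracker.new(60, %w[a1 a2 a3])
tracker.delay 120, "a1", "a2" # ask for more processing time on two messages
extended = tracker.deadline_for("a1")
untouched = tracker.deadline_for("a3")
```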
#delete ⇒ Boolean
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
    # File 'lib/gcloud/pubsub/subscription.rb', line 175
    def delete
      ensure_service!
      service.delete_subscription name
      true
    end
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
    # File 'lib/gcloud/pubsub/subscription.rb', line 103
    def endpoint
      ensure_grpc!
      @grpc.push_config.push_endpoint if @grpc.push_config
    end
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
    # File 'lib/gcloud/pubsub/subscription.rb', line 110
    def endpoint= new_endpoint
      ensure_service!
      service.modify_push_config name, new_endpoint, {}
      @grpc.push_config = Google::Pubsub::V1::PushConfig.new(
        push_endpoint: new_endpoint,
        attributes: {}
      ) if @grpc
    end
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
    # File 'lib/gcloud/pubsub/subscription.rb', line 131
    def exists?
      # Always true if we have a grpc object
      return true unless @grpc.nil?
      # If we have a value, return it
      return @exists unless @exists.nil?
      ensure_grpc!
      @exists = !@grpc.nil?
    rescue Gcloud::NotFoundError
      @exists = false
    end
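The memoization pattern in `#exists?` (answer from loaded data if present, otherwise from a cached result, otherwise perform one lookup and remember a not-found outcome) can be sketched in isolation. `LazyResource`, `NotFoundError`, and the `lookups` counter below are hypothetical stand-ins introduced for this illustration, not part of the gcloud library.

```ruby
# Stand-in for Gcloud::NotFoundError, for illustration only.
class NotFoundError < StandardError; end

# Hypothetical lazily loaded resource; NOT the gcloud class.
class LazyResource
  attr_reader :lookups

  def initialize(&loader)
    @loader = loader
    @data = nil
    @exists = nil
    @lookups = 0
  end

  def exists?
    # Loaded data implies existence.
    return true unless @data.nil?
    # Reuse a previously computed answer, including a cached false.
    return @exists unless @exists.nil?
    @lookups += 1
    @data = @loader.call
    @exists = !@data.nil?
  rescue NotFoundError
    @exists = false
  end
end

missing = LazyResource.new { raise NotFoundError, "no such subscription" }
first = missing.exists?
second = missing.exists? # served from the cached result, no second lookup

found = LazyResource.new { "loaded-resource" }
found_result = found.exists?
```

One consequence of this design is that a cached `false` is never refreshed: if the resource is created later, the same object keeps reporting that it does not exist.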
#lazy? ⇒ Boolean
Determines whether the subscription object was created lazily, that is, without an HTTP call to retrieve the resource. Returns true when no gRPC object has been loaded.
    # File 'lib/gcloud/pubsub/subscription.rb', line 156
    def lazy?
      @grpc.nil?
    end
#listen(max: 100, autoack: false, delay: 1) {|msg| ... } ⇒ Object
Poll the backend for new messages. This runs a loop to ping the API, blocking indefinitely, yielding retrieved messages as they are received.
    # File 'lib/gcloud/pubsub/subscription.rb', line 317
    def listen max: 100, autoack: false, delay: 1
      loop do
        msgs = wait_for_messages max: max, autoack: autoack
        if msgs.any?
          msgs.each { |msg| yield msg }
        else
          sleep delay
        end
      end
    end
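Because `#listen` loops indefinitely, the only way out is for the caller's block to `break` (or raise). The sketch below reproduces the loop shape against canned batches to show that control flow. `FakePoller` and its batch queue are hypothetical; the real method polls the Pub/Sub API instead.

```ruby
# Hypothetical stand-in that serves canned batches; NOT the gcloud class.
class FakePoller
  def initialize(batches)
    @batches = batches.dup
  end

  # Returns the next canned batch, or an empty array when none remain.
  def wait_for_messages(max: 100, autoack: false)
    (@batches.shift || []).first(max)
  end

  # Same loop shape as the documented #listen.
  def listen(max: 100, autoack: false, delay: 1)
    loop do
      msgs = wait_for_messages max: max, autoack: autoack
      if msgs.any?
        msgs.each { |msg| yield msg }
      else
        sleep delay
      end
    end
  end
end

poller = FakePoller.new([["m1", "m2"], [], ["m3"]])
seen = []
poller.listen(delay: 0) do |msg|
  seen << msg
  break if seen.size == 3 # leaving the block is the only way to stop
end
```

Here `delay: 0` keeps the example fast; with a real subscription a nonzero delay avoids hammering the API when the backlog is empty.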
#name ⇒ Object
The name of the subscription.
    # File 'lib/gcloud/pubsub/subscription.rb', line 69
    def name
      @grpc ? @grpc.name : @name
    end
#policy(force: nil) {|policy| ... } ⇒ Policy
Gets the [Cloud IAM](cloud.google.com/iam/) access control policy for this subscription.
    # File 'lib/gcloud/pubsub/subscription.rb', line 441
    def policy force: nil
      @policy = nil if force || block_given?
      @policy ||= begin
        ensure_service!
        grpc = service.get_subscription_policy name
        Policy.from_grpc grpc
      end
      return @policy unless block_given?
      p = @policy.deep_dup
      yield p
      self.policy = p
    end
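The listing combines three behaviours: the policy is cached after the first fetch, `force: true` or a block discards the cache, and the block form edits a copy and writes it back. A minimal in-memory sketch of that read-modify-write flow follows. `FakePolicy`, `FakePolicyHolder`, the `fetches` counter, and the role and member strings are all hypothetical; the real method calls the Pub/Sub service and uses the library's Policy class.

```ruby
# Hypothetical stand-ins; the real Policy class and service are not used.
FakePolicy = Struct.new(:etag, :roles) do
  def deep_dup
    FakePolicy.new(etag, roles.map { |role, members| [role, members.dup] }.to_h)
  end
end

class FakePolicyHolder
  attr_reader :fetches

  def initialize
    @stored = FakePolicy.new("v1", "roles/viewer" => ["user:a@example.com"])
    @fetches = 0
    @policy = nil
  end

  # Same control flow as the documented #policy.
  def policy(force: nil)
    @policy = nil if force || block_given?
    @policy ||= begin
      @fetches += 1 # stands in for the remote call
      @stored.deep_dup
    end
    return @policy unless block_given?
    p = @policy.deep_dup
    yield p
    self.policy = p
  end

  def policy=(new_policy)
    @stored = new_policy.deep_dup
    @policy = new_policy
  end
end

holder = FakePolicyHolder.new
holder.policy         # first call fetches
holder.policy         # second call is served from the cache
holder.policy do |p|  # block form refetches, then writes the edited copy back
  p.roles["roles/viewer"] << "user:b@example.com"
end
viewers = holder.policy.roles["roles/viewer"]
fetch_count = holder.fetches
```

Editing a duplicate rather than the cached object means a block that raises partway through leaves the cached policy untouched.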
#policy=(new_policy) ⇒ Object
Updates the [Cloud IAM](cloud.google.com/iam/) access control policy for this subscription. The policy should be read from #policy. See Policy for an explanation of the policy `etag` property and how to modify policies.
You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.
    # File 'lib/gcloud/pubsub/subscription.rb', line 482
    def policy= new_policy
      ensure_service!
      grpc = service.set_subscription_policy name, new_policy.to_grpc
      @policy = Policy.from_grpc grpc
    end
#pull(immediate: true, max: 100, autoack: false) ⇒ Array<Gcloud::Pubsub::ReceivedMessage>
Pulls messages from the server. Returns an empty list if there are no messages available in the backlog. Raises an ApiError with status `UNAVAILABLE` if there are too many concurrent pull requests pending for the given subscription.
    # File 'lib/gcloud/pubsub/subscription.rb', line 228
    def pull immediate: true, max: 100, autoack: false
      ensure_service!
      options = { immediate: immediate, max: max }
      list_grpc = service.pull name, options
      messages = Array(list_grpc.received_messages).map do |msg_grpc|
        ReceivedMessage.from_grpc msg_grpc, self
      end
      acknowledge messages if autoack
      messages
    rescue Hurley::Timeout
      []
    end
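The following sketch illustrates the three outcomes a caller of `#pull` should expect: a batch capped at `max`, an empty array when the backlog is empty, and an empty array when the request times out. `FakePullSubscription`, `FakeTimeout`, and the `fail_with_timeout` flag are hypothetical names made up for this example; the real method calls the Pub/Sub service and returns ReceivedMessage objects rather than strings.

```ruby
# Stand-in for Hurley::Timeout, for illustration only.
class FakeTimeout < StandardError; end

# Hypothetical in-memory subscription; NOT the gcloud class.
class FakePullSubscription
  attr_reader :acked

  def initialize(backlog, fail_with_timeout: false)
    @backlog = backlog.dup
    @fail_with_timeout = fail_with_timeout
    @acked = []
  end

  # Same shape as the documented #pull; immediate is accepted but unused here.
  def pull(immediate: true, max: 100, autoack: false)
    raise FakeTimeout if @fail_with_timeout
    messages = @backlog.shift(max) # at most max items, [] when empty
    acknowledge(*messages) if autoack
    messages
  rescue FakeTimeout
    []
  end

  def acknowledge(*messages)
    @acked.concat messages
    true
  end
end

sub = FakePullSubscription.new(%w[a b c])
first = sub.pull(max: 2, autoack: true) # acknowledged as part of the pull
second = sub.pull                       # caller must acknowledge these itself
third = sub.pull                        # backlog is now empty

timed_out = FakePullSubscription.new(%w[x], fail_with_timeout: true).pull
```

Since an empty array can mean either "no messages" or "timed out", callers that need to distinguish the two cannot do so from the return value alone.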
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the [Cloud IAM](cloud.google.com/iam/) access control policy.
    # File 'lib/gcloud/pubsub/subscription.rb', line 521
    def test_permissions *permissions
      permissions = Array(permissions).flatten
      ensure_service!
      grpc = service.test_subscription_permissions name, permissions
      grpc.permissions
    end
#topic ⇒ Topic
The Topic from which this subscription receives messages.
    # File 'lib/gcloud/pubsub/subscription.rb', line 87
    def topic
      ensure_grpc!
      Topic.new_lazy @grpc.topic, service
    end
#wait_for_messages(max: 100, autoack: false) ⇒ Array<Gcloud::Pubsub::ReceivedMessage>
Pulls from the server while waiting for messages to become available. This is the same as:
subscription.pull immediate: false
    # File 'lib/gcloud/pubsub/subscription.rb', line 265
    def wait_for_messages max: 100, autoack: false
      pull immediate: false, max: max, autoack: autoack
    end