Class: Gcloud::Pubsub::Subscription
- Inherits: Object
  - Object
  - Gcloud::Pubsub::Subscription
- Defined in: lib/gcloud/pubsub/subscription.rb, lib/gcloud/pubsub/subscription/list.rb
Overview
# Subscription
A named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.
Defined Under Namespace
Classes: List
Instance Attribute Summary
Class Method Summary
Instance Method Summary
- #acknowledge(*messages) ⇒ Object (also: #ack)
  Acknowledges receipt of a message.
- #deadline ⇒ Object
  The maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
- #delay(new_deadline, *messages) ⇒ Object
  Modifies the acknowledge deadline for messages.
- #delete ⇒ Boolean
  Deletes an existing subscription.
- #endpoint ⇒ Object
  Returns the URL locating the endpoint to which messages should be pushed.
- #endpoint=(new_endpoint) ⇒ Object
  Sets the URL locating the endpoint to which messages should be pushed.
- #exists? ⇒ Boolean
  Determines whether the subscription exists in the Pub/Sub service.
- #initialize ⇒ Subscription (constructor)
  Returns a new instance of Subscription.
- #lazy? ⇒ Boolean
  Determines whether the subscription object was created without retrieving the resource from the Pub/Sub service.
- #listen(max: 100, autoack: false, delay: 1) {|msg| ... } ⇒ Object
  Polls the backend for new messages.
- #name ⇒ Object
  The name of the subscription.
- #policy(force: nil) ⇒ Hash
  Gets the access control policy.
- #policy=(new_policy) ⇒ Object
  Sets the access control policy.
- #pull(immediate: true, max: 100, autoack: false) ⇒ Array<Gcloud::Pubsub::ReceivedMessage>
  Pulls messages from the server.
- #test_permissions(*permissions) ⇒ Array<String>
  Tests the specified permissions against the [Cloud IAM](https://cloud.google.com/iam/) access control policy.
- #topic ⇒ Topic
  The Topic from which this subscription receives messages.
- #wait_for_messages(max: 100, autoack: false) ⇒ Array<Gcloud::Pubsub::ReceivedMessage>
  Pulls from the server while waiting for messages to become available.
Constructor Details
#initialize ⇒ Subscription
Returns a new instance of Subscription.
    # File 'lib/gcloud/pubsub/subscription.rb', line 49
    def initialize
      @service = nil
      @grpc = Google::Pubsub::V1::Subscription.new
      @name = nil
      @exists = nil
    end
Instance Attribute Details
#grpc ⇒ Object
    # File 'lib/gcloud/pubsub/subscription.rb', line 45
    def grpc
      @grpc
    end
#service ⇒ Object
    # File 'lib/gcloud/pubsub/subscription.rb', line 41
    def service
      @service
    end
Class Method Details
.from_grpc(grpc, service) ⇒ Object
Builds a Subscription from a gRPC subscription object and a service.
    # File 'lib/gcloud/pubsub/subscription.rb', line 532
    def self.from_grpc grpc, service
      new.tap do |f|
        f.grpc = grpc
        f.service = service
      end
    end
.new_lazy(name, service, options = {}) ⇒ Object
    # File 'lib/gcloud/pubsub/subscription.rb', line 58
    def self.new_lazy name, service, options = {}
      new.tap do |s|
        s.grpc = nil
        s.service = service
        s.instance_variable_set "@name",
                                service.subscription_path(name, options)
      end
    end
Instance Method Details
#acknowledge(*messages) ⇒ Object Also known as: ack
Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.
    # File 'lib/gcloud/pubsub/subscription.rb', line 355
    def acknowledge *messages
      ack_ids = coerce_ack_ids messages
      return true if ack_ids.empty?
      ensure_service!
      service.acknowledge name, *ack_ids
      true
    rescue GRPC::BadStatus => e
      raise Error.from_error(e)
    end
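The listing above calls coerce_ack_ids, whose body is not shown on this page. The following is a minimal sketch of how a splat of messages could be normalized into ack ID strings. The `Msg` struct and the `coerce_ack_ids_sketch` helper are hypothetical names used for illustration only, and the normalization rule (use `#ack_id` when available, otherwise `#to_s`) is an assumption, not the library's confirmed behavior.

```ruby
# Hypothetical stand-in for a received message; only ack_id matters here.
Msg = Struct.new(:ack_id)

# Assumed normalization: flatten nested arrays, then take #ack_id when the
# object responds to it, otherwise treat the value as an ack ID string.
def coerce_ack_ids_sketch(messages)
  Array(messages).flatten.map do |msg|
    msg.respond_to?(:ack_id) ? msg.ack_id : msg.to_s
  end
end

# Message objects, nested arrays, and raw ack ID strings can be mixed.
ids = coerce_ack_ids_sketch([Msg.new("a1"), [Msg.new("a2"), "a3"]])
none = coerce_ack_ids_sketch([])
```

Under this assumption an empty argument list yields an empty array, which matches the early `return true if ack_ids.empty?` in the listing: acknowledging nothing does not trigger an API call.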
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
    # File 'lib/gcloud/pubsub/subscription.rb', line 95
    def deadline
      ensure_grpc!
      @grpc.ack_deadline_seconds
    end
#delay(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
Use this to indicate that more time is needed to process the messages, or to make the messages available for redelivery if processing was interrupted.
    # File 'lib/gcloud/pubsub/subscription.rb', line 391
    def delay new_deadline, *messages
      ack_ids = coerce_ack_ids messages
      ensure_service!
      service.modify_ack_deadline name, ack_ids, new_deadline
      return true
    rescue GRPC::BadStatus => e
      raise Error.from_error(e)
    end
#delete ⇒ Boolean
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
    # File 'lib/gcloud/pubsub/subscription.rb', line 177
    def delete
      ensure_service!
      service.delete_subscription name
      return true
    rescue GRPC::BadStatus => e
      raise Error.from_error(e)
    end
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
    # File 'lib/gcloud/pubsub/subscription.rb', line 103
    def endpoint
      ensure_grpc!
      @grpc.push_config.push_endpoint if @grpc.push_config
    end
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
    # File 'lib/gcloud/pubsub/subscription.rb', line 110
    def endpoint= new_endpoint
      ensure_service!
      service.modify_push_config name, new_endpoint, {}
      @grpc.push_config = Google::Pubsub::V1::PushConfig.new(
        push_endpoint: new_endpoint,
        attributes: {}
      ) if @grpc
    rescue GRPC::BadStatus => e
      raise Error.from_error(e)
    end
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
    # File 'lib/gcloud/pubsub/subscription.rb', line 133
    def exists?
      # Always true if we have a grpc object
      return true unless @grpc.nil?
      # If we have a value, return it
      return @exists unless @exists.nil?
      ensure_grpc!
      @exists = !@grpc.nil?
    rescue Gcloud::NotFoundError
      @exists = false
    end
#lazy? ⇒ Boolean
Determines whether the subscription object was created without retrieving the resource from the Pub/Sub service. Returns true while no gRPC resource has been loaded.
    # File 'lib/gcloud/pubsub/subscription.rb', line 158
    def lazy?
      @grpc.nil?
    end
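The interplay between #lazy? and #exists? can be illustrated with a small, self-contained model. This is a sketch only: `FakeService`, `LazySub`, and `NotFound` are hypothetical stand-ins, not part of the library, and the in-memory lookup replaces the real API call.

```ruby
# Hypothetical error standing in for a "not found" response.
class NotFound < StandardError; end

# Hypothetical in-memory service standing in for the Pub/Sub backend.
class FakeService
  attr_reader :calls

  def initialize(names)
    @names = names
    @calls = 0
  end

  # Counts lookups so the memoization below can be observed.
  def get_subscription(name)
    @calls += 1
    raise NotFound, name unless @names.include?(name)
    { name: name }
  end
end

# A lazy reference: it knows its name but has not loaded the resource yet.
class LazySub
  def initialize(name, service)
    @name = name
    @service = service
    @resource = nil
    @exists = nil
  end

  # True until the resource has been loaded.
  def lazy?
    @resource.nil?
  end

  # Loads the resource on first use and remembers the answer either way.
  def exists?
    return true unless @resource.nil?
    return @exists unless @exists.nil?
    @resource = @service.get_subscription(@name)
    @exists = true
  rescue NotFound
    @exists = false
  end
end

service = FakeService.new(["present"])
present = LazySub.new("present", service)
missing = LazySub.new("missing", service)
```

In this model the first call to `exists?` costs one lookup and later calls are answered from memory, including the negative result for a missing subscription.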
#listen(max: 100, autoack: false, delay: 1) {|msg| ... } ⇒ Object
Poll the backend for new messages. This runs a loop to ping the API, blocking indefinitely, yielding retrieved messages as they are received.
    # File 'lib/gcloud/pubsub/subscription.rb', line 323
    def listen max: 100, autoack: false, delay: 1
      loop do
        msgs = wait_for_messages max: max, autoack: autoack
        if msgs.any?
          msgs.each { |msg| yield msg }
        else
          sleep delay
        end
      end
    end
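Because #listen blocks indefinitely, a caller needs a way to stop it. The sketch below shows one option, using `break` inside the block, against a hypothetical in-memory stand-in. `FakeSubscription` and its canned batches are assumptions made for illustration; its `listen` method follows the loop shape shown above, with `wait_for_messages` as the blocking pull.

```ruby
# Hypothetical stand-in that serves canned batches instead of calling an API.
class FakeSubscription
  def initialize(batches)
    @batches = batches # queue of arrays; an empty array means "nothing yet"
  end

  # Returns the next canned batch, capped at max messages.
  def wait_for_messages(max: 100, autoack: false)
    (@batches.shift || []).first(max)
  end

  # Same loop shape as the listing: yield each message, sleep when idle.
  def listen(max: 100, autoack: false, delay: 1)
    loop do
      msgs = wait_for_messages max: max, autoack: autoack
      if msgs.any?
        msgs.each { |msg| yield msg }
      else
        sleep delay
      end
    end
  end
end

sub = FakeSubscription.new([["a", "b"], [], ["stop"]])
seen = []
sub.listen(delay: 0.01) do |msg|
  break if msg == "stop" # break ends the otherwise endless listen call
  seen << msg
end
```

A short `delay` keeps idle polling responsive at the cost of more requests; the default of one second in the signature above is a middle ground.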
#name ⇒ Object
The name of the subscription.
    # File 'lib/gcloud/pubsub/subscription.rb', line 69
    def name
      @grpc ? @grpc.name : @name
    end
#policy(force: nil) ⇒ Hash
Gets the access control policy.
By default, the policy values are memoized to reduce the number of API calls to the Pub/Sub service.
    # File 'lib/gcloud/pubsub/subscription.rb', line 440
    def policy force: nil
      @policy = nil if force
      @policy ||= begin
        ensure_service!
        grpc = service.get_subscription_policy name
        JSON.parse(Google::Iam::V1::Policy.encode_json(grpc))
      rescue GRPC::BadStatus => e
        raise Error.from_error(e)
      end
    end
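The memoization described above can be sketched in isolation. `PolicyHolder` and the lambda-based fetcher are hypothetical; they only model the "cache unless forced" pattern, with a call counter in place of a real API request.

```ruby
# Hypothetical holder that caches a policy hash until a refresh is forced.
class PolicyHolder
  def initialize(fetcher)
    @fetcher = fetcher # callable standing in for the API request
    @policy = nil
  end

  # Returns the cached policy; force: true discards the cache first.
  def policy(force: nil)
    @policy = nil if force
    @policy ||= @fetcher.call
  end
end

calls = 0
holder = PolicyHolder.new(-> { calls += 1; { "etag" => "v#{calls}" } })

first  = holder.policy              # fetches
second = holder.policy              # served from the cache
third  = holder.policy(force: true) # fetches again
```

Passing `force: true` is the way to observe changes made elsewhere, since a cached policy is otherwise returned unchanged.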
#policy=(new_policy) ⇒ Object
Sets the access control policy.
    # File 'lib/gcloud/pubsub/subscription.rb', line 479
    def policy= new_policy
      ensure_service!
      grpc = service.set_subscription_policy name, new_policy
      @policy = JSON.parse(Google::Iam::V1::Policy.encode_json(grpc))
    rescue GRPC::BadStatus => e
      raise Error.from_error(e)
    end
#pull(immediate: true, max: 100, autoack: false) ⇒ Array<Gcloud::Pubsub::ReceivedMessage>
Pulls messages from the server. Returns an empty list if there are no messages available in the backlog. Raises an ApiError with status UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.
    # File 'lib/gcloud/pubsub/subscription.rb', line 232
    def pull immediate: true, max: 100, autoack: false
      ensure_service!
      options = { immediate: immediate, max: max }
      list_grpc = service.pull name, options
      messages = Array(list_grpc.received_messages).map do |msg_grpc|
        ReceivedMessage.from_grpc msg_grpc, self
      end
      acknowledge messages if autoack
      messages
    rescue GRPC::BadStatus => e
      raise Error.from_error(e)
    rescue Faraday::TimeoutError
      []
    end
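Since a pull can fail with status UNAVAILABLE when too many concurrent pull requests are pending, a caller may want to retry with a growing pause. The following is a sketch under stated assumptions: `UnavailableError` and `pull_with_retry` are hypothetical names, the simulated block stands in for a real pull, and the attempt count and delays are illustrative rather than recommended values.

```ruby
# Hypothetical stand-in for the error a pull raises with status UNAVAILABLE.
class UnavailableError < StandardError; end

# Retries the given block when UnavailableError is raised, doubling the
# wait each time. Re-raises once the attempt budget is used up.
def pull_with_retry(attempts: 3, base_delay: 0.01)
  tries = 0
  begin
    tries += 1
    yield
  rescue UnavailableError
    raise if tries >= attempts
    sleep base_delay * (2**(tries - 1))
    retry
  end
end

# Simulated pull: fails twice, then returns one message.
calls = 0
messages = pull_with_retry do
  calls += 1
  raise UnavailableError, "too many concurrent pulls" if calls < 3
  ["msg-1"]
end
```

In real use the block would wrap the call to the subscription's pull method and rescue whatever error class the library raises for UNAVAILABLE.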
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the [Cloud IAM](https://cloud.google.com/iam/) access control policy.
    # File 'lib/gcloud/pubsub/subscription.rb', line 520
    def test_permissions *permissions
      permissions = Array(permissions).flatten
      ensure_service!
      grpc = service.test_subscription_permissions name, permissions
      grpc.permissions
    rescue GRPC::BadStatus => e
      raise Error.from_error(e)
    end
#topic ⇒ Topic
The Topic from which this subscription receives messages.
    # File 'lib/gcloud/pubsub/subscription.rb', line 87
    def topic
      ensure_grpc!
      Topic.new_lazy @grpc.topic, service
    end
#wait_for_messages(max: 100, autoack: false) ⇒ Array<Gcloud::Pubsub::ReceivedMessage>
Pulls from the server while waiting for messages to become available. This is the same as:
    subscription.pull immediate: false

    # File 'lib/gcloud/pubsub/subscription.rb', line 271
    def wait_for_messages max: 100, autoack: false
      pull immediate: false, max: max, autoack: autoack
    end