Class: Gcloud::Pubsub::Subscription
- Inherits:
-
Object
- Object
- Gcloud::Pubsub::Subscription
- Defined in:
- lib/gcloud/pubsub/subscription.rb,
lib/gcloud/pubsub/subscription/list.rb
Overview
# Subscription
A named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.
Defined Under Namespace
Classes: List
Instance Attribute Summary collapse
Class Method Summary collapse
Instance Method Summary collapse
-
#acknowledge(*messages) ⇒ Object
(also: #ack)
Acknowledges receipt of a message.
-
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
-
#delay(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
-
#delete ⇒ Boolean
Deletes an existing subscription.
-
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
-
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
-
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
-
#initialize ⇒ Subscription
constructor
A new instance of Subscription.
-
#lazy? ⇒ Boolean
Determines whether the subscription object was created with an HTTP call.
-
#listen(max: 100, autoack: false, delay: 1) {|msg| ... } ⇒ Object
Poll the backend for new messages.
-
#name ⇒ Object
The name of the subscription.
-
#policy(force: nil) ⇒ Hash
Gets the access control policy.
-
#policy=(new_policy) ⇒ Object
Sets the access control policy.
-
#pull(immediate: true, max: 100, autoack: false) ⇒ Array<Gcloud::Pubsub::ReceivedMessage>
Pulls messages from the server.
-
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the [Cloud IAM](cloud.google.com/iam/) access control policy.
-
#topic ⇒ Topic
The Topic from which this subscription receives messages.
-
#wait_for_messages(max: 100, autoack: false) ⇒ Array<Gcloud::Pubsub::ReceivedMessage>
Pulls from the server while waiting for messages to become available.
Constructor Details
#initialize ⇒ Subscription
Returns a new instance of Subscription.
49 50 51 52 53 54 |
# File 'lib/gcloud/pubsub/subscription.rb', line 49 def initialize @service = nil @grpc = Google::Pubsub::V1::Subscription.new @name = nil @exists = nil end |
Instance Attribute Details
#grpc ⇒ Object
45 46 47 |
# File 'lib/gcloud/pubsub/subscription.rb', line 45 def grpc @grpc end |
#service ⇒ Object
41 42 43 |
# File 'lib/gcloud/pubsub/subscription.rb', line 41 def service @service end |
Class Method Details
.from_grpc(grpc, service) ⇒ Object
object.
534 535 536 537 538 539 |
# File 'lib/gcloud/pubsub/subscription.rb', line 534 def self.from_grpc grpc, service new.tap do |f| f.grpc = grpc f.service = service end end |
.new_lazy(name, service, options = {}) ⇒ Object
58 59 60 61 62 63 64 65 66 67 |
# File 'lib/gcloud/pubsub/subscription.rb', line 58 def self.new_lazy name, service, = {} sub = new.tap do |f| f.grpc = nil f.service = service end sub.instance_eval do @name = service.subscription_path(name, ) end sub end |
Instance Method Details
#acknowledge(*messages) ⇒ Object Also known as: ack
Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.
357 358 359 360 361 362 363 364 365 |
# File 'lib/gcloud/pubsub/subscription.rb', line 357 def acknowledge * ack_ids = coerce_ack_ids return true if ack_ids.empty? ensure_service! service.acknowledge name, *ack_ids true rescue GRPC::BadStatus => e raise Error.from_error(e) end |
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
97 98 99 100 |
# File 'lib/gcloud/pubsub/subscription.rb', line 97 def deadline ensure_grpc! @grpc.ack_deadline_seconds end |
#delay(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
This indicates that more time is needed to process the messages, or to make the messages available for redelivery if the processing was interrupted.
393 394 395 396 397 398 399 400 |
# File 'lib/gcloud/pubsub/subscription.rb', line 393 def delay new_deadline, * ack_ids = coerce_ack_ids ensure_service! service.modify_ack_deadline name, ack_ids, new_deadline return true rescue GRPC::BadStatus => e raise Error.from_error(e) end |
#delete ⇒ Boolean
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
179 180 181 182 183 184 185 |
# File 'lib/gcloud/pubsub/subscription.rb', line 179 def delete ensure_service! service.delete_subscription name return true rescue GRPC::BadStatus => e raise Error.from_error(e) end |
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
105 106 107 108 |
# File 'lib/gcloud/pubsub/subscription.rb', line 105 def endpoint ensure_grpc! @grpc.push_config.push_endpoint if @grpc.push_config end |
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
112 113 114 115 116 117 118 119 120 121 |
# File 'lib/gcloud/pubsub/subscription.rb', line 112 def endpoint= new_endpoint ensure_service! service.modify_push_config name, new_endpoint, {} @grpc.push_config = Google::Pubsub::V1::PushConfig.new( push_endpoint: new_endpoint, attributes: {} ) if @grpc rescue GRPC::BadStatus => e raise Error.from_error(e) end |
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
135 136 137 138 139 140 141 142 143 144 |
# File 'lib/gcloud/pubsub/subscription.rb', line 135 def exists? # Always true if we have a grpc object return true unless @grpc.nil? # If we have a value, return it return @exists unless @exists.nil? ensure_grpc! @exists = !@grpc.nil? rescue Gcloud::NotFoundError @exists = false end |
#lazy? ⇒ Boolean
Determines whether the subscription object was created with an HTTP call.
160 161 162 |
# File 'lib/gcloud/pubsub/subscription.rb', line 160 def lazy? @grpc.nil? end |
#listen(max: 100, autoack: false, delay: 1) {|msg| ... } ⇒ Object
Poll the backend for new messages. This runs a loop to ping the API, blocking indefinitely, yielding retrieved messages as they are received.
325 326 327 328 329 330 331 332 333 334 |
# File 'lib/gcloud/pubsub/subscription.rb', line 325 def listen max: 100, autoack: false, delay: 1 loop do msgs = max: max, autoack: autoack if msgs.any? msgs.each { |msg| yield msg } else sleep delay end end end |
#name ⇒ Object
The name of the subscription.
71 72 73 |
# File 'lib/gcloud/pubsub/subscription.rb', line 71 def name @grpc ? @grpc.name : @name end |
#policy(force: nil) ⇒ Hash
Gets the access control policy.
By default, the policy values are memoized to reduce the number of API calls to the Pub/Sub service.
442 443 444 445 446 447 448 449 450 451 |
# File 'lib/gcloud/pubsub/subscription.rb', line 442 def policy force: nil @policy = nil if force @policy ||= begin ensure_service! grpc = service.get_subscription_policy name JSON.parse(Google::Iam::V1::Policy.encode_json(grpc)) rescue GRPC::BadStatus => e raise Error.from_error(e) end end |
#policy=(new_policy) ⇒ Object
Sets the access control policy.
481 482 483 484 485 486 487 |
# File 'lib/gcloud/pubsub/subscription.rb', line 481 def policy= new_policy ensure_service! grpc = service.set_subscription_policy name, new_policy @policy = JSON.parse(Google::Iam::V1::Policy.encode_json(grpc)) rescue GRPC::BadStatus => e raise Error.from_error(e) end |
#pull(immediate: true, max: 100, autoack: false) ⇒ Array<Gcloud::Pubsub::ReceivedMessage>
Pulls messages from the server. Returns an empty list if there are no messages available in the backlog. Raises an ApiError with status ‘UNAVAILABLE` if there are too many concurrent pull requests pending for the given subscription.
234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
# File 'lib/gcloud/pubsub/subscription.rb', line 234 def pull immediate: true, max: 100, autoack: false ensure_service! = { immediate: immediate, max: max } list_grpc = service.pull name, = Array(list_grpc.).map do |msg_grpc| ReceivedMessage.from_grpc msg_grpc, self end acknowledge if autoack rescue GRPC::BadStatus => e raise Error.from_error(e) rescue Faraday::TimeoutError [] end |
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the [Cloud IAM](cloud.google.com/iam/) access control policy.
522 523 524 525 526 527 528 529 |
# File 'lib/gcloud/pubsub/subscription.rb', line 522 def * = Array().flatten ensure_service! grpc = service. name, grpc. rescue GRPC::BadStatus => e raise Error.from_error(e) end |
#topic ⇒ Topic
The Topic from which this subscription receives messages.
89 90 91 92 |
# File 'lib/gcloud/pubsub/subscription.rb', line 89 def topic ensure_grpc! Topic.new_lazy @grpc.topic, service end |
#wait_for_messages(max: 100, autoack: false) ⇒ Array<Gcloud::Pubsub::ReceivedMessage>
Pulls from the server while waiting for messages to become available. This is the same as:
subscription.pull immediate: false
273 274 275 |
# File 'lib/gcloud/pubsub/subscription.rb', line 273 def max: 100, autoack: false pull immediate: false, max: max, autoack: autoack end |