Class: Gcloud::Pubsub::Subscription
- Inherits: Object
  - Object
  - Gcloud::Pubsub::Subscription
- Defined in: lib/gcloud/pubsub/subscription.rb, lib/gcloud/pubsub/subscription/list.rb
Overview
Defined Under Namespace
Classes: List
Instance Attribute Summary
-
#connection ⇒ Object
The Connection object.
-
#gapi ⇒ Object
The Google API Client object.
Class Method Summary
-
.from_gapi(gapi, conn) ⇒ Object
New Subscription from a Google API Client object.
-
.new_lazy(name, conn, options = {}) ⇒ Object
New lazy Subscription object without making an HTTP request.
Instance Method Summary
-
#acknowledge(*messages) ⇒ Object
(also: #ack)
Acknowledges receipt of a message.
-
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
-
#delay(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
-
#delete ⇒ Object
Deletes an existing subscription.
-
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
-
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
-
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
-
#initialize ⇒ Subscription
constructor
Create an empty Subscription object.
-
#lazy? ⇒ Boolean
Determines whether the subscription object was created lazily, that is, without making an HTTP call.
-
#listen(max: 100, autoack: false, delay: 1) ⇒ Object
Poll the backend for new messages.
-
#name ⇒ Object
The name of the subscription.
-
#policy(force: nil) ⇒ Object
Gets the access control policy.
-
#policy=(new_policy) ⇒ Object
Sets the access control policy.
-
#pull(immediate: true, max: 100, autoack: false) ⇒ Object
Pulls messages from the server.
-
#test_permissions(*permissions) ⇒ Object
Tests the specified permissions against the Cloud IAM access control policy.
-
#topic ⇒ Object
The Topic from which this subscription receives messages.
-
#wait_for_messages(max: 100, autoack: false) ⇒ Object
Pulls from the server while waiting for messages to become available.
Constructor Details
#initialize ⇒ Subscription
Create an empty Subscription object.
# File 'lib/gcloud/pubsub/subscription.rb', line 48
def initialize #:nodoc:
  @connection = nil
  @gapi = {}
  @name = nil
  @exists = nil
end
Instance Attribute Details
#connection ⇒ Object
The Connection object.
# File 'lib/gcloud/pubsub/subscription.rb', line 40
def connection
  @connection
end
#gapi ⇒ Object
The Google API Client object.
# File 'lib/gcloud/pubsub/subscription.rb', line 44
def gapi
  @gapi
end
Class Method Details
.from_gapi(gapi, conn) ⇒ Object
New Subscription from a Google API Client object.
# File 'lib/gcloud/pubsub/subscription.rb', line 591
def self.from_gapi gapi, conn #:nodoc:
  new.tap do |f|
    f.gapi = gapi
    f.connection = conn
  end
end
.new_lazy(name, conn, options = {}) ⇒ Object
New lazy Subscription object without making an HTTP request.
# File 'lib/gcloud/pubsub/subscription.rb', line 57
def self.new_lazy name, conn, options = {} #:nodoc:
  sub = new.tap do |f|
    f.gapi = nil
    f.connection = conn
  end
  sub.instance_eval do
    @name = conn.subscription_path(name, options)
  end
  sub
end
Instance Method Details
#acknowledge(*messages) ⇒ Object Also known as: ack
Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.
Parameters
messages-
One or more ReceivedMessage objects or ack_id values. (ReceivedMessage or ack_id)
Example
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
messages = sub.pull
sub.acknowledge messages
# File 'lib/gcloud/pubsub/subscription.rb', line 395
def acknowledge *messages
  ack_ids = coerce_ack_ids messages
  ensure_connection!
  resp = connection.acknowledge name, *ack_ids
  if resp.success?
    true
  else
    fail ApiError.from_response(resp)
  end
end
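The source above passes its arguments through a private coerce_ack_ids helper that this page does not show. As a rough sketch of how mixed input could be normalized, assuming the helper flattens its arguments and reads ack_id wherever an object provides it (FakeReceivedMessage and ack_ids_from are hypothetical names used only for illustration, not part of the library):

```ruby
# Hypothetical stand-in for ReceivedMessage; only ack_id matters here.
FakeReceivedMessage = Struct.new(:ack_id)

# Illustrative helper, not the library's implementation: accepts message
# objects, raw ack_id strings, or nested arrays of either.
def ack_ids_from(*messages)
  messages.flatten.map do |msg|
    msg.respond_to?(:ack_id) ? msg.ack_id : msg.to_s
  end
end

ids = ack_ids_from FakeReceivedMessage.new("abc"),
                   ["def", FakeReceivedMessage.new("ghi")]
```

Under these assumptions, the array returned by pull and a hand-written list of ack_id strings can be passed to acknowledge interchangeably.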
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
# File 'lib/gcloud/pubsub/subscription.rb', line 99
def deadline
  ensure_gapi!
  @gapi["ackDeadlineSeconds"]
end
#delay(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
Use this to indicate that more time is needed to process the messages, or to make the messages available for redelivery if processing was interrupted.
Parameters
new_deadline-
The new ack deadline in seconds from the time this request is sent to the Pub/Sub system. Must be >= 0. For example, if the value is 10, the new ack deadline will expire 10 seconds after the call is made. Specifying 0 may immediately make the messages available for another pull request. (Integer)
messages-
One or more ReceivedMessage objects or ack_id values. (ReceivedMessage or ack_id)
Example
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
messages = sub.pull
sub.delay 120, messages
# File 'lib/gcloud/pubsub/subscription.rb', line 437
def delay new_deadline, *messages
  ack_ids = coerce_ack_ids messages
  ensure_connection!
  resp = connection.modify_ack_deadline name, ack_ids, new_deadline
  if resp.success?
    true
  else
    fail ApiError.from_response(resp)
  end
end
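One way to combine acknowledge and delay is to acknowledge the messages that were handled and release the rest for redelivery with a deadline of 0. The sketch below runs without network access: RecordingSubscription and FakeMessage are hypothetical stand-ins that only record calls, and the "ok" check represents whatever processing an application does.

```ruby
# Hypothetical stand-ins so the example runs without the Pub/Sub service.
FakeMessage = Struct.new(:ack_id, :data)

class RecordingSubscription
  attr_reader :acked, :delayed

  def initialize
    @acked = []
    @delayed = []
  end

  # Mirrors the documented signature: acknowledge(*messages)
  def acknowledge(*messages)
    @acked.concat messages.flatten.map(&:ack_id)
    true
  end

  # Mirrors the documented signature: delay(new_deadline, *messages)
  def delay(new_deadline, *messages)
    @delayed.concat messages.flatten.map { |m| [m.ack_id, new_deadline] }
    true
  end
end

sub = RecordingSubscription.new
messages = [FakeMessage.new("id-1", "ok"), FakeMessage.new("id-2", "bad")]

# Split the batch into handled messages and ones to retry.
done, retry_later = messages.partition { |m| m.data == "ok" }

sub.acknowledge done unless done.empty?
# A deadline of 0 may make the messages available for another pull at once.
sub.delay 0, retry_later unless retry_later.empty?
```

With a real subscription the same two calls would be made on the object returned by pubsub.subscription.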
#delete ⇒ Object
# File 'lib/gcloud/pubsub/subscription.rb', line 184
def delete
  ensure_connection!
  resp = connection.delete_subscription name
  if resp.success?
    true
  else
    fail ApiError.from_response(resp)
  end
end
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
# File 'lib/gcloud/pubsub/subscription.rb', line 107
def endpoint
  ensure_gapi!
  @gapi["pushConfig"]["pushEndpoint"] if @gapi["pushConfig"]
end
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
# File 'lib/gcloud/pubsub/subscription.rb', line 114
def endpoint= new_endpoint
  ensure_connection!
  resp = connection.modify_push_config name, new_endpoint, {}
  if resp.success?
    @gapi["pushConfig"]["pushEndpoint"] = new_endpoint if @gapi
  else
    fail ApiError.from_response(resp)
  end
end
#exists? ⇒ Boolean
# File 'lib/gcloud/pubsub/subscription.rb', line 137
def exists?
  # Always true if we have a gapi object
  return true unless @gapi.nil?
  # If we have a value, return it
  return @exists unless @exists.nil?
  ensure_gapi!
  @exists = !@gapi.nil?
rescue NotFoundError
  @exists = false
end
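The method above answers from three sources in order: a loaded gapi object, a memoized result, and only then a lookup against the service. A minimal sketch of that pattern, using hypothetical stand-ins (FakeConnection, FakeNotFoundError, LazySub) rather than the library's own classes, shows that a missing subscription is looked up only once:

```ruby
# Hypothetical error and connection; they stand in for the real ones.
class FakeNotFoundError < StandardError; end

class FakeConnection
  attr_reader :calls

  def initialize(known_names)
    @known_names = known_names
    @calls = 0
  end

  def get_subscription(name)
    @calls += 1
    fail FakeNotFoundError unless @known_names.include? name
    { "name" => name }
  end
end

# Illustrative lazy object: no lookup happens until exists? is called.
class LazySub
  def initialize(name, connection)
    @name = name
    @connection = connection
    @gapi = nil
    @exists = nil
  end

  def lazy?
    @gapi.nil?
  end

  def exists?
    return true unless @gapi.nil?
    return @exists unless @exists.nil?
    @gapi = @connection.get_subscription @name
    @exists = true
  rescue FakeNotFoundError
    @exists = false
  end
end

conn = FakeConnection.new ["my-topic-sub"]
missing = LazySub.new "no-such-sub", conn
first = missing.exists?
second = missing.exists? # answered from the memoized value
found = LazySub.new "my-topic-sub", conn
```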
#lazy? ⇒ Boolean
# File 'lib/gcloud/pubsub/subscription.rb', line 162
def lazy? #:nodoc:
  @gapi.nil?
end
#listen(max: 100, autoack: false, delay: 1) ⇒ Object
Poll the backend for new messages. This runs a loop to ping the API, blocking indefinitely, yielding retrieved messages as they are received.
Parameters
max-
The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000. (Integer)
autoack-
Automatically acknowledge the message as it is pulled. The default value is false. (Boolean)
delay-
The number of seconds to pause between requests when the Google Cloud service has no messages to return. The default value is 1. (Number)
Examples
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
sub.listen do |msg|
# process msg
end
The number of messages pulled per batch can be set with the max option:
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
sub.listen max: 20 do |msg|
# process msg
end
Messages can be automatically acknowledged as they are pulled with the autoack option:
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
sub.listen autoack: true do |msg|
# process msg
end
# File 'lib/gcloud/pubsub/subscription.rb', line 359
def listen max: 100, autoack: false, delay: 1
  loop do
    msgs = wait_for_messages max: max, autoack: autoack
    if msgs.any?
      msgs.each { |msg| yield msg }
    else
      sleep delay
    end
  end
end
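Because listen loops indefinitely, the calling block is the only place a caller can stop it. The sketch below reproduces the documented loop on a hypothetical FakeSubscription that serves canned batches instead of calling the API, and uses plain Ruby break to leave the loop. The "stop" sentinel is an assumption made for the example, not a library feature.

```ruby
# Hypothetical subscription that serves canned batches, one per poll.
class FakeSubscription
  def initialize(batches)
    @batches = batches
  end

  def wait_for_messages(max: 100, autoack: false)
    (@batches.shift || []).first(max)
  end

  # Same shape as the documented loop: yield messages, or sleep when idle.
  def listen(max: 100, autoack: false, delay: 1)
    loop do
      msgs = wait_for_messages max: max, autoack: autoack
      if msgs.any?
        msgs.each { |msg| yield msg }
      else
        sleep delay
      end
    end
  end
end

sub = FakeSubscription.new [["a", "b"], [], ["stop"]]
seen = []
sub.listen delay: 0.01 do |msg|
  break if msg == "stop" # break in the block ends the listen call
  seen << msg
end
```

The empty middle batch exercises the sleep branch before the final batch arrives.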
#name ⇒ Object
The name of the subscription.
# File 'lib/gcloud/pubsub/subscription.rb', line 70
def name
  @gapi ? @gapi["name"] : @name
end
#policy(force: nil) ⇒ Object
Gets the access control policy.
Parameters
force-
Force the latest policy to be retrieved from the Pub/Sub service when true. Otherwise the policy will be memoized to reduce the number of API calls made to the Pub/Sub service. The default is false. (Boolean)
Returns
A hash that conforms to the following structure:
{
"etag"=>"CAE=",
"bindings" => [{
"role" => "roles/viewer",
"members" => ["serviceAccount:your-service-account"]
}]
}
Examples
By default, the policy values are memoized to reduce the number of API calls to the Pub/Sub service.
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
subscription = pubsub.subscription "my-subscription"
puts subscription.policy["bindings"]
puts subscription.policy["etag"]
To retrieve the latest policy from the Pub/Sub service, use the force flag.
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
subscription = pubsub.subscription "my-subscription"
policy = subscription.policy force: true
# File 'lib/gcloud/pubsub/subscription.rb', line 496
def policy force: nil
  @policy = nil if force
  @policy ||= begin
    ensure_connection!
    resp = connection.get_subscription_policy name
    policy = resp.data
    policy = policy.to_hash if policy.respond_to? :to_hash
    policy
  end
end
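The effect of the force flag on memoization can be seen by counting fetches. This is a minimal sketch with a hypothetical PolicyHolder class whose fetch is a local counter rather than an API call. It mirrors the `@policy ||= begin ... end` pattern from the source above.

```ruby
# Hypothetical class; the "fetch" is a counter, not a Pub/Sub request.
class PolicyHolder
  attr_reader :fetches

  def initialize
    @fetches = 0
    @policy = nil
  end

  def policy(force: nil)
    @policy = nil if force # discard the memoized value on demand
    @policy ||= begin
      @fetches += 1
      { "etag" => "CAE=", "bindings" => [] }
    end
  end
end

holder = PolicyHolder.new
holder.policy              # first call fetches
holder.policy              # second call reuses the memoized hash
after_two_calls = holder.fetches
holder.policy force: true  # forces another fetch
after_force = holder.fetches
```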
#policy=(new_policy) ⇒ Object
Sets the access control policy.
Parameters
new_policy-
A hash that conforms to the following structure:
{ "bindings" => [{ "role" => "roles/viewer", "members" => ["serviceAccount:your-service-account"] }] }
Example
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
viewer_policy = {
"bindings" => [{
"role" => "roles/viewer",
"members" => ["serviceAccount:your-service-account"]
}]
}
subscription = pubsub.subscription "my-subscription"
subscription.policy = viewer_policy
# File 'lib/gcloud/pubsub/subscription.rb', line 538
def policy= new_policy
  ensure_connection!
  resp = connection.set_subscription_policy name, new_policy
  if resp.success?
    @policy = resp.data["policy"]
    @policy = @policy.to_hash if @policy.respond_to? :to_hash
  else
    fail ApiError.from_response(resp)
  end
end
#pull(immediate: true, max: 100, autoack: false) ⇒ Object
Pulls messages from the server. Returns an empty list if there are no messages available in the backlog. Raises an ApiError with status UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.
Parameters
immediate-
When true the system will respond immediately even if it is not able to return messages. When false the system is allowed to wait until it can return at least one message. No messages are returned when a request times out. The default value is true. (Boolean)
max-
The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000. (Integer)
autoack-
Automatically acknowledge the message as it is pulled. The default value is false. (Boolean)
Returns
Array of Gcloud::Pubsub::ReceivedMessage
Examples
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
sub.pull.each { |msg| msg.acknowledge! }
A maximum number of messages returned can also be specified:
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
sub.pull(max: 10).each { |msg| msg.acknowledge! }
The call can block until messages are available by setting the :immediate option to false:
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
msgs = sub.pull immediate: false
msgs.each { |msg| msg.acknowledge! }
# File 'lib/gcloud/pubsub/subscription.rb', line 251
def pull immediate: true, max: 100, autoack: false
  ensure_connection!
  options = { immediate: immediate, max: max }
  resp = connection.pull name, options
  if resp.success?
    messages = Array(resp.data["receivedMessages"]).map do |gapi|
      ReceivedMessage.from_gapi gapi, self
    end
    acknowledge messages if autoack
    messages
  else
    fail ApiError.from_response(resp)
  end
rescue Faraday::TimeoutError
  []
end
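The method-level rescue in the source above means a timed-out request looks the same to the caller as an empty backlog. A small sketch of that pattern follows. It uses the standard library's Timeout::Error as a stand-in for Faraday::TimeoutError so that it runs without Faraday installed, and pull_or_empty is a hypothetical name.

```ruby
require "timeout"

# Illustrative wrapper: a timeout becomes an empty result, not an error.
# Timeout::Error stands in for Faraday::TimeoutError here.
def pull_or_empty
  yield
rescue Timeout::Error
  []
end

timed_out = pull_or_empty { raise Timeout::Error, "no messages before timeout" }
delivered = pull_or_empty { ["msg-1"] }
```

Callers can therefore iterate over the result of a blocking pull without a separate timeout branch.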
#test_permissions(*permissions) ⇒ Object
Tests the specified permissions against the Cloud IAM access control policy. See Managing Policies for more information.
Parameters
permissions-
The set of permissions to check access for. Permissions with wildcards (such as * or storage.*) are not allowed. (String or Array of Strings)
Returns
The permissions that have access. (Array of Strings)
Example
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-subscription"
perms = sub.test_permissions "projects.subscriptions.list",
                             "projects.subscriptions.pull"
perms.include? "projects.subscriptions.list" #=> true
perms.include? "projects.subscriptions.pull" #=> false
# File 'lib/gcloud/pubsub/subscription.rb', line 578
def test_permissions *permissions
  permissions = Array(permissions).flatten
  ensure_connection!
  resp = connection.test_subscription_permissions name, permissions
  if resp.success?
    Array(resp.data["permissions"])
  else
    fail ApiError.from_response(resp)
  end
end
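Since the method returns only the permissions that are granted, the ones that are missing can be found with an array difference. In this sketch the granted array is a hypothetical return value, hard-coded so the example runs without calling the service:

```ruby
required = ["projects.subscriptions.list", "projects.subscriptions.pull"]

# Hypothetical result of sub.test_permissions(*required); hard-coded here.
granted = ["projects.subscriptions.list"]

# Array difference leaves the permissions the caller does not have.
missing = required - granted
can_pull = granted.include? "projects.subscriptions.pull"
```

An application could refuse to start a pull loop when missing is not empty.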
#topic ⇒ Object
# File 'lib/gcloud/pubsub/subscription.rb', line 91
def topic
  ensure_gapi!
  Topic.new_lazy @gapi["topic"], connection
end
#wait_for_messages(max: 100, autoack: false) ⇒ Object
Pulls from the server while waiting for messages to become available. This is the same as:
subscription.pull immediate: false
Parameters
max-
The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000. (Integer)
autoack-
Automatically acknowledge the message as it is pulled. The default value is false. (Boolean)
Returns
Array of Gcloud::Pubsub::ReceivedMessage
Example
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
msgs = sub.wait_for_messages
msgs.each { |msg| msg.acknowledge! }
# File 'lib/gcloud/pubsub/subscription.rb', line 299
def wait_for_messages max: 100, autoack: false
  pull immediate: false, max: max, autoack: autoack
end