Class: Gcloud::Pubsub::Service
- Inherits:
-
Object
- Object
- Gcloud::Pubsub::Service
- Defined in:
- lib/gcloud/pubsub/service.rb
Overview
Represents the gRPC Pub/Sub service, including all of its API methods.
Instance Attribute Summary collapse
-
#credentials ⇒ Object
Returns the value of attribute credentials.
-
#host ⇒ Object
Returns the value of attribute host.
-
#mocked_iam ⇒ Object
Returns the value of attribute mocked_iam.
-
#mocked_publisher ⇒ Object
Returns the value of attribute mocked_publisher.
-
#mocked_subscriber ⇒ Object
Returns the value of attribute mocked_subscriber.
-
#project ⇒ Object
Returns the value of attribute project.
Instance Method Summary collapse
-
#acknowledge(subscription, *ack_ids) ⇒ Object
Acknowledges receipt of a message.
-
#create_subscription(topic, subscription_name, options = {}) ⇒ Object
Creates a subscription on a given topic for a given subscriber.
-
#create_topic(topic_name, options = {}) ⇒ Object
Creates the given topic with the given name.
- #creds ⇒ Object
-
#delete_subscription(subscription) ⇒ Object
Deletes an existing subscription.
-
#delete_topic(topic_name) ⇒ Object
Deletes the topic with the given name.
-
#get_subscription(subscription_name, options = {}) ⇒ Object
Gets the details of a subscription.
- #get_subscription_policy(subscription_name, options = {}) ⇒ Object
-
#get_topic(topic_name, options = {}) ⇒ Object
Gets the configuration of a topic.
- #get_topic_policy(topic_name, options = {}) ⇒ Object
- #iam ⇒ Object
-
#initialize(project, credentials) ⇒ Service
constructor
Creates a new Service instance.
- #insecure? ⇒ Boolean
- #inspect ⇒ Object
-
#list_subscriptions(options = {}) ⇒ Object
Lists matching subscriptions by project.
-
#list_topics(options = {}) ⇒ Object
Lists matching topics.
-
#list_topics_subscriptions(topic, options = {}) ⇒ Object
Lists matching subscriptions by project and topic.
-
#modify_ack_deadline(subscription, ids, deadline) ⇒ Object
Modifies the ack deadline for a specific message.
-
#modify_push_config(subscription, endpoint, attributes) ⇒ Object
Modifies the PushConfig for a specified subscription.
- #project_path(options = {}) ⇒ Object
-
#publish(topic, messages) ⇒ Object
Adds one or more messages to the topic.
- #publisher ⇒ Object
-
#pull(subscription, options = {}) ⇒ Object
Pulls a single message from the server.
- #set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object
- #set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object
- #subscriber ⇒ Object
- #subscription_path(subscription_name, options = {}) ⇒ Object
- #test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object
- #test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object
- #topic_path(topic_name, options = {}) ⇒ Object
Constructor Details
#initialize(project, credentials) ⇒ Service
Creates a new Service instance.
32 33 34 35 36 |
# File 'lib/gcloud/pubsub/service.rb', line 32
##
# Creates a new Service instance.
#
# @param project the project identifier, stored in @project
# @param credentials the credentials object, stored in @credentials
def initialize project, credentials
  @project = project
  @credentials = credentials
  # Every new instance starts out pointing at this fixed API host.
  @host = "pubsub.googleapis.com"
end
Instance Attribute Details
#credentials ⇒ Object
Returns the value of attribute credentials.
28 29 30 |
# File 'lib/gcloud/pubsub/service.rb', line 28
##
# Returns the value of attribute credentials.
def credentials
  @credentials
end
#host ⇒ Object
Returns the value of attribute host.
28 29 30 |
# File 'lib/gcloud/pubsub/service.rb', line 28
##
# Returns the value of attribute host.
def host
  @host
end
#mocked_iam ⇒ Object
Returns the value of attribute mocked_iam.
60 61 62 |
# File 'lib/gcloud/pubsub/service.rb', line 60
##
# Returns the value of attribute mocked_iam.
def mocked_iam
  @mocked_iam
end
#mocked_publisher ⇒ Object
Returns the value of attribute mocked_publisher.
54 55 56 |
# File 'lib/gcloud/pubsub/service.rb', line 54
##
# Returns the value of attribute mocked_publisher.
def mocked_publisher
  @mocked_publisher
end
#mocked_subscriber ⇒ Object
Returns the value of attribute mocked_subscriber.
48 49 50 |
# File 'lib/gcloud/pubsub/service.rb', line 48
##
# Returns the value of attribute mocked_subscriber.
def mocked_subscriber
  @mocked_subscriber
end
#project ⇒ Object
Returns the value of attribute project.
28 29 30 |
# File 'lib/gcloud/pubsub/service.rb', line 28
##
# Returns the value of attribute project.
def project
  @project
end
Instance Method Details
#acknowledge(subscription, *ack_ids) ⇒ Object
Acknowledges receipt of a message.
209 210 211 212 213 214 215 216 |
# File 'lib/gcloud/pubsub/service.rb', line 209
##
# Acknowledges receipt of a message.
#
# @param subscription subscription name or full path; expanded with
#   subscription_path
# @param ack_ids one or more acknowledgement ids, collected by the splat
# @return the result of the subscriber stub's acknowledge call
def acknowledge subscription, *ack_ids
  ack_req = Google::Pubsub::V1::AcknowledgeRequest.new(
    subscription: subscription_path(subscription),
    ack_ids: ack_ids
  )

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { subscriber.acknowledge ack_req }
end
#create_subscription(topic, subscription_name, options = {}) ⇒ Object
Creates a subscription on a given topic for a given subscriber.
169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/gcloud/pubsub/service.rb', line 169
##
# Creates a subscription on a given topic for a given subscriber.
#
# @param topic topic name or full path; expanded with topic_path
# @param subscription_name subscription name or full path
# @param options [Hash] reads :deadline (ack deadline in seconds),
#   :endpoint (push endpoint) and :attributes (push config attributes);
#   also forwarded to subscription_path
# @return the result of the subscriber stub's create_subscription call
def create_subscription topic, subscription_name, options = {}
  # Drop nil entries so unset options are not passed to the request.
  sub_params = { name: subscription_path(subscription_name, options),
                 topic: topic_path(topic),
                 ack_deadline_seconds: options[:deadline]
               }.delete_if { |_, v| v.nil? }
  sub_req = Google::Pubsub::V1::Subscription.new sub_params
  # A push config is attached only when an endpoint was supplied.
  if options[:endpoint]
    sub_req.push_config = Google::Pubsub::V1::PushConfig.new(
      push_endpoint: options[:endpoint],
      attributes: (options[:attributes] || {}).to_h)
  end

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { subscriber.create_subscription sub_req }
end
#create_topic(topic_name, options = {}) ⇒ Object
Creates the given topic with the given name.
94 95 96 97 98 99 100 |
# File 'lib/gcloud/pubsub/service.rb', line 94
##
# Creates the given topic with the given name.
#
# @param topic_name topic name or full path
# @param options [Hash] forwarded to topic_path
# @return the result of the publisher stub's create_topic call
def create_topic topic_name, options = {}
  topic_req = Google::Pubsub::V1::Topic.new.tap do |r|
    r.name = topic_path(topic_name, options)
  end

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { publisher.create_topic topic_req }
end
#creds ⇒ Object
38 39 40 41 42 |
# File 'lib/gcloud/pubsub/service.rb', line 38
##
# Builds the credentials argument handed to the gRPC stubs.
#
# @return the raw credentials value when insecure? is true; otherwise
#   channel credentials composed with call credentials built from
#   credentials.client.updater_proc
def creds
  return credentials if insecure?
  GRPC::Core::ChannelCredentials.new.compose \
    GRPC::Core::CallCredentials.new credentials.client.updater_proc
end
#delete_subscription(subscription) ⇒ Object
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
187 188 189 190 191 192 193 |
# File 'lib/gcloud/pubsub/service.rb', line 187
##
# Deletes an existing subscription. All pending messages in the
# subscription are immediately dropped.
#
# @param subscription subscription name or full path; expanded with
#   subscription_path
# @return the result of the subscriber stub's delete_subscription call
def delete_subscription subscription
  del_req = Google::Pubsub::V1::DeleteSubscriptionRequest.new(
    subscription: subscription_path(subscription)
  )

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { subscriber.delete_subscription del_req }
end
#delete_topic(topic_name) ⇒ Object
Deletes the topic with the given name. All subscriptions to this topic are also deleted. Raises GRPC status code 5 if the topic does not exist. After a topic is deleted, a new topic may be created with the same name.
107 108 109 110 111 112 113 |
# File 'lib/gcloud/pubsub/service.rb', line 107
##
# Deletes the topic with the given name. All subscriptions to this topic
# are also deleted. Raises GRPC status code 5 if the topic does not exist.
# After a topic is deleted, a new topic may be created with the same name.
#
# @param topic_name topic name or full path; expanded with topic_path
# @return the result of the publisher stub's delete_topic call
def delete_topic topic_name
  topic_req = Google::Pubsub::V1::DeleteTopicRequest.new.tap do |r|
    r.topic = topic_path(topic_name)
  end

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { publisher.delete_topic topic_req }
end
#get_subscription(subscription_name, options = {}) ⇒ Object
Gets the details of a subscription.
136 137 138 139 140 141 142 |
# File 'lib/gcloud/pubsub/service.rb', line 136
##
# Gets the details of a subscription.
#
# @param subscription_name subscription name or full path
# @param options [Hash] forwarded to subscription_path
# @return the result of the subscriber stub's get_subscription call
def get_subscription subscription_name, options = {}
  sub_req = Google::Pubsub::V1::GetSubscriptionRequest.new(
    subscription: subscription_path(subscription_name, options)
  )

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { subscriber.get_subscription sub_req }
end
#get_subscription_policy(subscription_name, options = {}) ⇒ Object
273 274 275 276 277 278 279 |
# File 'lib/gcloud/pubsub/service.rb', line 273
##
# Requests the IAM policy of a subscription through the IAM stub.
#
# @param subscription_name subscription name or full path
# @param options [Hash] forwarded to subscription_path
# @return the result of the IAM stub's get_iam_policy call
def get_subscription_policy subscription_name, options = {}
  get_req = Google::Iam::V1::GetIamPolicyRequest.new(
    resource: subscription_path(subscription_name, options)
  )

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { iam.get_iam_policy get_req }
end
#get_topic(topic_name, options = {}) ⇒ Object
Gets the configuration of a topic. Since the topic only has the name attribute, this method is only useful to check the existence of a topic. If other attributes are added in the future, they will be returned here.
72 73 74 75 76 77 78 |
# File 'lib/gcloud/pubsub/service.rb', line 72
##
# Gets the configuration of a topic. Since the topic only has the name
# attribute, this method is only useful to check the existence of a topic.
# If other attributes are added in the future, they will be returned here.
#
# @param topic_name topic name or full path
# @param options [Hash] forwarded to topic_path
# @return the result of the publisher stub's get_topic call
def get_topic topic_name, options = {}
  topic_req = Google::Pubsub::V1::GetTopicRequest.new.tap do |r|
    r.topic = topic_path(topic_name, options)
  end

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { publisher.get_topic topic_req }
end
#get_topic_policy(topic_name, options = {}) ⇒ Object
247 248 249 250 251 252 253 |
# File 'lib/gcloud/pubsub/service.rb', line 247
##
# Requests the IAM policy of a topic through the IAM stub.
#
# @param topic_name topic name or full path
# @param options [Hash] forwarded to topic_path
# @return the result of the IAM stub's get_iam_policy call
def get_topic_policy topic_name, options = {}
  get_req = Google::Iam::V1::GetIamPolicyRequest.new(
    resource: topic_path(topic_name, options)
  )

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { iam.get_iam_policy get_req }
end
#iam ⇒ Object
56 57 58 59 |
# File 'lib/gcloud/pubsub/service.rb', line 56
##
# Returns the IAM policy stub used for policy and permission calls.
#
# @return mocked_iam when one has been set; otherwise a memoized
#   Google::Iam::V1::IAMPolicy::Stub built from host and creds
def iam
  return mocked_iam if mocked_iam
  @iam ||= Google::Iam::V1::IAMPolicy::Stub.new host, creds
end
#insecure? ⇒ Boolean
62 63 64 |
# File 'lib/gcloud/pubsub/service.rb', line 62
##
# Reports whether the service was configured with the insecure-channel
# marker instead of real credentials.
#
# @return [Boolean] true when credentials equals :this_channel_is_insecure
def insecure?
  credentials == :this_channel_is_insecure
end
#inspect ⇒ Object
315 316 317 |
# File 'lib/gcloud/pubsub/service.rb', line 315
##
# Short textual representation: the class name followed by the project
# in parentheses.
#
# @return [String]
def inspect
  "#{self.class}(#{@project})"
end
#list_subscriptions(options = {}) ⇒ Object
Lists matching subscriptions by project.
158 159 160 161 162 163 164 165 |
# File 'lib/gcloud/pubsub/service.rb', line 158
##
# Lists matching subscriptions by project.
#
# @param options [Hash] reads :token (page token) and :max (page size);
#   also forwarded to project_path
# @return the result of the subscriber stub's list_subscriptions call
def list_subscriptions options = {}
  # Drop nil entries so unset paging options are not passed to the request.
  list_params = { project: project_path(options),
                  page_token: options[:token],
                  page_size: options[:max]
                }.delete_if { |_, v| v.nil? }
  list_req = Google::Pubsub::V1::ListSubscriptionsRequest.new list_params

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { subscriber.list_subscriptions list_req }
end
#list_topics(options = {}) ⇒ Object
Lists matching topics.
82 83 84 85 86 87 88 89 90 |
# File 'lib/gcloud/pubsub/service.rb', line 82
##
# Lists matching topics.
#
# @param options [Hash] reads :token (page token) and :max (page size);
#   also forwarded to project_path
# @return the result of the publisher stub's list_topics call
def list_topics options = {}
  topics_req = Google::Pubsub::V1::ListTopicsRequest.new.tap do |r|
    r.project = project_path(options)
    # Paging fields are only assigned when the caller supplied them.
    r.page_token = options[:token] if options[:token]
    r.page_size = options[:max] if options[:max]
  end

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { publisher.list_topics topics_req }
end
#list_topics_subscriptions(topic, options = {}) ⇒ Object
Lists matching subscriptions by project and topic.
146 147 148 149 150 151 152 153 154 |
# File 'lib/gcloud/pubsub/service.rb', line 146
##
# Lists matching subscriptions by project and topic.
#
# @param topic topic name or full path
# @param options [Hash] reads :token (page token) and :max (page size);
#   also forwarded to topic_path
# @return the result of the publisher stub's list_topic_subscriptions call
def list_topics_subscriptions topic, options = {}
  # Drop nil entries so unset paging options are not passed to the request.
  list_params = { topic: topic_path(topic, options),
                  page_token: options[:token],
                  page_size: options[:max]
                }.delete_if { |_, v| v.nil? }
  list_req = Google::Pubsub::V1::ListTopicSubscriptionsRequest.new \
    list_params

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { publisher.list_topic_subscriptions list_req }
end
#modify_ack_deadline(subscription, ids, deadline) ⇒ Object
Modifies the ack deadline for a specific message.
237 238 239 240 241 242 243 244 245 |
# File 'lib/gcloud/pubsub/service.rb', line 237
##
# Modifies the ack deadline for a specific message.
#
# @param subscription subscription name or full path; expanded with
#   subscription_path
# @param ids a single ack id or a list of them (wrapped with Array())
# @param deadline the value sent as ack_deadline_seconds
# @return the result of the subscriber stub's modify_ack_deadline call
def modify_ack_deadline subscription, ids, deadline
  mad_req = Google::Pubsub::V1::ModifyAckDeadlineRequest.new(
    subscription: subscription_path(subscription),
    ack_ids: Array(ids),
    ack_deadline_seconds: deadline
  )

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { subscriber.modify_ack_deadline mad_req }
end
#modify_push_config(subscription, endpoint, attributes) ⇒ Object
Modifies the PushConfig for a specified subscription.
220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/gcloud/pubsub/service.rb', line 220
##
# Modifies the PushConfig for a specified subscription.
#
# @param subscription subscription name or full path; expanded with
#   subscription_path
# @param endpoint the value sent as push_endpoint
# @param attributes key/value pairs; keys and values are converted to
#   Strings before being sent
# @return the result of the subscriber stub's modify_push_config call
def modify_push_config subscription, endpoint, attributes
  # Convert attributes to strings to match the protobuf definition
  attributes = Hash[attributes.map { |k, v| [String(k), String(v)] }]

  mpc_req = Google::Pubsub::V1::ModifyPushConfigRequest.new(
    subscription: subscription_path(subscription),
    push_config: Google::Pubsub::V1::PushConfig.new(
      push_endpoint: endpoint,
      attributes: attributes
    )
  )

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { subscriber.modify_push_config mpc_req }
end
#project_path(options = {}) ⇒ Object
300 301 302 303 |
# File 'lib/gcloud/pubsub/service.rb', line 300
##
# Builds the "projects/<name>" resource path.
#
# @param options [Hash] :project overrides the service's own project
# @return [String]
def project_path options = {}
  project_name = options[:project] || project
  "projects/#{project_name}"
end
#publish(topic, messages) ⇒ Object
Adds one or more messages to the topic. Raises GRPC status code 5 if the topic does not exist. The messages parameter is an array of arrays. The first element is the data, second is attributes hash.
120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/gcloud/pubsub/service.rb', line 120
##
# Adds one or more messages to the topic. Raises GRPC status code 5 if the
# topic does not exist. The messages parameter is an array of arrays. The
# first element is the data, second is attributes hash.
#
# @param topic topic name or full path; expanded with topic_path
# @param messages array of [data, attributes] pairs
# @return the result of the publisher stub's publish call
def publish topic, messages
  publish_req = Google::Pubsub::V1::PublishRequest.new(
    topic: topic_path(topic),
    messages: messages.map do |data, attributes|
      Google::Pubsub::V1::PubsubMessage.new(
        # Data is coerced to a String and re-encoded as ASCII-8BIT.
        data: String(data).encode("ASCII-8BIT"),
        attributes: attributes
      )
    end
  )

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { publisher.publish publish_req }
end
#publisher ⇒ Object
50 51 52 53 |
# File 'lib/gcloud/pubsub/service.rb', line 50
##
# Returns the Publisher stub used for topic and publish calls.
#
# @return mocked_publisher when one has been set; otherwise a memoized
#   Google::Pubsub::V1::Publisher::Stub built from host and creds
def publisher
  return mocked_publisher if mocked_publisher
  @publisher ||= Google::Pubsub::V1::Publisher::Stub.new host, creds
end
#pull(subscription, options = {}) ⇒ Object
Pulls a single message from the server.
197 198 199 200 201 202 203 204 205 |
# File 'lib/gcloud/pubsub/service.rb', line 197
##
# Pulls a single message from the server.
#
# @param subscription subscription name or full path
# @param options [Hash] reads :immediate (defaults to true) and :max
#   (defaults to 100, converted with to_i); also forwarded to
#   subscription_path
# @return the result of the subscriber stub's pull call
def pull subscription, options = {}
  pull_req = Google::Pubsub::V1::PullRequest.new(
    subscription: subscription_path(subscription, options),
    # Double negation coerces any truthy/falsy value to a strict boolean.
    return_immediately: !(!options.fetch(:immediate, true)),
    max_messages: options.fetch(:max, 100).to_i
  )

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { subscriber.pull pull_req }
end
#set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object
281 282 283 284 285 286 287 288 |
# File 'lib/gcloud/pubsub/service.rb', line 281
##
# Sends a new IAM policy for a subscription through the IAM stub.
#
# @param subscription_name subscription name or full path
# @param new_policy policy data; dumped to JSON and decoded into a
#   Google::Iam::V1::Policy
# @param options [Hash] forwarded to subscription_path
# @return the result of the IAM stub's set_iam_policy call
def set_subscription_policy subscription_name, new_policy, options = {}
  set_req = Google::Iam::V1::SetIamPolicyRequest.new(
    resource: subscription_path(subscription_name, options),
    policy: Google::Iam::V1::Policy.decode_json(JSON.dump(new_policy))
  )

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { iam.set_iam_policy set_req }
end
#set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object
255 256 257 258 259 260 261 262 |
# File 'lib/gcloud/pubsub/service.rb', line 255
##
# Sends a new IAM policy for a topic through the IAM stub.
#
# @param topic_name topic name or full path
# @param new_policy policy data; dumped to JSON and decoded into a
#   Google::Iam::V1::Policy
# @param options [Hash] forwarded to topic_path
# @return the result of the IAM stub's set_iam_policy call
def set_topic_policy topic_name, new_policy, options = {}
  set_req = Google::Iam::V1::SetIamPolicyRequest.new(
    resource: topic_path(topic_name, options),
    policy: Google::Iam::V1::Policy.decode_json(JSON.dump(new_policy))
  )

  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { iam.set_iam_policy set_req }
end
#subscriber ⇒ Object
44 45 46 47 |
# File 'lib/gcloud/pubsub/service.rb', line 44
##
# Returns the Subscriber stub used for subscription and pull calls.
#
# @return mocked_subscriber when one has been set; otherwise a memoized
#   Google::Pubsub::V1::Subscriber::Stub built from host and creds
def subscriber
  return mocked_subscriber if mocked_subscriber
  @subscriber ||= Google::Pubsub::V1::Subscriber::Stub.new host, creds
end
#subscription_path(subscription_name, options = {}) ⇒ Object
310 311 312 313 |
# File 'lib/gcloud/pubsub/service.rb', line 310
##
# Builds the full resource path of a subscription.
#
# @param subscription_name a bare name, or a value already containing "/"
# @param options [Hash] forwarded to project_path
# @return subscription_name unchanged when it already contains "/";
#   otherwise "<project path>/subscriptions/<name>"
def subscription_path subscription_name, options = {}
  # Anything containing a slash is treated as an already-expanded path.
  return subscription_name if subscription_name.to_s.include? "/"
  "#{project_path(options)}/subscriptions/#{subscription_name}"
end
#test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object
290 291 292 293 294 295 296 297 298 |
# File 'lib/gcloud/pubsub/service.rb', line 290
##
# Sends a TestIamPermissionsRequest for a subscription through the IAM stub.
#
# @param subscription_name subscription name or full path
# @param permissions the value sent as the request's permissions field
# @param options [Hash] forwarded to subscription_path
# @return the result of the IAM stub call
def test_subscription_permissions subscription_name, permissions,
                                  options = {}
  test_req = Google::Iam::V1::TestIamPermissionsRequest.new(
    resource: subscription_path(subscription_name, options),
    permissions: permissions
  )

  # NOTE(review): the stub method name was lost from this listing;
  # test_iam_permissions is assumed from the get_iam_policy/set_iam_policy
  # naming pattern — confirm against the IAMPolicy stub.
  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { iam.test_iam_permissions test_req }
end
#test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object
264 265 266 267 268 269 270 271 |
# File 'lib/gcloud/pubsub/service.rb', line 264
##
# Sends a TestIamPermissionsRequest for a topic through the IAM stub.
#
# @param topic_name topic name or full path
# @param permissions the value sent as the request's permissions field
# @param options [Hash] forwarded to topic_path
# @return the result of the IAM stub call
def test_topic_permissions topic_name, permissions, options = {}
  test_req = Google::Iam::V1::TestIamPermissionsRequest.new(
    resource: topic_path(topic_name, options),
    permissions: permissions
  )

  # NOTE(review): the stub method name was lost from this listing;
  # test_iam_permissions is assumed from the get_iam_policy/set_iam_policy
  # naming pattern — confirm against the IAMPolicy stub.
  # The stub call is wrapped in backoff, which is defined outside this listing.
  backoff { iam.test_iam_permissions test_req }
end
#topic_path(topic_name, options = {}) ⇒ Object
305 306 307 308 |
# File 'lib/gcloud/pubsub/service.rb', line 305
##
# Builds the full resource path of a topic.
#
# @param topic_name a bare name, or a value already containing "/"
# @param options [Hash] forwarded to project_path
# @return topic_name unchanged when it already contains "/"; otherwise
#   "<project path>/topics/<name>"
def topic_path topic_name, options = {}
  # Anything containing a slash is treated as an already-expanded path.
  return topic_name if topic_name.to_s.include? "/"
  "#{project_path(options)}/topics/#{topic_name}"
end