Class: Gcloud::Pubsub::Service
- Inherits:
-
Object
- Object
- Gcloud::Pubsub::Service
- Defined in:
- lib/gcloud/pubsub/service.rb
Overview
methods.
Instance Attribute Summary collapse
-
#credentials ⇒ Object
Returns the value of attribute credentials.
-
#host ⇒ Object
Returns the value of attribute host.
-
#mocked_iam ⇒ Object
Returns the value of attribute mocked_iam.
-
#mocked_publisher ⇒ Object
Returns the value of attribute mocked_publisher.
-
#mocked_subscriber ⇒ Object
Returns the value of attribute mocked_subscriber.
-
#project ⇒ Object
Returns the value of attribute project.
Instance Method Summary collapse
-
#acknowledge(subscription, *ack_ids) ⇒ Object
Acknowledges receipt of a message.
-
#create_subscription(topic, subscription_name, options = {}) ⇒ Object
Creates a subscription on a given topic for a given subscriber.
-
#create_topic(topic_name, options = {}) ⇒ Object
Creates the given topic with the given name.
- #creds ⇒ Object
-
#delete_subscription(subscription) ⇒ Object
Deletes an existing subscription.
-
#delete_topic(topic_name) ⇒ Object
Deletes the topic with the given name.
-
#get_subscription(subscription_name, options = {}) ⇒ Object
Gets the details of a subscription.
- #get_subscription_policy(subscription_name, options = {}) ⇒ Object
-
#get_topic(topic_name, options = {}) ⇒ Object
Gets the configuration of a topic.
- #get_topic_policy(topic_name, options = {}) ⇒ Object
- #iam ⇒ Object
-
#initialize(project, credentials) ⇒ Service
constructor
Creates a new Service instance.
- #inspect ⇒ Object
-
#list_subscriptions(options = {}) ⇒ Object
Lists matching subscriptions by project.
-
#list_topics(options = {}) ⇒ Object
Lists matching topics.
-
#list_topics_subscriptions(topic, options = {}) ⇒ Object
Lists matching subscriptions by project and topic.
-
#modify_ack_deadline(subscription, ids, deadline) ⇒ Object
Modifies the ack deadline for a specific message.
-
#modify_push_config(subscription, endpoint, attributes) ⇒ Object
Modifies the PushConfig for a specified subscription.
- #project_path(options = {}) ⇒ Object
-
#publish(topic, messages) ⇒ Object
Adds one or more messages to the topic.
- #publisher ⇒ Object
-
#pull(subscription, options = {}) ⇒ Object
Pulls a single message from the server.
- #set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object
- #set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object
- #subscriber ⇒ Object
- #subscription_path(subscription_name, options = {}) ⇒ Object
- #test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object
- #test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object
- #topic_path(topic_name, options = {}) ⇒ Object
Constructor Details
#initialize(project, credentials) ⇒ Service
Creates a new Service instance.
32 33 34 35 36 |
# File 'lib/gcloud/pubsub/service.rb', line 32 def initialize project, credentials @project = project @credentials = credentials @host = "pubsub.googleapis.com" end |
Instance Attribute Details
#credentials ⇒ Object
Returns the value of attribute credentials.
28 29 30 |
# File 'lib/gcloud/pubsub/service.rb', line 28 def credentials @credentials end |
#host ⇒ Object
Returns the value of attribute host.
28 29 30 |
# File 'lib/gcloud/pubsub/service.rb', line 28 def host @host end |
#mocked_iam ⇒ Object
Returns the value of attribute mocked_iam.
59 60 61 |
# File 'lib/gcloud/pubsub/service.rb', line 59 def mocked_iam @mocked_iam end |
#mocked_publisher ⇒ Object
Returns the value of attribute mocked_publisher.
53 54 55 |
# File 'lib/gcloud/pubsub/service.rb', line 53 def mocked_publisher @mocked_publisher end |
#mocked_subscriber ⇒ Object
Returns the value of attribute mocked_subscriber.
47 48 49 |
# File 'lib/gcloud/pubsub/service.rb', line 47 def mocked_subscriber @mocked_subscriber end |
#project ⇒ Object
Returns the value of attribute project.
28 29 30 |
# File 'lib/gcloud/pubsub/service.rb', line 28 def project @project end |
Instance Method Details
#acknowledge(subscription, *ack_ids) ⇒ Object
Acknowledges receipt of a message.
204 205 206 207 208 209 210 211 |
# File 'lib/gcloud/pubsub/service.rb', line 204 def acknowledge subscription, *ack_ids ack_req = Google::Pubsub::V1::AcknowledgeRequest.new( subscription: subscription_path(subscription), ack_ids: ack_ids ) backoff { subscriber.acknowledge ack_req } end |
#create_subscription(topic, subscription_name, options = {}) ⇒ Object
Creates a subscription on a given topic for a given subscriber.
164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/gcloud/pubsub/service.rb', line 164 def create_subscription topic, subscription_name, = {} sub_params = { name: subscription_path(subscription_name, ), topic: topic_path(topic), ack_deadline_seconds: [:deadline] }.delete_if { |_, v| v.nil? } sub_req = Google::Pubsub::V1::Subscription.new sub_params if [:endpoint] sub_req.push_config = Google::Pubsub::V1::PushConfig.new( push_endpoint: [:endpoint], attributes: ([:attributes] || {}).to_h) end backoff { subscriber.create_subscription sub_req } end |
#create_topic(topic_name, options = {}) ⇒ Object
Creates the given topic with the given name.
89 90 91 92 93 94 95 |
# File 'lib/gcloud/pubsub/service.rb', line 89 def create_topic topic_name, = {} topic_req = Google::Pubsub::V1::Topic.new.tap do |r| r.name = topic_path(topic_name, ) end backoff { publisher.create_topic topic_req } end |
#creds ⇒ Object
38 39 40 41 |
# File 'lib/gcloud/pubsub/service.rb', line 38 def creds GRPC::Core::ChannelCredentials.new.compose \ GRPC::Core::CallCredentials.new credentials.client.updater_proc end |
#delete_subscription(subscription) ⇒ Object
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
182 183 184 185 186 187 188 |
# File 'lib/gcloud/pubsub/service.rb', line 182 def delete_subscription subscription del_req = Google::Pubsub::V1::DeleteSubscriptionRequest.new( subscription: subscription_path(subscription) ) backoff { subscriber.delete_subscription del_req } end |
#delete_topic(topic_name) ⇒ Object
Deletes the topic with the given name. All subscriptions to this topic are also deleted. Raises GRPC status code 5 if the topic does not exist. After a topic is deleted, a new topic may be created with the same name.
102 103 104 105 106 107 108 |
# File 'lib/gcloud/pubsub/service.rb', line 102 def delete_topic topic_name topic_req = Google::Pubsub::V1::DeleteTopicRequest.new.tap do |r| r.topic = topic_path(topic_name) end backoff { publisher.delete_topic topic_req } end |
#get_subscription(subscription_name, options = {}) ⇒ Object
Gets the details of a subscription.
131 132 133 134 135 136 137 |
# File 'lib/gcloud/pubsub/service.rb', line 131 def get_subscription subscription_name, = {} sub_req = Google::Pubsub::V1::GetSubscriptionRequest.new( subscription: subscription_path(subscription_name, ) ) backoff { subscriber.get_subscription sub_req } end |
#get_subscription_policy(subscription_name, options = {}) ⇒ Object
268 269 270 271 272 273 274 |
# File 'lib/gcloud/pubsub/service.rb', line 268 def get_subscription_policy subscription_name, = {} get_req = Google::Iam::V1::GetIamPolicyRequest.new( resource: subscription_path(subscription_name, ) ) backoff { iam.get_iam_policy get_req } end |
#get_topic(topic_name, options = {}) ⇒ Object
Gets the configuration of a topic. Since the topic only has the name attribute, this method is only useful to check the existence of a topic. If other attributes are added in the future, they will be returned here.
67 68 69 70 71 72 73 |
# File 'lib/gcloud/pubsub/service.rb', line 67 def get_topic topic_name, = {} topic_req = Google::Pubsub::V1::GetTopicRequest.new.tap do |r| r.topic = topic_path(topic_name, ) end backoff { publisher.get_topic topic_req } end |
#get_topic_policy(topic_name, options = {}) ⇒ Object
242 243 244 245 246 247 248 |
# File 'lib/gcloud/pubsub/service.rb', line 242 def get_topic_policy topic_name, = {} get_req = Google::Iam::V1::GetIamPolicyRequest.new( resource: topic_path(topic_name, ) ) backoff { iam.get_iam_policy get_req } end |
#iam ⇒ Object
55 56 57 58 |
# File 'lib/gcloud/pubsub/service.rb', line 55 def iam return mocked_iam if mocked_iam @iam ||= Google::Iam::V1::IAMPolicy::Stub.new host, creds end |
#inspect ⇒ Object
310 311 312 |
# File 'lib/gcloud/pubsub/service.rb', line 310 def inspect "#{self.class}(#{@project})" end |
#list_subscriptions(options = {}) ⇒ Object
Lists matching subscriptions by project.
153 154 155 156 157 158 159 160 |
# File 'lib/gcloud/pubsub/service.rb', line 153 def list_subscriptions = {} list_params = { project: project_path(), page_token: [:token], page_size: [:max] }.delete_if { |_, v| v.nil? } list_req = Google::Pubsub::V1::ListSubscriptionsRequest.new list_params backoff { subscriber.list_subscriptions list_req } end |
#list_topics(options = {}) ⇒ Object
Lists matching topics.
77 78 79 80 81 82 83 84 85 |
# File 'lib/gcloud/pubsub/service.rb', line 77 def list_topics = {} topics_req = Google::Pubsub::V1::ListTopicsRequest.new.tap do |r| r.project = project_path() r.page_token = [:token] if [:token] r.page_size = [:max] if [:max] end backoff { publisher.list_topics topics_req } end |
#list_topics_subscriptions(topic, options = {}) ⇒ Object
Lists matching subscriptions by project and topic.
141 142 143 144 145 146 147 148 149 |
# File 'lib/gcloud/pubsub/service.rb', line 141 def list_topics_subscriptions topic, = {} list_params = { topic: topic_path(topic, ), page_token: [:token], page_size: [:max] }.delete_if { |_, v| v.nil? } list_req = Google::Pubsub::V1::ListTopicSubscriptionsRequest.new \ list_params backoff { publisher.list_topic_subscriptions list_req } end |
#modify_ack_deadline(subscription, ids, deadline) ⇒ Object
Modifies the ack deadline for a specific message.
232 233 234 235 236 237 238 239 240 |
# File 'lib/gcloud/pubsub/service.rb', line 232 def modify_ack_deadline subscription, ids, deadline mad_req = Google::Pubsub::V1::ModifyAckDeadlineRequest.new( subscription: subscription_path(subscription), ack_ids: Array(ids), ack_deadline_seconds: deadline ) backoff { subscriber.modify_ack_deadline mad_req } end |
#modify_push_config(subscription, endpoint, attributes) ⇒ Object
Modifies the PushConfig for a specified subscription.
215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/gcloud/pubsub/service.rb', line 215 def modify_push_config subscription, endpoint, attributes # Convert attributes to strings to match the protobuf definition attributes = Hash[attributes.map { |k, v| [String(k), String(v)] }] mpc_req = Google::Pubsub::V1::ModifyPushConfigRequest.new( subscription: subscription_path(subscription), push_config: Google::Pubsub::V1::PushConfig.new( push_endpoint: endpoint, attributes: attributes ) ) backoff { subscriber.modify_push_config mpc_req } end |
#project_path(options = {}) ⇒ Object
295 296 297 298 |
# File 'lib/gcloud/pubsub/service.rb', line 295 def project_path = {} project_name = [:project] || project "projects/#{project_name}" end |
#publish(topic, messages) ⇒ Object
Adds one or more messages to the topic. Raises GRPC status code 5 if the topic does not exist. The messages parameter is an array of arrays. The first element is the data, second is attributes hash.
115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/gcloud/pubsub/service.rb', line 115 def publish topic, publish_req = Google::Pubsub::V1::PublishRequest.new( topic: topic_path(topic), messages: .map do |data, attributes| Google::Pubsub::V1::PubsubMessage.new( data: String(data).encode("ASCII-8BIT"), attributes: attributes ) end ) backoff { publisher.publish publish_req } end |
#publisher ⇒ Object
49 50 51 52 |
# File 'lib/gcloud/pubsub/service.rb', line 49 def publisher return mocked_publisher if mocked_publisher @publisher ||= Google::Pubsub::V1::Publisher::Stub.new host, creds end |
#pull(subscription, options = {}) ⇒ Object
Pulls a single message from the server.
192 193 194 195 196 197 198 199 200 |
# File 'lib/gcloud/pubsub/service.rb', line 192 def pull subscription, = {} pull_req = Google::Pubsub::V1::PullRequest.new( subscription: subscription_path(subscription, ), return_immediately: !(!.fetch(:immediate, true)), max_messages: .fetch(:max, 100).to_i ) backoff { subscriber.pull pull_req } end |
#set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object
276 277 278 279 280 281 282 283 |
# File 'lib/gcloud/pubsub/service.rb', line 276 def set_subscription_policy subscription_name, new_policy, = {} set_req = Google::Iam::V1::SetIamPolicyRequest.new( resource: subscription_path(subscription_name, ), policy: Google::Iam::V1::Policy.decode_json(JSON.dump(new_policy)) ) backoff { iam.set_iam_policy set_req } end |
#set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object
250 251 252 253 254 255 256 257 |
# File 'lib/gcloud/pubsub/service.rb', line 250 def set_topic_policy topic_name, new_policy, = {} set_req = Google::Iam::V1::SetIamPolicyRequest.new( resource: topic_path(topic_name, ), policy: Google::Iam::V1::Policy.decode_json(JSON.dump(new_policy)) ) backoff { iam.set_iam_policy set_req } end |
#subscriber ⇒ Object
43 44 45 46 |
# File 'lib/gcloud/pubsub/service.rb', line 43 def subscriber return mocked_subscriber if mocked_subscriber @subscriber ||= Google::Pubsub::V1::Subscriber::Stub.new host, creds end |
#subscription_path(subscription_name, options = {}) ⇒ Object
305 306 307 308 |
# File 'lib/gcloud/pubsub/service.rb', line 305 def subscription_path subscription_name, = {} return subscription_name if subscription_name.to_s.include? "/" "#{project_path(options)}/subscriptions/#{subscription_name}" end |
#test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object
285 286 287 288 289 290 291 292 293 |
# File 'lib/gcloud/pubsub/service.rb', line 285 def subscription_name, , = {} test_req = Google::Iam::V1::TestIamPermissionsRequest.new( resource: subscription_path(subscription_name, ), permissions: ) backoff { iam. test_req } end |
#test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object
259 260 261 262 263 264 265 266 |
# File 'lib/gcloud/pubsub/service.rb', line 259 def topic_name, , = {} test_req = Google::Iam::V1::TestIamPermissionsRequest.new( resource: topic_path(topic_name, ), permissions: ) backoff { iam. test_req } end |
#topic_path(topic_name, options = {}) ⇒ Object
300 301 302 303 |
# File 'lib/gcloud/pubsub/service.rb', line 300 def topic_path topic_name, = {} return topic_name if topic_name.to_s.include? "/" "#{project_path(options)}/topics/#{topic_name}" end |