Class: Google::Cloud::Pubsub::Service

Inherits:
Object
  • Object
show all
Defined in:
lib/google/cloud/pubsub/service.rb

Overview

methods.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(project, credentials, host: nil, retries: nil, timeout: nil) ⇒ Service

Creates a new Service instance.



34
35
36
37
38
39
40
41
# File 'lib/google/cloud/pubsub/service.rb', line 34

# Creates a new Service instance.
#
# @param project the project identifier; #project_path falls back to it
#   when no :project option is supplied
# @param credentials the credentials object, or the symbol
#   +:this_channel_is_insecure+ (see #insecure?)
# @param host the API host; "pubsub.googleapis.com" is used when this is
#   nil or false
# @param retries stored unchanged and exposed through #retries
# @param timeout stored unchanged and exposed through #timeout
def initialize project, credentials, host: nil, retries: nil, timeout: nil
  @project, @credentials = project, credentials
  @retries, @timeout = retries, timeout
  @host = host ? host : "pubsub.googleapis.com"
end

Instance Attribute Details

#credentials ⇒ Object

Returns the value of attribute credentials.



30
31
32
# File 'lib/google/cloud/pubsub/service.rb', line 30

# Returns the credentials stored by the constructor. #insecure? compares
# this value against +:this_channel_is_insecure+.
def credentials
  @credentials
end

#host ⇒ Object

Returns the value of attribute host.



30
31
32
# File 'lib/google/cloud/pubsub/service.rb', line 30

# Returns the API host stored by the constructor, which substitutes
# "pubsub.googleapis.com" when no host is given.
def host
  @host
end

#mocked_iam ⇒ Object

Returns the value of attribute mocked_iam.



68
69
70
# File 'lib/google/cloud/pubsub/service.rb', line 68

# Returns @mocked_iam (nil unless something has set it). When the value is
# truthy, #iam returns it instead of building a gRPC stub.
def mocked_iam
  @mocked_iam
end

#mocked_publisher ⇒ Object

Returns the value of attribute mocked_publisher.



61
62
63
# File 'lib/google/cloud/pubsub/service.rb', line 61

# Returns @mocked_publisher (nil unless something has set it). When the
# value is truthy, #publisher returns it instead of building a gRPC stub.
def mocked_publisher
  @mocked_publisher
end

#mocked_subscriber ⇒ Object

Returns the value of attribute mocked_subscriber.



54
55
56
# File 'lib/google/cloud/pubsub/service.rb', line 54

# Returns @mocked_subscriber (nil unless something has set it). When the
# value is truthy, #subscriber returns it instead of building a gRPC stub.
def mocked_subscriber
  @mocked_subscriber
end

#project ⇒ Object

Returns the value of attribute project.



30
31
32
# File 'lib/google/cloud/pubsub/service.rb', line 30

# Returns the project identifier stored by the constructor. #project_path
# uses it whenever no :project option is supplied.
def project
  @project
end

#retries ⇒ Object

Returns the value of attribute retries.



30
31
32
# File 'lib/google/cloud/pubsub/service.rb', line 30

# Returns the retries value stored by the constructor (nil by default).
def retries
  @retries
end

#timeout ⇒ Object

Returns the value of attribute timeout.



30
31
32
# File 'lib/google/cloud/pubsub/service.rb', line 30

# Returns the timeout stored by the constructor (nil by default). It is
# passed as +timeout:+ to the gRPC stubs built by #iam, #publisher and
# #subscriber.
def timeout
  @timeout
end

Instance Method Details

#acknowledge(subscription, *ack_ids) ⇒ Object

Acknowledges receipt of a message.



218
219
220
221
222
223
224
225
# File 'lib/google/cloud/pubsub/service.rb', line 218

# Acknowledges receipt of one or more messages.
#
# @param subscription name or full path of the subscription; resolved
#   through #subscription_path
# @param ack_ids one or more acknowledgement IDs, given either as separate
#   arguments or as a single array (nested arrays are flattened, mirroring
#   the Array(ids) coercion in #modify_ack_deadline)
# @return whatever +execute+ returns for the subscriber's +acknowledge+
#   call
def acknowledge subscription, *ack_ids
  ack_req = Google::Pubsub::V1::AcknowledgeRequest.new(
    subscription: subscription_path(subscription),
    # A caller that passes an array would otherwise produce [[id, ...]],
    # which is not a flat list of ID strings.
    ack_ids: ack_ids.flatten
  )

  execute { subscriber.acknowledge ack_req }
end

#create_subscription(topic, subscription_name, options = {}) ⇒ Object

Creates a subscription on a given topic for a given subscriber.



178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/google/cloud/pubsub/service.rb', line 178

# Creates a subscription on a given topic for a given subscriber.
#
# @param topic name or full path of the topic; resolved through #topic_path
# @param subscription_name name or full path of the new subscription;
#   resolved through #subscription_path with +options+
# @param options [Hash] recognised keys:
#   * :deadline   - sent as +ack_deadline_seconds+ (omitted when nil)
#   * :endpoint   - when present, a push config with this endpoint is set
#   * :attributes - push config attributes; only read when :endpoint is
#     given, nil is treated as empty
#   Other keys are passed on to #subscription_path.
# @return whatever +execute+ returns for the subscriber's
#   +create_subscription+ call
def create_subscription topic, subscription_name, options = {}
  sub_params = { name: subscription_path(subscription_name, options),
                 topic: topic_path(topic),
                 ack_deadline_seconds: options[:deadline]
               }.delete_if { |_, v| v.nil? }
  sub_req = Google::Pubsub::V1::Subscription.new sub_params
  if options[:endpoint]
    # Convert attributes to strings to match the protobuf definition, the
    # same way #modify_push_config does.
    attributes = (options[:attributes] || {}).to_h
    attributes = Hash[attributes.map { |k, v| [String(k), String(v)] }]
    sub_req.push_config = Google::Pubsub::V1::PushConfig.new(
      push_endpoint: options[:endpoint],
      attributes: attributes)
  end

  execute { subscriber.create_subscription sub_req }
end

#create_topic(topic_name, options = {}) ⇒ Object

Creates the given topic with the given name.



102
103
104
105
106
107
108
# File 'lib/google/cloud/pubsub/service.rb', line 102

# Creates the given topic with the given name.
#
# @param topic_name name or full path of the topic; resolved through
#   #topic_path with +options+
# @param options [Hash] passed on to #topic_path
# @return whatever +execute+ returns for the publisher's +create_topic+
#   call
def create_topic topic_name, options = {}
  topic_req = Google::Pubsub::V1::Topic.new
  topic_req.name = topic_path topic_name, options

  execute { publisher.create_topic topic_req }
end

#creds ⇒ Object



43
44
45
46
47
# File 'lib/google/cloud/pubsub/service.rb', line 43

# Returns the credentials argument handed to the gRPC stubs.
#
# For an insecure channel (see #insecure?) the raw credentials value is
# returned untouched. Otherwise a fresh channel credentials object is
# composed with call credentials built from
# +credentials.client.updater_proc+.
def creds
  if insecure?
    credentials
  else
    channel_creds = GRPC::Core::ChannelCredentials.new
    call_creds = GRPC::Core::CallCredentials.new(
      credentials.client.updater_proc)
    channel_creds.compose call_creds
  end
end

#delete_subscription(subscription) ⇒ Object

Deletes an existing subscription. All pending messages in the subscription are immediately dropped.



196
197
198
199
200
201
202
# File 'lib/google/cloud/pubsub/service.rb', line 196

# Deletes an existing subscription. All pending messages in the
# subscription are immediately dropped.
#
# @param subscription name or full path of the subscription; resolved
#   through #subscription_path
# @return whatever +execute+ returns for the subscriber's
#   +delete_subscription+ call
def delete_subscription subscription
  path = subscription_path subscription
  del_req = Google::Pubsub::V1::DeleteSubscriptionRequest.new(
    subscription: path)

  execute { subscriber.delete_subscription del_req }
end

#delete_topic(topic_name) ⇒ Object

Deletes the topic with the given name. All subscriptions to this topic are also deleted. Raises GRPC status code 5 if the topic does not exist. After a topic is deleted, a new topic may be created with the same name.



115
116
117
118
119
120
121
# File 'lib/google/cloud/pubsub/service.rb', line 115

# Deletes the topic with the given name.
#
# @param topic_name name or full path of the topic; resolved through
#   #topic_path
# @return whatever +execute+ returns for the publisher's +delete_topic+
#   call
def delete_topic topic_name
  topic_req = Google::Pubsub::V1::DeleteTopicRequest.new
  topic_req.topic = topic_path topic_name

  execute { publisher.delete_topic topic_req }
end

#get_subscription(subscription_name, options = {}) ⇒ Object

Gets the details of a subscription.



142
143
144
145
146
147
148
# File 'lib/google/cloud/pubsub/service.rb', line 142

# Gets the details of a subscription.
#
# @param subscription_name name or full path of the subscription; resolved
#   through #subscription_path with +options+
# @param options [Hash] passed on to #subscription_path
# @return whatever +execute+ returns for the subscriber's
#   +get_subscription+ call
def get_subscription subscription_name, options = {}
  path = subscription_path subscription_name, options
  sub_req = Google::Pubsub::V1::GetSubscriptionRequest.new(
    subscription: path)

  execute { subscriber.get_subscription sub_req }
end

#get_subscription_policy(subscription_name, options = {}) ⇒ Object



282
283
284
285
286
287
288
# File 'lib/google/cloud/pubsub/service.rb', line 282

# Requests the IAM policy of a subscription.
#
# @param subscription_name name or full path of the subscription; resolved
#   through #subscription_path with +options+ and sent as the request's
#   +resource+
# @param options [Hash] passed on to #subscription_path
# @return whatever +execute+ returns for the IAM client's
#   +get_iam_policy+ call
def get_subscription_policy subscription_name, options = {}
  resource = subscription_path subscription_name, options
  get_req = Google::Iam::V1::GetIamPolicyRequest.new resource: resource

  execute { iam.get_iam_policy get_req }
end

#get_topic(topic_name, options = {}) ⇒ Object

Gets the configuration of a topic. Since the topic only has the name attribute, this method is only useful to check the existence of a topic. If other attributes are added in the future, they will be returned here.



80
81
82
83
84
85
86
# File 'lib/google/cloud/pubsub/service.rb', line 80

# Gets the configuration of a topic.
#
# @param topic_name name or full path of the topic; resolved through
#   #topic_path with +options+
# @param options [Hash] passed on to #topic_path
# @return whatever +execute+ returns for the publisher's +get_topic+ call
def get_topic topic_name, options = {}
  topic_req = Google::Pubsub::V1::GetTopicRequest.new
  topic_req.topic = topic_path topic_name, options

  execute { publisher.get_topic topic_req }
end

#get_topic_policy(topic_name, options = {}) ⇒ Object



256
257
258
259
260
261
262
# File 'lib/google/cloud/pubsub/service.rb', line 256

# Requests the IAM policy of a topic.
#
# @param topic_name name or full path of the topic; resolved through
#   #topic_path with +options+ and sent as the request's +resource+
# @param options [Hash] passed on to #topic_path
# @return whatever +execute+ returns for the IAM client's
#   +get_iam_policy+ call
def get_topic_policy topic_name, options = {}
  resource = topic_path topic_name, options
  get_req = Google::Iam::V1::GetIamPolicyRequest.new resource: resource

  execute { iam.get_iam_policy get_req }
end

#iam ⇒ Object



63
64
65
66
67
# File 'lib/google/cloud/pubsub/service.rb', line 63

# Returns the IAM policy client.
#
# A truthy #mocked_iam takes precedence. Otherwise an IAMPolicy stub is
# built from #host, #creds and #timeout on first use and memoized in @iam.
def iam
  mocked_iam || (
    @iam ||= Google::Iam::V1::IAMPolicy::Stub.new(
      host, creds, timeout: timeout)
  )
end

#insecure? ⇒ Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/google/cloud/pubsub/service.rb', line 70

# Tells whether the service was configured for an insecure channel, i.e.
# whether #credentials equals the marker symbol
# +:this_channel_is_insecure+.
#
# @return [Boolean]
def insecure?
  insecure_marker = :this_channel_is_insecure
  credentials == insecure_marker
end

#inspect ⇒ Object



324
325
326
# File 'lib/google/cloud/pubsub/service.rb', line 324

# Short human-readable representation: the class name followed by the
# project in parentheses.
def inspect
  class_label = self.class.to_s
  project_label = @project.to_s
  "#{class_label}(#{project_label})"
end

#list_subscriptions(options = {}) ⇒ Object

Lists matching subscriptions by project.



165
166
167
168
169
170
171
172
173
174
# File 'lib/google/cloud/pubsub/service.rb', line 165

# Lists matching subscriptions by project.
#
# @param options [Hash] recognised keys:
#   * :token - sent as +page_token+ (omitted when nil)
#   * :max   - sent as +page_size+ (omitted when nil)
#   The whole hash is also passed to #project_path.
# @return whatever +execute+ returns for the subscriber's
#   +list_subscriptions+ call
def list_subscriptions options = {}
  candidates = {
    project: project_path(options),
    page_token: options[:token],
    page_size: options[:max]
  }
  # Only send the fields that were actually provided.
  list_params = candidates.reject { |_, value| value.nil? }
  list_req = Google::Pubsub::V1::ListSubscriptionsRequest.new list_params

  execute { subscriber.list_subscriptions list_req }
end

#list_topics(options = {}) ⇒ Object

Lists matching topics.



90
91
92
93
94
95
96
97
98
# File 'lib/google/cloud/pubsub/service.rb', line 90

# Lists matching topics.
#
# @param options [Hash] recognised keys:
#   * :token - assigned to +page_token+ when truthy
#   * :max   - assigned to +page_size+ when truthy
#   The whole hash is also passed to #project_path.
# @return whatever +execute+ returns for the publisher's +list_topics+
#   call
def list_topics options = {}
  topics_req = Google::Pubsub::V1::ListTopicsRequest.new
  topics_req.project = project_path options

  token = options[:token]
  topics_req.page_token = token if token

  max = options[:max]
  topics_req.page_size = max if max

  execute { publisher.list_topics topics_req }
end

#list_topics_subscriptions(topic, options = {}) ⇒ Object

Lists matching subscriptions by project and topic.



152
153
154
155
156
157
158
159
160
161
# File 'lib/google/cloud/pubsub/service.rb', line 152

# Lists matching subscriptions by project and topic.
#
# @param topic name or full path of the topic; resolved through
#   #topic_path with +options+
# @param options [Hash] recognised keys:
#   * :token - sent as +page_token+ (omitted when nil)
#   * :max   - sent as +page_size+ (omitted when nil)
#   The whole hash is also passed to #topic_path.
# @return whatever +execute+ returns for the publisher's
#   +list_topic_subscriptions+ call
def list_topics_subscriptions topic, options = {}
  candidates = {
    topic: topic_path(topic, options),
    page_token: options[:token],
    page_size: options[:max]
  }
  # Only send the fields that were actually provided.
  list_params = candidates.reject { |_, value| value.nil? }
  list_req = Google::Pubsub::V1::ListTopicSubscriptionsRequest.new(
    list_params)

  execute { publisher.list_topic_subscriptions list_req }
end

#modify_ack_deadline(subscription, ids, deadline) ⇒ Object

Modifies the ack deadline for a specific message.



246
247
248
249
250
251
252
253
254
# File 'lib/google/cloud/pubsub/service.rb', line 246

# Modifies the ack deadline for one or more messages.
#
# @param subscription name or full path of the subscription; resolved
#   through #subscription_path
# @param ids a single ack ID or a collection of them; coerced with Array()
# @param deadline sent as +ack_deadline_seconds+
# @return whatever +execute+ returns for the subscriber's
#   +modify_ack_deadline+ call
def modify_ack_deadline subscription, ids, deadline
  path = subscription_path subscription
  id_list = Array(ids)
  mad_req = Google::Pubsub::V1::ModifyAckDeadlineRequest.new(
    subscription: path, ack_ids: id_list, ack_deadline_seconds: deadline)

  execute { subscriber.modify_ack_deadline mad_req }
end

#modify_push_config(subscription, endpoint, attributes) ⇒ Object

Modifies the PushConfig for a specified subscription.



229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/google/cloud/pubsub/service.rb', line 229

# Modifies the PushConfig for a specified subscription.
#
# @param subscription name or full path of the subscription; resolved
#   through #subscription_path
# @param endpoint sent as the push config's +push_endpoint+
# @param attributes [Hash, nil] push config attributes; keys and values
#   are converted with String(). nil is treated as an empty hash, matching
#   how #create_subscription handles missing attributes.
# @return whatever +execute+ returns for the subscriber's
#   +modify_push_config+ call
def modify_push_config subscription, endpoint, attributes
  # Convert attributes to strings to match the protobuf definition
  attributes = Hash[
    (attributes || {}).map { |k, v| [String(k), String(v)] }
  ]

  mpc_req = Google::Pubsub::V1::ModifyPushConfigRequest.new(
    subscription: subscription_path(subscription),
    push_config: Google::Pubsub::V1::PushConfig.new(
      push_endpoint: endpoint,
      attributes: attributes
    )
  )

  execute { subscriber.modify_push_config mpc_req }
end

#project_path(options = {}) ⇒ Object



309
310
311
312
# File 'lib/google/cloud/pubsub/service.rb', line 309

# Builds the "projects/<name>" resource path.
#
# @param options [Hash] a truthy :project entry overrides the service's
#   own #project
# @return [String]
def project_path options = {}
  "projects/#{options[:project] || project}"
end

#publish(topic, messages) ⇒ Object

Adds one or more messages to the topic. Raises GRPC status code 5 if the topic does not exist. The messages parameter is an array of arrays. The first element is the data, second is attributes hash.



128
129
130
131
132
133
134
135
136
137
138
# File 'lib/google/cloud/pubsub/service.rb', line 128

# Adds one or more messages to the topic.
#
# @param topic name or full path of the topic; resolved through
#   #topic_path
# @param messages an array of arrays: the first element of each entry is
#   the message data, the second is its attributes hash
# @return whatever +execute+ returns for the publisher's +publish+ call
def publish topic, messages
  path = topic_path topic
  pubsub_messages = messages.map do |data, attributes|
    Google::Pubsub::V1::PubsubMessage.new data: data, attributes: attributes
  end
  publish_req = Google::Pubsub::V1::PublishRequest.new(
    topic: path, messages: pubsub_messages)

  execute { publisher.publish publish_req }
end

#publisher ⇒ Object



56
57
58
59
60
# File 'lib/google/cloud/pubsub/service.rb', line 56

# Returns the publisher client.
#
# A truthy #mocked_publisher takes precedence. Otherwise a Publisher stub
# is built from #host, #creds and #timeout on first use and memoized in
# @publisher.
def publisher
  mocked_publisher || (
    @publisher ||= Google::Pubsub::V1::Publisher::Stub.new(
      host, creds, timeout: timeout)
  )
end

#pull(subscription, options = {}) ⇒ Object

Pulls a single message from the server.



206
207
208
209
210
211
212
213
214
# File 'lib/google/cloud/pubsub/service.rb', line 206

# Pulls messages from a subscription.
#
# @param subscription name or full path of the subscription; resolved
#   through #subscription_path with +options+
# @param options [Hash] recognised keys:
#   * :immediate - coerced to a strict boolean and sent as
#     +return_immediately+; defaults to true when the key is absent
#   * :max       - converted with +to_i+ and sent as +max_messages+;
#     defaults to 100 when the key is absent
#   The whole hash is also passed to #subscription_path.
# @return whatever +execute+ returns for the subscriber's +pull+ call
def pull subscription, options = {}
  path = subscription_path subscription, options
  immediate = options.fetch(:immediate, true) ? true : false
  max = options.fetch(:max, 100).to_i
  pull_req = Google::Pubsub::V1::PullRequest.new(
    subscription: path, return_immediately: immediate, max_messages: max)

  execute { subscriber.pull pull_req }
end

#set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object



290
291
292
293
294
295
296
297
# File 'lib/google/cloud/pubsub/service.rb', line 290

# Sends a new IAM policy for a subscription.
#
# @param subscription_name name or full path of the subscription; resolved
#   through #subscription_path with +options+ and sent as the request's
#   +resource+
# @param new_policy sent as the request's +policy+
# @param options [Hash] passed on to #subscription_path
# @return whatever +execute+ returns for the IAM client's
#   +set_iam_policy+ call
def set_subscription_policy subscription_name, new_policy, options = {}
  resource = subscription_path subscription_name, options
  set_req = Google::Iam::V1::SetIamPolicyRequest.new(
    resource: resource, policy: new_policy)

  execute { iam.set_iam_policy set_req }
end

#set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object



264
265
266
267
268
269
270
271
# File 'lib/google/cloud/pubsub/service.rb', line 264

# Sends a new IAM policy for a topic.
#
# @param topic_name name or full path of the topic; resolved through
#   #topic_path with +options+ and sent as the request's +resource+
# @param new_policy sent as the request's +policy+
# @param options [Hash] passed on to #topic_path
# @return whatever +execute+ returns for the IAM client's
#   +set_iam_policy+ call
def set_topic_policy topic_name, new_policy, options = {}
  resource = topic_path topic_name, options
  set_req = Google::Iam::V1::SetIamPolicyRequest.new(
    resource: resource, policy: new_policy)

  execute { iam.set_iam_policy set_req }
end

#subscriber ⇒ Object



49
50
51
52
53
# File 'lib/google/cloud/pubsub/service.rb', line 49

# Returns the subscriber client.
#
# A truthy #mocked_subscriber takes precedence. Otherwise a Subscriber
# stub is built from #host, #creds and #timeout on first use and memoized
# in @subscriber.
def subscriber
  mocked_subscriber || (
    @subscriber ||= Google::Pubsub::V1::Subscriber::Stub.new(
      host, creds, timeout: timeout)
  )
end

#subscription_path(subscription_name, options = {}) ⇒ Object



319
320
321
322
# File 'lib/google/cloud/pubsub/service.rb', line 319

# Resolves a subscription name to its full resource path.
#
# A name whose string form already contains "/" is treated as a full path
# and returned unchanged; anything else is placed under
# "<project path>/subscriptions/".
#
# @param subscription_name short name or full path
# @param options [Hash] passed on to #project_path
def subscription_path subscription_name, options = {}
  if subscription_name.to_s.include? "/"
    subscription_name
  else
    "#{project_path(options)}/subscriptions/#{subscription_name}"
  end
end

#test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object



299
300
301
302
303
304
305
306
307
# File 'lib/google/cloud/pubsub/service.rb', line 299

# Sends an IAM permissions check for a subscription.
#
# @param subscription_name name or full path of the subscription; resolved
#   through #subscription_path with +options+ and sent as the request's
#   +resource+
# @param permissions sent as the request's +permissions+
# @param options [Hash] passed on to #subscription_path
# @return whatever +execute+ returns for the IAM client's
#   +test_iam_permissions+ call
def test_subscription_permissions subscription_name, permissions, options = {}
  resource = subscription_path subscription_name, options
  test_req = Google::Iam::V1::TestIamPermissionsRequest.new(
    resource: resource, permissions: permissions)

  execute { iam.test_iam_permissions test_req }
end

#test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object



273
274
275
276
277
278
279
280
# File 'lib/google/cloud/pubsub/service.rb', line 273

# Sends an IAM permissions check for a topic.
#
# @param topic_name name or full path of the topic; resolved through
#   #topic_path with +options+ and sent as the request's +resource+
# @param permissions sent as the request's +permissions+
# @param options [Hash] passed on to #topic_path
# @return whatever +execute+ returns for the IAM client's
#   +test_iam_permissions+ call
def test_topic_permissions topic_name, permissions, options = {}
  resource = topic_path topic_name, options
  test_req = Google::Iam::V1::TestIamPermissionsRequest.new(
    resource: resource, permissions: permissions)

  execute { iam.test_iam_permissions test_req }
end

#topic_path(topic_name, options = {}) ⇒ Object



314
315
316
317
# File 'lib/google/cloud/pubsub/service.rb', line 314

# Resolves a topic name to its full resource path.
#
# A name whose string form already contains "/" is treated as a full path
# and returned unchanged; anything else is placed under
# "<project path>/topics/".
#
# @param topic_name short name or full path
# @param options [Hash] passed on to #project_path
def topic_path topic_name, options = {}
  if topic_name.to_s.include? "/"
    topic_name
  else
    "#{project_path(options)}/topics/#{topic_name}"
  end
end