Class: Gcloud::Pubsub::Service

Inherits:
Object
  • Object
show all
Defined in:
lib/gcloud/pubsub/service.rb

Overview

methods.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(project, credentials, host: nil, retries: nil, timeout: nil) ⇒ Service

Creates a new Service instance.



33
34
35
36
37
38
39
# File 'lib/gcloud/pubsub/service.rb', line 33

def initialize project, credentials, host: nil, retries: nil, timeout: nil
  @project = project
  @credentials = credentials
  @host = host || "pubsub.googleapis.com"
  @retries = retries
  @timeout = timeout
end

Instance Attribute Details

#credentialsObject

Returns the value of attribute credentials.



29
30
31
# File 'lib/gcloud/pubsub/service.rb', line 29

def credentials
  @credentials
end

#hostObject

Returns the value of attribute host.



29
30
31
# File 'lib/gcloud/pubsub/service.rb', line 29

def host
  @host
end

#mocked_iamObject

Returns the value of attribute mocked_iam.



66
67
68
# File 'lib/gcloud/pubsub/service.rb', line 66

def mocked_iam
  @mocked_iam
end

#mocked_publisherObject

Returns the value of attribute mocked_publisher.



59
60
61
# File 'lib/gcloud/pubsub/service.rb', line 59

def mocked_publisher
  @mocked_publisher
end

#mocked_subscriberObject

Returns the value of attribute mocked_subscriber.



52
53
54
# File 'lib/gcloud/pubsub/service.rb', line 52

def mocked_subscriber
  @mocked_subscriber
end

#projectObject

Returns the value of attribute project.



29
30
31
# File 'lib/gcloud/pubsub/service.rb', line 29

def project
  @project
end

#retriesObject

Returns the value of attribute retries.



29
30
31
# File 'lib/gcloud/pubsub/service.rb', line 29

def retries
  @retries
end

#timeoutObject

Returns the value of attribute timeout.



29
30
31
# File 'lib/gcloud/pubsub/service.rb', line 29

def timeout
  @timeout
end

Instance Method Details

#acknowledge(subscription, *ack_ids) ⇒ Object

Acknowledges receipt of a message.



213
214
215
216
217
218
219
220
# File 'lib/gcloud/pubsub/service.rb', line 213

def acknowledge subscription, *ack_ids
  ack_req = Google::Pubsub::V1::AcknowledgeRequest.new(
    subscription: subscription_path(subscription),
    ack_ids: ack_ids
  )

  execute { subscriber.acknowledge ack_req }
end

#create_subscription(topic, subscription_name, options = {}) ⇒ Object

Creates a subscription on a given topic for a given subscriber.



173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/gcloud/pubsub/service.rb', line 173

def create_subscription topic, subscription_name, options = {}
  sub_params = { name: subscription_path(subscription_name, options),
                 topic: topic_path(topic),
                 ack_deadline_seconds: options[:deadline]
               }.delete_if { |_, v| v.nil? }
  sub_req = Google::Pubsub::V1::Subscription.new sub_params
  if options[:endpoint]
    sub_req.push_config = Google::Pubsub::V1::PushConfig.new(
      push_endpoint: options[:endpoint],
      attributes: (options[:attributes] || {}).to_h)
  end

  execute { subscriber.create_subscription sub_req }
end

#create_topic(topic_name, options = {}) ⇒ Object

Creates the given topic with the given name.



100
101
102
103
104
105
106
# File 'lib/gcloud/pubsub/service.rb', line 100

def create_topic topic_name, options = {}
  topic_req = Google::Pubsub::V1::Topic.new.tap do |r|
    r.name = topic_path(topic_name, options)
  end

  execute { publisher.create_topic topic_req }
end

#credsObject



41
42
43
44
45
# File 'lib/gcloud/pubsub/service.rb', line 41

def creds
  return credentials if insecure?
  GRPC::Core::ChannelCredentials.new.compose \
    GRPC::Core::CallCredentials.new credentials.client.updater_proc
end

#delete_subscription(subscription) ⇒ Object

Deletes an existing subscription. All pending messages in the subscription are immediately dropped.



191
192
193
194
195
196
197
# File 'lib/gcloud/pubsub/service.rb', line 191

def delete_subscription subscription
  del_req = Google::Pubsub::V1::DeleteSubscriptionRequest.new(
    subscription: subscription_path(subscription)
  )

  execute { subscriber.delete_subscription del_req }
end

#delete_topic(topic_name) ⇒ Object

Deletes the topic with the given name. All subscriptions to this topic are also deleted. Raises GRPC status code 5 if the topic does not exist. After a topic is deleted, a new topic may be created with the same name.



113
114
115
116
117
118
119
# File 'lib/gcloud/pubsub/service.rb', line 113

def delete_topic topic_name
  topic_req = Google::Pubsub::V1::DeleteTopicRequest.new.tap do |r|
    r.topic = topic_path(topic_name)
  end

  execute { publisher.delete_topic topic_req }
end

#get_subscription(subscription_name, options = {}) ⇒ Object

Gets the details of a subscription.



140
141
142
143
144
145
146
# File 'lib/gcloud/pubsub/service.rb', line 140

def get_subscription subscription_name, options = {}
  sub_req = Google::Pubsub::V1::GetSubscriptionRequest.new(
    subscription: subscription_path(subscription_name, options)
  )

  execute { subscriber.get_subscription sub_req }
end

#get_subscription_policy(subscription_name, options = {}) ⇒ Object



277
278
279
280
281
282
283
# File 'lib/gcloud/pubsub/service.rb', line 277

def get_subscription_policy subscription_name, options = {}
  get_req = Google::Iam::V1::GetIamPolicyRequest.new(
    resource: subscription_path(subscription_name, options)
  )

  execute { iam.get_iam_policy get_req }
end

#get_topic(topic_name, options = {}) ⇒ Object

Gets the configuration of a topic. Since the topic only has the name attribute, this method is only useful to check the existence of a topic. If other attributes are added in the future, they will be returned here.



78
79
80
81
82
83
84
# File 'lib/gcloud/pubsub/service.rb', line 78

def get_topic topic_name, options = {}
  topic_req = Google::Pubsub::V1::GetTopicRequest.new.tap do |r|
    r.topic = topic_path(topic_name, options)
  end

  execute { publisher.get_topic topic_req }
end

#get_topic_policy(topic_name, options = {}) ⇒ Object



251
252
253
254
255
256
257
# File 'lib/gcloud/pubsub/service.rb', line 251

def get_topic_policy topic_name, options = {}
  get_req = Google::Iam::V1::GetIamPolicyRequest.new(
    resource: topic_path(topic_name, options)
  )

  execute { iam.get_iam_policy get_req }
end

#iamObject



61
62
63
64
65
# File 'lib/gcloud/pubsub/service.rb', line 61

def iam
  return mocked_iam if mocked_iam
  @iam ||= Google::Iam::V1::IAMPolicy::Stub.new(
    host, creds, timeout: timeout)
end

#insecure?Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/gcloud/pubsub/service.rb', line 68

def insecure?
  credentials == :this_channel_is_insecure
end

#inspectObject



319
320
321
# File 'lib/gcloud/pubsub/service.rb', line 319

def inspect
  "#{self.class}(#{@project})"
end

#list_subscriptions(options = {}) ⇒ Object

Lists matching subscriptions by project.



162
163
164
165
166
167
168
169
# File 'lib/gcloud/pubsub/service.rb', line 162

def list_subscriptions options = {}
  list_params = { project:    project_path(options),
                  page_token: options[:token],
                  page_size:  options[:max] }.delete_if { |_, v| v.nil? }
  list_req = Google::Pubsub::V1::ListSubscriptionsRequest.new list_params

  execute { subscriber.list_subscriptions list_req }
end

#list_topics(options = {}) ⇒ Object

Lists matching topics.



88
89
90
91
92
93
94
95
96
# File 'lib/gcloud/pubsub/service.rb', line 88

def list_topics options = {}
  topics_req = Google::Pubsub::V1::ListTopicsRequest.new.tap do |r|
    r.project = project_path(options)
    r.page_token = options[:token] if options[:token]
    r.page_size = options[:max] if options[:max]
  end

  execute { publisher.list_topics topics_req }
end

#list_topics_subscriptions(topic, options = {}) ⇒ Object

Lists matching subscriptions by project and topic.



150
151
152
153
154
155
156
157
158
# File 'lib/gcloud/pubsub/service.rb', line 150

def list_topics_subscriptions topic, options = {}
  list_params = { topic:     topic_path(topic, options),
                  page_token: options[:token],
                  page_size:  options[:max] }.delete_if { |_, v| v.nil? }
  list_req = Google::Pubsub::V1::ListTopicSubscriptionsRequest.new \
    list_params

  execute { publisher.list_topic_subscriptions list_req }
end

#modify_ack_deadline(subscription, ids, deadline) ⇒ Object

Modifies the ack deadline for a specific message.



241
242
243
244
245
246
247
248
249
# File 'lib/gcloud/pubsub/service.rb', line 241

def modify_ack_deadline subscription, ids, deadline
  mad_req = Google::Pubsub::V1::ModifyAckDeadlineRequest.new(
    subscription: subscription_path(subscription),
    ack_ids: Array(ids),
    ack_deadline_seconds: deadline
  )

  execute { subscriber.modify_ack_deadline mad_req }
end

#modify_push_config(subscription, endpoint, attributes) ⇒ Object

Modifies the PushConfig for a specified subscription.



224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/gcloud/pubsub/service.rb', line 224

def modify_push_config subscription, endpoint, attributes
  # Convert attributes to strings to match the protobuf definition
  attributes = Hash[attributes.map { |k, v| [String(k), String(v)] }]

  mpc_req = Google::Pubsub::V1::ModifyPushConfigRequest.new(
    subscription: subscription_path(subscription),
    push_config: Google::Pubsub::V1::PushConfig.new(
      push_endpoint: endpoint,
      attributes: attributes
    )
  )

  execute { subscriber.modify_push_config mpc_req }
end

#project_path(options = {}) ⇒ Object



304
305
306
307
# File 'lib/gcloud/pubsub/service.rb', line 304

def project_path options = {}
  project_name = options[:project] || project
  "projects/#{project_name}"
end

#publish(topic, messages) ⇒ Object

Adds one or more messages to the topic. Raises GRPC status code 5 if the topic does not exist. The messages parameter is an array of arrays. The first element is the data, second is attributes hash.



126
127
128
129
130
131
132
133
134
135
136
# File 'lib/gcloud/pubsub/service.rb', line 126

def publish topic, messages
  publish_req = Google::Pubsub::V1::PublishRequest.new(
    topic: topic_path(topic),
    messages: messages.map do |data, attributes|
      Google::Pubsub::V1::PubsubMessage.new(
        data: data, attributes: attributes)
    end
  )

  execute { publisher.publish publish_req }
end

#publisherObject



54
55
56
57
58
# File 'lib/gcloud/pubsub/service.rb', line 54

def publisher
  return mocked_publisher if mocked_publisher
  @publisher ||= Google::Pubsub::V1::Publisher::Stub.new(
    host, creds, timeout: timeout)
end

#pull(subscription, options = {}) ⇒ Object

Pulls a single message from the server.



201
202
203
204
205
206
207
208
209
# File 'lib/gcloud/pubsub/service.rb', line 201

def pull subscription, options = {}
  pull_req = Google::Pubsub::V1::PullRequest.new(
    subscription: subscription_path(subscription, options),
    return_immediately: !(!options.fetch(:immediate, true)),
    max_messages: options.fetch(:max, 100).to_i
  )

  execute { subscriber.pull pull_req }
end

#set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object



285
286
287
288
289
290
291
292
# File 'lib/gcloud/pubsub/service.rb', line 285

def set_subscription_policy subscription_name, new_policy, options = {}
  set_req = Google::Iam::V1::SetIamPolicyRequest.new(
    resource: subscription_path(subscription_name, options),
    policy: new_policy
  )

  execute { iam.set_iam_policy set_req }
end

#set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object



259
260
261
262
263
264
265
266
# File 'lib/gcloud/pubsub/service.rb', line 259

def set_topic_policy topic_name, new_policy, options = {}
  set_req = Google::Iam::V1::SetIamPolicyRequest.new(
    resource: topic_path(topic_name, options),
    policy: new_policy
  )

  execute { iam.set_iam_policy set_req }
end

#subscriberObject



47
48
49
50
51
# File 'lib/gcloud/pubsub/service.rb', line 47

def subscriber
  return mocked_subscriber if mocked_subscriber
  @subscriber ||= Google::Pubsub::V1::Subscriber::Stub.new(
    host, creds, timeout: timeout)
end

#subscription_path(subscription_name, options = {}) ⇒ Object



314
315
316
317
# File 'lib/gcloud/pubsub/service.rb', line 314

def subscription_path subscription_name, options = {}
  return subscription_name if subscription_name.to_s.include? "/"
  "#{project_path(options)}/subscriptions/#{subscription_name}"
end

#test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object



294
295
296
297
298
299
300
301
302
# File 'lib/gcloud/pubsub/service.rb', line 294

def test_subscription_permissions subscription_name,
                                  permissions, options = {}
  test_req = Google::Iam::V1::TestIamPermissionsRequest.new(
    resource: subscription_path(subscription_name, options),
    permissions: permissions
  )

  execute { iam.test_iam_permissions test_req }
end

#test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object



268
269
270
271
272
273
274
275
# File 'lib/gcloud/pubsub/service.rb', line 268

def test_topic_permissions topic_name, permissions, options = {}
  test_req = Google::Iam::V1::TestIamPermissionsRequest.new(
    resource: topic_path(topic_name, options),
    permissions: permissions
  )

  execute { iam.test_iam_permissions test_req }
end

#topic_path(topic_name, options = {}) ⇒ Object



309
310
311
312
# File 'lib/gcloud/pubsub/service.rb', line 309

def topic_path topic_name, options = {}
  return topic_name if topic_name.to_s.include? "/"
  "#{project_path(options)}/topics/#{topic_name}"
end