Class: Gcloud::Pubsub::Service

Inherits:
Object
  • Object
show all
Defined in:
lib/gcloud/pubsub/service.rb

Overview

Wraps the Pub/Sub gRPC service stubs (publisher, subscriber and IAM) and exposes the API methods.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(project, credentials) ⇒ Service

Creates a new Service instance.



32
33
34
35
36
# File 'lib/gcloud/pubsub/service.rb', line 32

# Creates a new Service instance.
#
# Remembers the given project and credentials on the instance and sets the
# host to the default Pub/Sub endpoint name.
def initialize project, credentials
  @project, @credentials = project, credentials
  @host = "pubsub.googleapis.com"
end

Instance Attribute Details

#credentials ⇒ Object

Returns the value of attribute credentials.



28
29
30
# File 'lib/gcloud/pubsub/service.rb', line 28

# Reader for the credentials attribute.
#
# Returns the current value of @credentials, which the constructor sets
# from its second argument.
def credentials
  @credentials
end

#host ⇒ Object

Returns the value of attribute host.



28
29
30
# File 'lib/gcloud/pubsub/service.rb', line 28

# Reader for the host attribute.
#
# Returns the current value of @host; the constructor initializes it to
# "pubsub.googleapis.com".
def host
  @host
end

#mocked_iam ⇒ Object

Returns the value of attribute mocked_iam.



60
61
62
# File 'lib/gcloud/pubsub/service.rb', line 60

# Reader for the mocked_iam attribute.
#
# Returns the current value of @mocked_iam. When this is set, the `iam`
# method returns it instead of building a gRPC stub.
def mocked_iam
  @mocked_iam
end

#mocked_publisher ⇒ Object

Returns the value of attribute mocked_publisher.



54
55
56
# File 'lib/gcloud/pubsub/service.rb', line 54

# Reader for the mocked_publisher attribute.
#
# Returns the current value of @mocked_publisher. When this is set, the
# `publisher` method returns it instead of building a gRPC stub.
def mocked_publisher
  @mocked_publisher
end

#mocked_subscriber ⇒ Object

Returns the value of attribute mocked_subscriber.



48
49
50
# File 'lib/gcloud/pubsub/service.rb', line 48

# Reader for the mocked_subscriber attribute.
#
# Returns the current value of @mocked_subscriber. When this is set, the
# `subscriber` method returns it instead of building a gRPC stub.
def mocked_subscriber
  @mocked_subscriber
end

#project ⇒ Object

Returns the value of attribute project.



28
29
30
# File 'lib/gcloud/pubsub/service.rb', line 28

# Reader for the project attribute.
#
# Returns the current value of @project, which the constructor sets from
# its first argument and which `project_path` uses as the default project.
def project
  @project
end

Instance Method Details

#acknowledge(subscription, *ack_ids) ⇒ Object

Acknowledges receipt of one or more messages.



207
208
209
210
211
212
213
214
# File 'lib/gcloud/pubsub/service.rb', line 207

# Acknowledges receipt of one or more messages.
#
# Builds an AcknowledgeRequest holding the resolved subscription path and
# every ack ID passed after the subscription, then sends it through the
# subscriber stub inside the `backoff` block.
def acknowledge subscription, *ack_ids
  path = subscription_path(subscription)
  request = Google::Pubsub::V1::AcknowledgeRequest.new subscription: path,
                                                       ack_ids: ack_ids

  backoff do
    subscriber.acknowledge request
  end
end

#create_subscription(topic, subscription_name, options = {}) ⇒ Object

Creates a subscription on a given topic for a given subscriber.



167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/gcloud/pubsub/service.rb', line 167

# Creates a subscription on a given topic for a given subscriber.
#
# Recognized keys in +options+:
#
# * +:project+    - forwarded to subscription_path to override the project
# * +:deadline+   - ack deadline in seconds; omitted from the request when nil
# * +:endpoint+   - when present, a PushConfig with this push endpoint is
#                   attached to the request
# * +:attributes+ - push endpoint attributes (only read when +:endpoint+ is
#                   given); nil is treated as no attributes
#
# Returns whatever the `backoff` block returns for the subscriber stub's
# create_subscription call.
def create_subscription topic, subscription_name, options = {}
  sub_params = { name: subscription_path(subscription_name, options),
                 topic: topic_path(topic),
                 ack_deadline_seconds: options[:deadline]
               }.reject { |_, v| v.nil? }
  sub_req = Google::Pubsub::V1::Subscription.new sub_params
  if options[:endpoint]
    # Convert attributes to strings to match the protobuf definition, the
    # same way modify_push_config does for its attributes argument.
    pairs = (options[:attributes] || {}).to_h.map do |k, v|
      [String(k), String(v)]
    end
    sub_req.push_config = Google::Pubsub::V1::PushConfig.new(
      push_endpoint: options[:endpoint],
      attributes: Hash[pairs])
  end

  backoff { subscriber.create_subscription sub_req }
end

#create_topic(topic_name, options = {}) ⇒ Object

Creates the given topic with the given name.



94
95
96
97
98
99
100
# File 'lib/gcloud/pubsub/service.rb', line 94

# Creates the given topic with the given name.
#
# The topic name is resolved with topic_path (honoring options) and placed
# in a Topic message, which is sent through the publisher stub inside the
# `backoff` block.
def create_topic topic_name, options = {}
  new_topic = Google::Pubsub::V1::Topic.new(
    name: topic_path(topic_name, options)
  )

  backoff do
    publisher.create_topic new_topic
  end
end

#creds ⇒ Object



38
39
40
41
42
# File 'lib/gcloud/pubsub/service.rb', line 38

# Credentials object handed to the gRPC stubs.
#
# When insecure? is true the raw credentials value is returned unchanged.
# Otherwise a new ChannelCredentials is composed with CallCredentials built
# from the credentials client's updater_proc.
def creds
  return credentials if insecure?

  channel_creds = GRPC::Core::ChannelCredentials.new
  call_creds = GRPC::Core::CallCredentials.new(
    credentials.client.updater_proc
  )
  channel_creds.compose call_creds
end

#delete_subscription(subscription) ⇒ Object

Deletes an existing subscription. All pending messages in the subscription are immediately dropped.



185
186
187
188
189
190
191
# File 'lib/gcloud/pubsub/service.rb', line 185

# Deletes an existing subscription.
#
# Resolves the subscription path, wraps it in a DeleteSubscriptionRequest
# and sends it through the subscriber stub inside the `backoff` block.
def delete_subscription subscription
  path = subscription_path(subscription)
  request = Google::Pubsub::V1::DeleteSubscriptionRequest.new(
    subscription: path
  )

  backoff do
    subscriber.delete_subscription request
  end
end

#delete_topic(topic_name) ⇒ Object

Deletes the topic with the given name. All subscriptions to this topic are also deleted. Raises GRPC status code 5 if the topic does not exist. After a topic is deleted, a new topic may be created with the same name.



107
108
109
110
111
112
113
# File 'lib/gcloud/pubsub/service.rb', line 107

# Deletes the topic with the given name.
#
# Resolves the topic path, wraps it in a DeleteTopicRequest and sends it
# through the publisher stub inside the `backoff` block.
def delete_topic topic_name
  request = Google::Pubsub::V1::DeleteTopicRequest.new(
    topic: topic_path(topic_name)
  )

  backoff do
    publisher.delete_topic request
  end
end

#get_subscription(subscription_name, options = {}) ⇒ Object

Gets the details of a subscription.



134
135
136
137
138
139
140
# File 'lib/gcloud/pubsub/service.rb', line 134

# Gets the details of a subscription.
#
# The subscription name is resolved with subscription_path (honoring
# options) and sent in a GetSubscriptionRequest through the subscriber stub
# inside the `backoff` block.
def get_subscription subscription_name, options = {}
  path = subscription_path(subscription_name, options)
  request = Google::Pubsub::V1::GetSubscriptionRequest.new subscription: path

  backoff do
    subscriber.get_subscription request
  end
end

#get_subscription_policy(subscription_name, options = {}) ⇒ Object



271
272
273
274
275
276
277
# File 'lib/gcloud/pubsub/service.rb', line 271

# Fetches the IAM policy attached to a subscription.
#
# Uses the resolved subscription path as the resource of a
# GetIamPolicyRequest and sends it through the IAM stub inside the
# `backoff` block.
def get_subscription_policy subscription_name, options = {}
  resource = subscription_path(subscription_name, options)
  request = Google::Iam::V1::GetIamPolicyRequest.new resource: resource

  backoff do
    iam.get_iam_policy request
  end
end

#get_topic(topic_name, options = {}) ⇒ Object

Gets the configuration of a topic. Since the topic only has the name attribute, this method is only useful to check the existence of a topic. If other attributes are added in the future, they will be returned here.



72
73
74
75
76
77
78
# File 'lib/gcloud/pubsub/service.rb', line 72

# Gets the configuration of a topic.
#
# The topic name is resolved with topic_path (honoring options) and sent in
# a GetTopicRequest through the publisher stub inside the `backoff` block.
def get_topic topic_name, options = {}
  request = Google::Pubsub::V1::GetTopicRequest.new(
    topic: topic_path(topic_name, options)
  )

  backoff do
    publisher.get_topic request
  end
end

#get_topic_policy(topic_name, options = {}) ⇒ Object



245
246
247
248
249
250
251
# File 'lib/gcloud/pubsub/service.rb', line 245

# Fetches the IAM policy attached to a topic.
#
# Uses the resolved topic path as the resource of a GetIamPolicyRequest and
# sends it through the IAM stub inside the `backoff` block.
def get_topic_policy topic_name, options = {}
  resource = topic_path(topic_name, options)
  request = Google::Iam::V1::GetIamPolicyRequest.new resource: resource

  backoff do
    iam.get_iam_policy request
  end
end

#iam ⇒ Object



56
57
58
59
# File 'lib/gcloud/pubsub/service.rb', line 56

# IAM policy client used by the policy and permission methods.
#
# Prefers mocked_iam when one is set; otherwise builds an IAMPolicy stub
# from host and creds on first use and memoizes it in @iam.
def iam
  mocked_iam || (@iam ||= Google::Iam::V1::IAMPolicy::Stub.new(host, creds))
end

#insecure? ⇒ Boolean



62
63
64
# File 'lib/gcloud/pubsub/service.rb', line 62

# Tells whether the credentials value is the :this_channel_is_insecure
# marker symbol (compared with ==).
def insecure?
  credentials == :this_channel_is_insecure
end

#inspect ⇒ Object



313
314
315
# File 'lib/gcloud/pubsub/service.rb', line 313

# Short description of the service: the class name followed by the project
# in parentheses.
def inspect
  format "%s(%s)", self.class, @project
end

#list_subscriptions(options = {}) ⇒ Object

Lists matching subscriptions by project.



156
157
158
159
160
161
162
163
# File 'lib/gcloud/pubsub/service.rb', line 156

# Lists matching subscriptions by project.
#
# Reads :token and :max from options as the page token and page size; nil
# values are left out of the ListSubscriptionsRequest. The request is sent
# through the subscriber stub inside the `backoff` block.
def list_subscriptions options = {}
  fields = {
    project:    project_path(options),
    page_token: options[:token],
    page_size:  options[:max]
  }.reject { |_, value| value.nil? }
  request = Google::Pubsub::V1::ListSubscriptionsRequest.new fields

  backoff do
    subscriber.list_subscriptions request
  end
end

#list_topics(options = {}) ⇒ Object

Lists matching topics.



82
83
84
85
86
87
88
89
90
# File 'lib/gcloud/pubsub/service.rb', line 82

# Lists matching topics.
#
# Always sets the project path; the page token (:token) and page size
# (:max) are only included when the corresponding option is truthy. The
# ListTopicsRequest is sent through the publisher stub inside the `backoff`
# block.
def list_topics options = {}
  fields = { project: project_path(options) }
  fields[:page_token] = options[:token] if options[:token]
  fields[:page_size] = options[:max] if options[:max]
  request = Google::Pubsub::V1::ListTopicsRequest.new fields

  backoff do
    publisher.list_topics request
  end
end

#list_topics_subscriptions(topic, options = {}) ⇒ Object

Lists matching subscriptions by project and topic.



144
145
146
147
148
149
150
151
152
# File 'lib/gcloud/pubsub/service.rb', line 144

# Lists matching subscriptions by project and topic.
#
# Reads :token and :max from options as the page token and page size; nil
# values are left out of the ListTopicSubscriptionsRequest. The request is
# sent through the publisher stub inside the `backoff` block.
def list_topics_subscriptions topic, options = {}
  fields = {
    topic:      topic_path(topic, options),
    page_token: options[:token],
    page_size:  options[:max]
  }.reject { |_, value| value.nil? }
  request = Google::Pubsub::V1::ListTopicSubscriptionsRequest.new fields

  backoff do
    publisher.list_topic_subscriptions request
  end
end

#modify_ack_deadline(subscription, ids, deadline) ⇒ Object

Modifies the ack deadline for one or more messages, identified by their ack IDs.



235
236
237
238
239
240
241
242
243
# File 'lib/gcloud/pubsub/service.rb', line 235

# Modifies the ack deadline for the given ack IDs.
#
# +ids+ is coerced with Array(), so a single ID or a list is accepted.
# The ModifyAckDeadlineRequest is sent through the subscriber stub inside
# the `backoff` block.
def modify_ack_deadline subscription, ids, deadline
  path = subscription_path(subscription)
  request = Google::Pubsub::V1::ModifyAckDeadlineRequest.new(
    subscription: path, ack_ids: Array(ids), ack_deadline_seconds: deadline
  )

  backoff do
    subscriber.modify_ack_deadline request
  end
end

#modify_push_config(subscription, endpoint, attributes) ⇒ Object

Modifies the PushConfig for a specified subscription.



218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/gcloud/pubsub/service.rb', line 218

# Modifies the PushConfig for a specified subscription.
#
# subscription - subscription name or full path (resolved with
#                subscription_path)
# endpoint     - value placed in the PushConfig's push_endpoint field
# attributes   - Hash of endpoint attributes, or nil for none; every key
#                and value is converted with String()
#
# Returns whatever the `backoff` block returns for the subscriber stub's
# modify_push_config call.
def modify_push_config subscription, endpoint, attributes
  # Convert attributes to strings to match the protobuf definition.
  # nil is treated as "no attributes" (as create_subscription does for a
  # missing :attributes option) instead of raising NoMethodError.
  attributes = Hash[(attributes || {}).map { |k, v| [String(k), String(v)] }]

  mpc_req = Google::Pubsub::V1::ModifyPushConfigRequest.new(
    subscription: subscription_path(subscription),
    push_config: Google::Pubsub::V1::PushConfig.new(
      push_endpoint: endpoint,
      attributes: attributes
    )
  )

  backoff { subscriber.modify_push_config mpc_req }
end

#project_path(options = {}) ⇒ Object



298
299
300
301
# File 'lib/gcloud/pubsub/service.rb', line 298

# Builds the "projects/<name>" resource path.
#
# Uses options[:project] when it is truthy, otherwise falls back to the
# service's own project.
def project_path options = {}
  "projects/#{options[:project] || project}"
end

#publish(topic, messages) ⇒ Object

Adds one or more messages to the topic. Raises GRPC status code 5 if the topic does not exist. The messages parameter is an array of arrays. The first element is the data, second is attributes hash.



120
121
122
123
124
125
126
127
128
129
130
# File 'lib/gcloud/pubsub/service.rb', line 120

# Adds one or more messages to the topic.
#
# +messages+ is enumerated as (data, attributes) pairs; each pair becomes a
# PubsubMessage. The PublishRequest for the resolved topic path is sent
# through the publisher stub inside the `backoff` block.
def publish topic, messages
  path = topic_path(topic)
  pubsub_messages = messages.map do |data, attributes|
    Google::Pubsub::V1::PubsubMessage.new data: data, attributes: attributes
  end
  request = Google::Pubsub::V1::PublishRequest.new topic: path,
                                                   messages: pubsub_messages

  backoff do
    publisher.publish request
  end
end

#publisher ⇒ Object



50
51
52
53
# File 'lib/gcloud/pubsub/service.rb', line 50

# Publisher client used by the topic and publish methods.
#
# Prefers mocked_publisher when one is set; otherwise builds a Publisher
# stub from host and creds on first use and memoizes it in @publisher.
def publisher
  mocked_publisher ||
    (@publisher ||= Google::Pubsub::V1::Publisher::Stub.new(host, creds))
end

#pull(subscription, options = {}) ⇒ Object

Pulls messages from the server: at most `:max` of them (100 by default).



195
196
197
198
199
200
201
202
203
# File 'lib/gcloud/pubsub/service.rb', line 195

# Pulls messages for a subscription.
#
# Options read here:
# * :immediate - coerced to true/false for return_immediately; defaults to
#                true when the key is absent
# * :max       - converted with to_i for max_messages; defaults to 100 when
#                the key is absent
#
# The PullRequest is sent through the subscriber stub inside the `backoff`
# block.
def pull subscription, options = {}
  path = subscription_path(subscription, options)
  immediate = options.fetch(:immediate, true) ? true : false
  limit = options.fetch(:max, 100).to_i
  request = Google::Pubsub::V1::PullRequest.new(
    subscription: path, return_immediately: immediate, max_messages: limit
  )

  backoff do
    subscriber.pull request
  end
end

#set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object



279
280
281
282
283
284
285
286
# File 'lib/gcloud/pubsub/service.rb', line 279

# Replaces the IAM policy attached to a subscription.
#
# +new_policy+ is serialized with JSON.dump and decoded into an IAM Policy
# message; the SetIamPolicyRequest for the resolved subscription path is
# sent through the IAM stub inside the `backoff` block.
def set_subscription_policy subscription_name, new_policy, options = {}
  resource = subscription_path(subscription_name, options)
  policy_json = JSON.dump(new_policy)
  request = Google::Iam::V1::SetIamPolicyRequest.new(
    resource: resource,
    policy: Google::Iam::V1::Policy.decode_json(policy_json)
  )

  backoff do
    iam.set_iam_policy request
  end
end

#set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object



253
254
255
256
257
258
259
260
# File 'lib/gcloud/pubsub/service.rb', line 253

# Replaces the IAM policy attached to a topic.
#
# +new_policy+ is serialized with JSON.dump and decoded into an IAM Policy
# message; the SetIamPolicyRequest for the resolved topic path is sent
# through the IAM stub inside the `backoff` block.
def set_topic_policy topic_name, new_policy, options = {}
  resource = topic_path(topic_name, options)
  policy_json = JSON.dump(new_policy)
  request = Google::Iam::V1::SetIamPolicyRequest.new(
    resource: resource,
    policy: Google::Iam::V1::Policy.decode_json(policy_json)
  )

  backoff do
    iam.set_iam_policy request
  end
end

#subscriber ⇒ Object



44
45
46
47
# File 'lib/gcloud/pubsub/service.rb', line 44

# Subscriber client used by the subscription, pull and ack methods.
#
# Prefers mocked_subscriber when one is set; otherwise builds a Subscriber
# stub from host and creds on first use and memoizes it in @subscriber.
def subscriber
  mocked_subscriber ||
    (@subscriber ||= Google::Pubsub::V1::Subscriber::Stub.new(host, creds))
end

#subscription_path(subscription_name, options = {}) ⇒ Object



308
309
310
311
# File 'lib/gcloud/pubsub/service.rb', line 308

# Resolves a subscription name to a full resource path.
#
# A name whose string form already contains "/" is returned unchanged;
# anything else is placed under "<project path>/subscriptions/".
def subscription_path subscription_name, options = {}
  if subscription_name.to_s.include?("/")
    subscription_name
  else
    "#{project_path(options)}/subscriptions/#{subscription_name}"
  end
end

#test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object



288
289
290
291
292
293
294
295
296
# File 'lib/gcloud/pubsub/service.rb', line 288

# Tests the given permissions against a subscription's IAM policy.
#
# Sends a TestIamPermissionsRequest for the resolved subscription path and
# the supplied permissions through the IAM stub inside the `backoff` block.
def test_subscription_permissions subscription_name,
                                  permissions, options = {}
  resource = subscription_path(subscription_name, options)
  request = Google::Iam::V1::TestIamPermissionsRequest.new(
    resource: resource, permissions: permissions
  )

  backoff do
    iam.test_iam_permissions request
  end
end

#test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object



262
263
264
265
266
267
268
269
# File 'lib/gcloud/pubsub/service.rb', line 262

# Tests the given permissions against a topic's IAM policy.
#
# Sends a TestIamPermissionsRequest for the resolved topic path and the
# supplied permissions through the IAM stub inside the `backoff` block.
def test_topic_permissions topic_name, permissions, options = {}
  resource = topic_path(topic_name, options)
  request = Google::Iam::V1::TestIamPermissionsRequest.new(
    resource: resource, permissions: permissions
  )

  backoff do
    iam.test_iam_permissions request
  end
end

#topic_path(topic_name, options = {}) ⇒ Object



303
304
305
306
# File 'lib/gcloud/pubsub/service.rb', line 303

# Resolves a topic name to a full resource path.
#
# A name whose string form already contains "/" is returned unchanged;
# anything else is placed under "<project path>/topics/".
def topic_path topic_name, options = {}
  if topic_name.to_s.include?("/")
    topic_name
  else
    "#{project_path(options)}/topics/#{topic_name}"
  end
end