Class: Gcloud::Pubsub::Service

Inherits:
Object
  • Object
show all
Defined in:
lib/gcloud/pubsub/service.rb

Overview

Represents the gRPC Pub/Sub service, including all the API methods.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(project, credentials) ⇒ Service

Creates a new Service instance.



32
33
34
35
36
# File 'lib/gcloud/pubsub/service.rb', line 32

# Creates a new Service instance.
#
# Stores the given project and credentials on the instance and points the
# service at the default Pub/Sub gRPC host.
def initialize project, credentials
  @project, @credentials = project, credentials
  @host = "pubsub.googleapis.com"
end

Instance Attribute Details

#credentials ⇒ Object

Returns the value of attribute credentials.



28
29
30
# File 'lib/gcloud/pubsub/service.rb', line 28

# Returns the credentials stored on this service (assigned by the
# constructor); nil if never assigned.
def credentials; @credentials; end

#host ⇒ Object

Returns the value of attribute host.



28
29
30
# File 'lib/gcloud/pubsub/service.rb', line 28

# Returns the host this service connects to. The constructor sets it to
# "pubsub.googleapis.com".
def host; @host; end

#mocked_iam ⇒ Object

Returns the value of attribute mocked_iam.



60
61
62
# File 'lib/gcloud/pubsub/service.rb', line 60

# Returns the replacement IAM client, if one has been assigned. When this
# is set, #iam returns it instead of building a gRPC stub.
def mocked_iam; @mocked_iam; end

#mocked_publisher ⇒ Object

Returns the value of attribute mocked_publisher.



54
55
56
# File 'lib/gcloud/pubsub/service.rb', line 54

# Returns the replacement publisher client, if one has been assigned. When
# this is set, #publisher returns it instead of building a gRPC stub.
def mocked_publisher; @mocked_publisher; end

#mocked_subscriber ⇒ Object

Returns the value of attribute mocked_subscriber.



48
49
50
# File 'lib/gcloud/pubsub/service.rb', line 48

# Returns the replacement subscriber client, if one has been assigned. When
# this is set, #subscriber returns it instead of building a gRPC stub.
def mocked_subscriber; @mocked_subscriber; end

#project ⇒ Object

Returns the value of attribute project.



28
29
30
# File 'lib/gcloud/pubsub/service.rb', line 28

# Returns the project stored on this service (assigned by the constructor).
# #project_path falls back to this value when no :project option is given.
def project; @project; end

Instance Method Details

#acknowledge(subscription, *ack_ids) ⇒ Object

Acknowledges receipt of one or more messages, identified by their ack IDs.



209
210
211
212
213
214
215
216
# File 'lib/gcloud/pubsub/service.rb', line 209

# Acknowledges receipt of the messages identified by +ack_ids+.
#
# Builds an AcknowledgeRequest for the subscription's full path and sends it
# through the subscriber stub inside +backoff+. Returns whatever the
# +backoff+ block returns.
def acknowledge subscription, *ack_ids
  path = subscription_path(subscription)
  request = Google::Pubsub::V1::AcknowledgeRequest.new(
    subscription: path, ack_ids: ack_ids
  )
  backoff { subscriber.acknowledge(request) }
end

#create_subscription(topic, subscription_name, options = {}) ⇒ Object

Creates a subscription on a given topic for a given subscriber.



169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/gcloud/pubsub/service.rb', line 169

# Creates a subscription on a given topic for a given subscriber.
#
# Options read by this method:
# * +:deadline+   - ack deadline in seconds; omitted from the request when nil
# * +:endpoint+   - push endpoint; when present a PushConfig is attached
# * +:attributes+ - push endpoint attributes (only used with +:endpoint+)
# +options+ is also forwarded to +subscription_path+.
#
# Returns whatever the +backoff+ block returns.
def create_subscription topic, subscription_name, options = {}
  sub_params = {
    name: subscription_path(subscription_name, options),
    topic: topic_path(topic),
    ack_deadline_seconds: options[:deadline]
  }.reject { |_, v| v.nil? }
  sub_req = Google::Pubsub::V1::Subscription.new sub_params

  if options[:endpoint]
    # Convert attributes to strings to match the protobuf definition, the
    # same way modify_push_config does, so symbol keys or non-string values
    # are accepted here too.
    attributes = Hash[
      (options[:attributes] || {}).to_h.map { |k, v| [String(k), String(v)] }
    ]
    sub_req.push_config = Google::Pubsub::V1::PushConfig.new(
      push_endpoint: options[:endpoint],
      attributes: attributes
    )
  end

  backoff { subscriber.create_subscription sub_req }
end

#create_topic(topic_name, options = {}) ⇒ Object

Creates the given topic with the given name.



94
95
96
97
98
99
100
# File 'lib/gcloud/pubsub/service.rb', line 94

# Creates the given topic with the given name.
#
# The topic name is expanded with +topic_path+ (which receives +options+)
# and sent through the publisher stub inside +backoff+.
def create_topic topic_name, options = {}
  request = Google::Pubsub::V1::Topic.new
  request.name = topic_path(topic_name, options)

  backoff { publisher.create_topic(request) }
end

#creds ⇒ Object



38
39
40
41
42
# File 'lib/gcloud/pubsub/service.rb', line 38

# Returns the credentials object handed to the gRPC stubs.
#
# When the channel is insecure the raw credentials value is returned as-is.
# Otherwise a new channel credentials object is composed with call
# credentials built from the client's updater proc.
def creds
  if insecure?
    credentials
  else
    channel_creds = GRPC::Core::ChannelCredentials.new
    call_creds = GRPC::Core::CallCredentials.new(
      credentials.client.updater_proc
    )
    channel_creds.compose(call_creds)
  end
end

#delete_subscription(subscription) ⇒ Object

Deletes an existing subscription. All pending messages in the subscription are immediately dropped.



187
188
189
190
191
192
193
# File 'lib/gcloud/pubsub/service.rb', line 187

# Deletes an existing subscription.
#
# Sends a DeleteSubscriptionRequest for the subscription's full path through
# the subscriber stub inside +backoff+.
def delete_subscription subscription
  path = subscription_path(subscription)
  request = Google::Pubsub::V1::DeleteSubscriptionRequest.new(
    subscription: path
  )
  backoff { subscriber.delete_subscription(request) }
end

#delete_topic(topic_name) ⇒ Object

Deletes the topic with the given name. All subscriptions to this topic are also deleted. Raises GRPC status code 5 if the topic does not exist. After a topic is deleted, a new topic may be created with the same name.



107
108
109
110
111
112
113
# File 'lib/gcloud/pubsub/service.rb', line 107

# Deletes the topic with the given name.
#
# Sends a DeleteTopicRequest for the topic's full path through the publisher
# stub inside +backoff+.
def delete_topic topic_name
  request = Google::Pubsub::V1::DeleteTopicRequest.new
  request.topic = topic_path(topic_name)

  backoff { publisher.delete_topic(request) }
end

#get_subscription(subscription_name, options = {}) ⇒ Object

Gets the details of a subscription.



136
137
138
139
140
141
142
# File 'lib/gcloud/pubsub/service.rb', line 136

# Gets the details of a subscription.
#
# +options+ is forwarded to +subscription_path+ when expanding the name.
def get_subscription subscription_name, options = {}
  path = subscription_path(subscription_name, options)
  request = Google::Pubsub::V1::GetSubscriptionRequest.new(subscription: path)

  backoff { subscriber.get_subscription(request) }
end

#get_subscription_policy(subscription_name, options = {}) ⇒ Object



273
274
275
276
277
278
279
# File 'lib/gcloud/pubsub/service.rb', line 273

# Fetches the IAM policy for a subscription.
#
# The subscription's full path is used as the IAM resource; the request goes
# through the IAM stub inside +backoff+.
def get_subscription_policy subscription_name, options = {}
  resource_path = subscription_path(subscription_name, options)
  request = Google::Iam::V1::GetIamPolicyRequest.new(resource: resource_path)

  backoff { iam.get_iam_policy(request) }
end

#get_topic(topic_name, options = {}) ⇒ Object

Gets the configuration of a topic. Since the topic only has the name attribute, this method is only useful to check the existence of a topic. If other attributes are added in the future, they will be returned here.



72
73
74
75
76
77
78
# File 'lib/gcloud/pubsub/service.rb', line 72

# Gets the configuration of a topic.
#
# +options+ is forwarded to +topic_path+ when expanding the name.
def get_topic topic_name, options = {}
  request = Google::Pubsub::V1::GetTopicRequest.new
  request.topic = topic_path(topic_name, options)

  backoff { publisher.get_topic(request) }
end

#get_topic_policy(topic_name, options = {}) ⇒ Object



247
248
249
250
251
252
253
# File 'lib/gcloud/pubsub/service.rb', line 247

# Fetches the IAM policy for a topic.
#
# The topic's full path is used as the IAM resource; the request goes
# through the IAM stub inside +backoff+.
def get_topic_policy topic_name, options = {}
  resource_path = topic_path(topic_name, options)
  request = Google::Iam::V1::GetIamPolicyRequest.new(resource: resource_path)

  backoff { iam.get_iam_policy(request) }
end

#iam ⇒ Object



56
57
58
59
# File 'lib/gcloud/pubsub/service.rb', line 56

# Returns the IAM client: the mocked one when it is set, otherwise a
# memoized IAMPolicy stub built from +host+ and +creds+.
def iam
  mocked_iam || (@iam ||= Google::Iam::V1::IAMPolicy::Stub.new(host, creds))
end

#insecure? ⇒ Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/gcloud/pubsub/service.rb', line 62

# Reports whether the service was configured with the
# :this_channel_is_insecure marker in place of real credentials.
def insecure?
  insecure_marker = :this_channel_is_insecure
  credentials == insecure_marker
end

#inspect ⇒ Object



315
316
317
# File 'lib/gcloud/pubsub/service.rb', line 315

# Returns a short description: the class name followed by the project in
# parentheses.
def inspect
  class_name = self.class
  "#{class_name}(#{@project})"
end

#list_subscriptions(options = {}) ⇒ Object

Lists matching subscriptions by project.



158
159
160
161
162
163
164
165
# File 'lib/gcloud/pubsub/service.rb', line 158

# Lists matching subscriptions by project.
#
# Options read by this method:
# * +:token+ - page token; omitted from the request when nil
# * +:max+   - page size; omitted from the request when nil
# +options+ is also forwarded to +project_path+.
def list_subscriptions options = {}
  params = {
    project: project_path(options),
    page_token: options[:token],
    page_size: options[:max]
  }.reject { |_key, value| value.nil? }
  request = Google::Pubsub::V1::ListSubscriptionsRequest.new(params)

  backoff { subscriber.list_subscriptions(request) }
end

#list_topics(options = {}) ⇒ Object

Lists matching topics.



82
83
84
85
86
87
88
89
90
# File 'lib/gcloud/pubsub/service.rb', line 82

# Lists matching topics.
#
# Options read by this method:
# * +:token+ - page token; only set on the request when truthy
# * +:max+   - page size; only set on the request when truthy
# +options+ is also forwarded to +project_path+.
def list_topics options = {}
  request = Google::Pubsub::V1::ListTopicsRequest.new
  request.project = project_path(options)

  token = options[:token]
  request.page_token = token if token

  limit = options[:max]
  request.page_size = limit if limit

  backoff { publisher.list_topics(request) }
end

#list_topics_subscriptions(topic, options = {}) ⇒ Object

Lists matching subscriptions by project and topic.



146
147
148
149
150
151
152
153
154
# File 'lib/gcloud/pubsub/service.rb', line 146

# Lists matching subscriptions by project and topic.
#
# Options read by this method:
# * +:token+ - page token; omitted from the request when nil
# * +:max+   - page size; omitted from the request when nil
# +options+ is also forwarded to +topic_path+.
def list_topics_subscriptions topic, options = {}
  params = {
    topic: topic_path(topic, options),
    page_token: options[:token],
    page_size: options[:max]
  }.reject { |_key, value| value.nil? }
  request = Google::Pubsub::V1::ListTopicSubscriptionsRequest.new(params)

  backoff { publisher.list_topic_subscriptions(request) }
end

#modify_ack_deadline(subscription, ids, deadline) ⇒ Object

Modifies the ack deadline for one or more messages, identified by their ack IDs.



237
238
239
240
241
242
243
244
245
# File 'lib/gcloud/pubsub/service.rb', line 237

# Modifies the ack deadline for the given ack IDs.
#
# +ids+ may be a single ID or a collection; it is normalised with Array().
# +deadline+ is sent as the request's ack_deadline_seconds.
def modify_ack_deadline subscription, ids, deadline
  path = subscription_path(subscription)
  id_list = Array(ids)
  request = Google::Pubsub::V1::ModifyAckDeadlineRequest.new(
    subscription: path, ack_ids: id_list, ack_deadline_seconds: deadline
  )

  backoff { subscriber.modify_ack_deadline(request) }
end

#modify_push_config(subscription, endpoint, attributes) ⇒ Object

Modifies the PushConfig for a specified subscription.



220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/gcloud/pubsub/service.rb', line 220

# Modifies the PushConfig for a specified subscription.
#
# Attribute keys and values are coerced with String() before the request is
# built, to match the protobuf definition.
def modify_push_config subscription, endpoint, attributes
  string_attributes = attributes.each_with_object({}) do |(key, value), memo|
    memo[String(key)] = String(value)
  end

  path = subscription_path(subscription)
  push_config = Google::Pubsub::V1::PushConfig.new(
    push_endpoint: endpoint, attributes: string_attributes
  )
  request = Google::Pubsub::V1::ModifyPushConfigRequest.new(
    subscription: path, push_config: push_config
  )

  backoff { subscriber.modify_push_config(request) }
end

#project_path(options = {}) ⇒ Object



300
301
302
303
# File 'lib/gcloud/pubsub/service.rb', line 300

# Builds the "projects/<name>" resource path, using options[:project] when
# given and the service's own project otherwise.
def project_path options = {}
  "projects/#{options[:project] || project}"
end

#publish(topic, messages) ⇒ Object

Adds one or more messages to the topic. Raises GRPC status code 5 if the topic does not exist. The messages parameter is an array of arrays. The first element is the data, second is attributes hash.



120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/gcloud/pubsub/service.rb', line 120

# Adds one or more messages to the topic.
#
# +messages+ is an array of [data, attributes] pairs: the first element is
# the message data and the second is its attributes hash.
#
# Returns whatever the +backoff+ block returns.
def publish topic, messages
  publish_req = Google::Pubsub::V1::PublishRequest.new(
    topic: topic_path(topic),
    messages: messages.map do |data, attributes|
      Google::Pubsub::V1::PubsubMessage.new(
        # Relabel the payload as raw bytes instead of transcoding it:
        # String#encode("ASCII-8BIT") raises
        # Encoding::UndefinedConversionError for any non-ASCII character.
        # dup first so the caller's (possibly frozen) string is not mutated.
        data: String(data).dup.force_encoding(Encoding::ASCII_8BIT),
        attributes: attributes
      )
    end
  )

  backoff { publisher.publish publish_req }
end

#publisher ⇒ Object



50
51
52
53
# File 'lib/gcloud/pubsub/service.rb', line 50

# Returns the publisher client: the mocked one when it is set, otherwise a
# memoized Publisher stub built from +host+ and +creds+.
def publisher
  mocked_publisher ||
    (@publisher ||= Google::Pubsub::V1::Publisher::Stub.new(host, creds))
end

#pull(subscription, options = {}) ⇒ Object

Pulls messages from the server: up to `options[:max]` messages per request (100 by default).



197
198
199
200
201
202
203
204
205
# File 'lib/gcloud/pubsub/service.rb', line 197

# Pulls messages for a subscription.
#
# Options read by this method:
# * +:immediate+ - coerced to a boolean and sent as return_immediately;
#   defaults to true when the key is absent
# * +:max+       - converted with to_i and sent as max_messages; defaults
#   to 100 when the key is absent
# +options+ is also forwarded to +subscription_path+.
def pull subscription, options = {}
  path = subscription_path(subscription, options)
  immediate = options.fetch(:immediate, true) ? true : false
  limit = options.fetch(:max, 100).to_i
  request = Google::Pubsub::V1::PullRequest.new(
    subscription: path, return_immediately: immediate, max_messages: limit
  )

  backoff { subscriber.pull(request) }
end

#set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object



281
282
283
284
285
286
287
288
# File 'lib/gcloud/pubsub/service.rb', line 281

# Sets the IAM policy for a subscription.
#
# +new_policy+ is serialised with JSON.dump and decoded into an IAM Policy
# message before being sent through the IAM stub inside +backoff+.
def set_subscription_policy subscription_name, new_policy, options = {}
  resource_path = subscription_path(subscription_name, options)
  policy = Google::Iam::V1::Policy.decode_json(JSON.dump(new_policy))
  request = Google::Iam::V1::SetIamPolicyRequest.new(
    resource: resource_path, policy: policy
  )

  backoff { iam.set_iam_policy(request) }
end

#set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object



255
256
257
258
259
260
261
262
# File 'lib/gcloud/pubsub/service.rb', line 255

# Sets the IAM policy for a topic.
#
# +new_policy+ is serialised with JSON.dump and decoded into an IAM Policy
# message before being sent through the IAM stub inside +backoff+.
def set_topic_policy topic_name, new_policy, options = {}
  resource_path = topic_path(topic_name, options)
  policy = Google::Iam::V1::Policy.decode_json(JSON.dump(new_policy))
  request = Google::Iam::V1::SetIamPolicyRequest.new(
    resource: resource_path, policy: policy
  )

  backoff { iam.set_iam_policy(request) }
end

#subscriber ⇒ Object



44
45
46
47
# File 'lib/gcloud/pubsub/service.rb', line 44

# Returns the subscriber client: the mocked one when it is set, otherwise a
# memoized Subscriber stub built from +host+ and +creds+.
def subscriber
  mocked_subscriber ||
    (@subscriber ||= Google::Pubsub::V1::Subscriber::Stub.new(host, creds))
end

#subscription_path(subscription_name, options = {}) ⇒ Object



310
311
312
313
# File 'lib/gcloud/pubsub/service.rb', line 310

# Expands a subscription name into its full resource path.
#
# A name that already contains a "/" is returned unchanged; otherwise the
# name is appended to "<project path>/subscriptions/".
def subscription_path subscription_name, options = {}
  if subscription_name.to_s.include?("/")
    subscription_name
  else
    "#{project_path(options)}/subscriptions/#{subscription_name}"
  end
end

#test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object



290
291
292
293
294
295
296
297
298
# File 'lib/gcloud/pubsub/service.rb', line 290

# Tests the given permissions against a subscription's IAM policy.
#
# The subscription's full path is used as the IAM resource; the request goes
# through the IAM stub inside +backoff+.
def test_subscription_permissions subscription_name,
                                  permissions, options = {}
  resource_path = subscription_path(subscription_name, options)
  request = Google::Iam::V1::TestIamPermissionsRequest.new(
    resource: resource_path, permissions: permissions
  )

  backoff { iam.test_iam_permissions(request) }
end

#test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object



264
265
266
267
268
269
270
271
# File 'lib/gcloud/pubsub/service.rb', line 264

# Tests the given permissions against a topic's IAM policy.
#
# The topic's full path is used as the IAM resource; the request goes
# through the IAM stub inside +backoff+.
def test_topic_permissions topic_name, permissions, options = {}
  resource_path = topic_path(topic_name, options)
  request = Google::Iam::V1::TestIamPermissionsRequest.new(
    resource: resource_path, permissions: permissions
  )

  backoff { iam.test_iam_permissions(request) }
end

#topic_path(topic_name, options = {}) ⇒ Object



305
306
307
308
# File 'lib/gcloud/pubsub/service.rb', line 305

# Expands a topic name into its full resource path.
#
# A name that already contains a "/" is returned unchanged; otherwise the
# name is appended to "<project path>/topics/".
def topic_path topic_name, options = {}
  if topic_name.to_s.include?("/")
    topic_name
  else
    "#{project_path(options)}/topics/#{topic_name}"
  end
end