Class: Gcloud::Pubsub::Service

Inherits:
Object
  • Object
show all
Defined in:
lib/gcloud/pubsub/service.rb

Overview

Wraps the gRPC Pub/Sub publisher, subscriber, and IAM stubs and exposes the API methods.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(project, credentials) ⇒ Service

Creates a new Service instance.



32
33
34
35
36
# File 'lib/gcloud/pubsub/service.rb', line 32

##
# Creates a new Service instance.
#
# Stores the given project and credentials on the instance and points the
# service at the "pubsub.googleapis.com" host.
def initialize project, credentials
  @credentials = credentials
  @project = project
  @host = "pubsub.googleapis.com"
end

Instance Attribute Details

#credentials ⇒ Object

Returns the value of attribute credentials.



28
29
30
# File 'lib/gcloud/pubsub/service.rb', line 28

##
# Returns the current value of @credentials, which #initialize sets from
# its +credentials+ argument. #creds reads it to build the call
# credentials.
def credentials
  @credentials
end

#host ⇒ Object

Returns the value of attribute host.



28
29
30
# File 'lib/gcloud/pubsub/service.rb', line 28

##
# Returns the current value of @host, which #initialize sets to
# "pubsub.googleapis.com". It is passed to the stub constructors in
# #publisher, #subscriber and #iam.
def host
  @host
end

#mocked_iam ⇒ Object

Returns the value of attribute mocked_iam.



59
60
61
# File 'lib/gcloud/pubsub/service.rb', line 59

##
# Returns the current value of @mocked_iam. When it is truthy, #iam
# returns it instead of building a Google::Iam::V1::IAMPolicy::Stub.
def mocked_iam
  @mocked_iam
end

#mocked_publisher ⇒ Object

Returns the value of attribute mocked_publisher.



53
54
55
# File 'lib/gcloud/pubsub/service.rb', line 53

##
# Returns the current value of @mocked_publisher. When it is truthy,
# #publisher returns it instead of building a
# Google::Pubsub::V1::Publisher::Stub.
def mocked_publisher
  @mocked_publisher
end

#mocked_subscriber ⇒ Object

Returns the value of attribute mocked_subscriber.



47
48
49
# File 'lib/gcloud/pubsub/service.rb', line 47

##
# Returns the current value of @mocked_subscriber. When it is truthy,
# #subscriber returns it instead of building a
# Google::Pubsub::V1::Subscriber::Stub.
def mocked_subscriber
  @mocked_subscriber
end

#project ⇒ Object

Returns the value of attribute project.



28
29
30
# File 'lib/gcloud/pubsub/service.rb', line 28

##
# Returns the current value of @project, which #initialize sets from its
# +project+ argument. #project_path uses it when no :project option is
# given.
def project
  @project
end

Instance Method Details

#acknowledge(subscription, *ack_ids) ⇒ Object

Acknowledges receipt of one or more messages, identified by their ack ids.



204
205
206
207
208
209
210
211
# File 'lib/gcloud/pubsub/service.rb', line 204

##
# Acknowledges receipt of the messages identified by +ack_ids+ on the
# given subscription.
#
# +subscription+ is resolved with #subscription_path and every remaining
# argument is sent as an ack id. The RPC is issued on the subscriber stub
# inside a +backoff+ block (helper defined outside this excerpt).
def acknowledge subscription, *ack_ids
  path = subscription_path(subscription)
  request = Google::Pubsub::V1::AcknowledgeRequest.new(
    subscription: path, ack_ids: ack_ids
  )

  backoff { subscriber.acknowledge request }
end

#create_subscription(topic, subscription_name, options = {}) ⇒ Object

Creates a subscription on a given topic for a given subscriber.



164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/gcloud/pubsub/service.rb', line 164

##
# Creates a subscription named +subscription_name+ on the given topic.
#
# Options read here:
# * +:deadline+   - sent as +ack_deadline_seconds+ (omitted when nil)
# * +:endpoint+   - when present, a PushConfig with this push endpoint is
#   attached to the request
# * +:attributes+ - push attributes, used only together with +:endpoint+
#
# +options+ is also forwarded to #subscription_path. The RPC is issued on
# the subscriber stub inside a +backoff+ block (helper defined outside
# this excerpt).
def create_subscription topic, subscription_name, options = {}
  fields = {
    name: subscription_path(subscription_name, options),
    topic: topic_path(topic),
    ack_deadline_seconds: options[:deadline]
  }
  # Only non-nil fields are handed to the message constructor.
  fields = fields.reject { |_key, value| value.nil? }
  request = Google::Pubsub::V1::Subscription.new fields

  endpoint = options[:endpoint]
  if endpoint
    push_attributes = (options[:attributes] || {}).to_h
    request.push_config = Google::Pubsub::V1::PushConfig.new(
      push_endpoint: endpoint,
      attributes: push_attributes
    )
  end

  backoff { subscriber.create_subscription request }
end

#create_topic(topic_name, options = {}) ⇒ Object

Creates the given topic with the given name.



89
90
91
92
93
94
95
# File 'lib/gcloud/pubsub/service.rb', line 89

##
# Creates the topic with the given name.
#
# The name is resolved with #topic_path (which receives +options+), and
# the RPC is issued on the publisher stub inside a +backoff+ block
# (helper defined outside this excerpt).
def create_topic topic_name, options = {}
  request = Google::Pubsub::V1::Topic.new
  request.name = topic_path(topic_name, options)

  backoff { publisher.create_topic request }
end

#creds ⇒ Object



38
39
40
41
# File 'lib/gcloud/pubsub/service.rb', line 38

##
# Builds the gRPC credentials handed to the stub constructors: a new
# GRPC::Core::ChannelCredentials composed with GRPC::Core::CallCredentials
# created from +credentials.client.updater_proc+.
def creds
  channel_credentials = GRPC::Core::ChannelCredentials.new
  call_credentials = GRPC::Core::CallCredentials.new(
    credentials.client.updater_proc
  )
  channel_credentials.compose(call_credentials)
end

#delete_subscription(subscription) ⇒ Object

Deletes an existing subscription. All pending messages in the subscription are immediately dropped.



182
183
184
185
186
187
188
# File 'lib/gcloud/pubsub/service.rb', line 182

##
# Deletes an existing subscription.
#
# +subscription+ is resolved with #subscription_path, and the RPC is
# issued on the subscriber stub inside a +backoff+ block (helper defined
# outside this excerpt).
def delete_subscription subscription
  path = subscription_path(subscription)
  request = Google::Pubsub::V1::DeleteSubscriptionRequest.new(
    subscription: path
  )

  backoff { subscriber.delete_subscription request }
end

#delete_topic(topic_name) ⇒ Object

Deletes the topic with the given name. All subscriptions to this topic are also deleted. Raises GRPC status code 5 if the topic does not exist. After a topic is deleted, a new topic may be created with the same name.



102
103
104
105
106
107
108
# File 'lib/gcloud/pubsub/service.rb', line 102

##
# Deletes the topic with the given name.
#
# The name is resolved with #topic_path, and the RPC is issued on the
# publisher stub inside a +backoff+ block (helper defined outside this
# excerpt).
def delete_topic topic_name
  request = Google::Pubsub::V1::DeleteTopicRequest.new
  request.topic = topic_path(topic_name)

  backoff { publisher.delete_topic request }
end

#get_subscription(subscription_name, options = {}) ⇒ Object

Gets the details of a subscription.



131
132
133
134
135
136
137
# File 'lib/gcloud/pubsub/service.rb', line 131

##
# Gets the details of a subscription.
#
# The name is resolved with #subscription_path (which receives
# +options+), and the RPC is issued on the subscriber stub inside a
# +backoff+ block (helper defined outside this excerpt).
def get_subscription subscription_name, options = {}
  path = subscription_path(subscription_name, options)
  request = Google::Pubsub::V1::GetSubscriptionRequest.new(
    subscription: path
  )

  backoff { subscriber.get_subscription request }
end

#get_subscription_policy(subscription_name, options = {}) ⇒ Object



268
269
270
271
272
273
274
# File 'lib/gcloud/pubsub/service.rb', line 268

##
# Fetches the IAM policy for a subscription.
#
# The subscription path (from #subscription_path) is sent as the
# +resource+ of a GetIamPolicyRequest on the IAM stub, inside a +backoff+
# block (helper defined outside this excerpt).
def get_subscription_policy subscription_name, options = {}
  resource = subscription_path(subscription_name, options)
  request = Google::Iam::V1::GetIamPolicyRequest.new(resource: resource)

  backoff { iam.get_iam_policy request }
end

#get_topic(topic_name, options = {}) ⇒ Object

Gets the configuration of a topic. Since the topic only has the name attribute, this method is only useful to check the existence of a topic. If other attributes are added in the future, they will be returned here.



67
68
69
70
71
72
73
# File 'lib/gcloud/pubsub/service.rb', line 67

##
# Gets the configuration of a topic.
#
# The name is resolved with #topic_path (which receives +options+), and
# the RPC is issued on the publisher stub inside a +backoff+ block
# (helper defined outside this excerpt).
def get_topic topic_name, options = {}
  request = Google::Pubsub::V1::GetTopicRequest.new
  request.topic = topic_path(topic_name, options)

  backoff { publisher.get_topic request }
end

#get_topic_policy(topic_name, options = {}) ⇒ Object



242
243
244
245
246
247
248
# File 'lib/gcloud/pubsub/service.rb', line 242

##
# Fetches the IAM policy for a topic.
#
# The topic path (from #topic_path) is sent as the +resource+ of a
# GetIamPolicyRequest on the IAM stub, inside a +backoff+ block (helper
# defined outside this excerpt).
def get_topic_policy topic_name, options = {}
  resource = topic_path(topic_name, options)
  request = Google::Iam::V1::GetIamPolicyRequest.new(resource: resource)

  backoff { iam.get_iam_policy request }
end

#iam ⇒ Object



55
56
57
58
# File 'lib/gcloud/pubsub/service.rb', line 55

##
# Returns the IAM policy client.
#
# A truthy #mocked_iam takes precedence; otherwise a
# Google::Iam::V1::IAMPolicy::Stub is built from #host and #creds and
# memoized in @iam.
def iam
  if mocked_iam
    mocked_iam
  else
    @iam ||= Google::Iam::V1::IAMPolicy::Stub.new(host, creds)
  end
end

#inspect ⇒ Object



310
311
312
# File 'lib/gcloud/pubsub/service.rb', line 310

##
# Returns a short description of the form "ClassName(project)", built
# from the receiver's class and @project.
def inspect
  format("%s(%s)", self.class, @project)
end

#list_subscriptions(options = {}) ⇒ Object

Lists matching subscriptions by project.



153
154
155
156
157
158
159
160
# File 'lib/gcloud/pubsub/service.rb', line 153

##
# Lists subscriptions in a project.
#
# Options read here:
# * +:token+ - sent as +page_token+ (omitted when nil)
# * +:max+   - sent as +page_size+ (omitted when nil)
#
# +options+ is also forwarded to #project_path. The RPC is issued on the
# subscriber stub inside a +backoff+ block (helper defined outside this
# excerpt).
def list_subscriptions options = {}
  fields = {
    project: project_path(options),
    page_token: options[:token],
    page_size: options[:max]
  }
  # Only non-nil fields are handed to the request constructor.
  fields = fields.reject { |_key, value| value.nil? }
  request = Google::Pubsub::V1::ListSubscriptionsRequest.new fields

  backoff { subscriber.list_subscriptions request }
end

#list_topics(options = {}) ⇒ Object

Lists matching topics.



77
78
79
80
81
82
83
84
85
# File 'lib/gcloud/pubsub/service.rb', line 77

##
# Lists topics in a project.
#
# Options read here:
# * +:token+ - sent as +page_token+ when truthy
# * +:max+   - sent as +page_size+ when truthy
#
# +options+ is also forwarded to #project_path. The RPC is issued on the
# publisher stub inside a +backoff+ block (helper defined outside this
# excerpt).
def list_topics options = {}
  request = Google::Pubsub::V1::ListTopicsRequest.new
  request.project = project_path(options)

  token = options[:token]
  request.page_token = token if token

  limit = options[:max]
  request.page_size = limit if limit

  backoff { publisher.list_topics request }
end

#list_topics_subscriptions(topic, options = {}) ⇒ Object

Lists matching subscriptions by project and topic.



141
142
143
144
145
146
147
148
149
# File 'lib/gcloud/pubsub/service.rb', line 141

##
# Lists the subscriptions attached to a topic.
#
# Options read here:
# * +:token+ - sent as +page_token+ (omitted when nil)
# * +:max+   - sent as +page_size+ (omitted when nil)
#
# +options+ is also forwarded to #topic_path. The RPC is issued on the
# publisher stub inside a +backoff+ block (helper defined outside this
# excerpt).
def list_topics_subscriptions topic, options = {}
  fields = {
    topic: topic_path(topic, options),
    page_token: options[:token],
    page_size: options[:max]
  }
  # Only non-nil fields are handed to the request constructor.
  fields = fields.reject { |_key, value| value.nil? }
  request = Google::Pubsub::V1::ListTopicSubscriptionsRequest.new(fields)

  backoff { publisher.list_topic_subscriptions request }
end

#modify_ack_deadline(subscription, ids, deadline) ⇒ Object

Modifies the ack deadline for one or more messages, identified by their ack ids.



232
233
234
235
236
237
238
239
240
# File 'lib/gcloud/pubsub/service.rb', line 232

##
# Modifies the ack deadline for one or more messages.
#
# +ids+ may be a single ack id or a collection; it is normalized with
# Array(). +deadline+ is sent as +ack_deadline_seconds+. The RPC is
# issued on the subscriber stub inside a +backoff+ block (helper defined
# outside this excerpt).
def modify_ack_deadline subscription, ids, deadline
  path = subscription_path(subscription)
  id_list = Array(ids)
  request = Google::Pubsub::V1::ModifyAckDeadlineRequest.new(
    subscription: path, ack_ids: id_list, ack_deadline_seconds: deadline
  )

  backoff { subscriber.modify_ack_deadline request }
end

#modify_push_config(subscription, endpoint, attributes) ⇒ Object

Modifies the PushConfig for a specified subscription.



215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/gcloud/pubsub/service.rb', line 215

##
# Modifies the PushConfig for a specified subscription.
#
# +endpoint+ is sent as the push endpoint. Keys and values of
# +attributes+ are converted with String() before being sent. The RPC is
# issued on the subscriber stub inside a +backoff+ block (helper defined
# outside this excerpt).
def modify_push_config subscription, endpoint, attributes
  # The protobuf definition expects string keys and string values.
  string_attributes = attributes.each_with_object({}) do |(key, value), acc|
    acc[String(key)] = String(value)
  end

  path = subscription_path(subscription)
  push_config = Google::Pubsub::V1::PushConfig.new(
    push_endpoint: endpoint,
    attributes: string_attributes
  )
  request = Google::Pubsub::V1::ModifyPushConfigRequest.new(
    subscription: path,
    push_config: push_config
  )

  backoff { subscriber.modify_push_config request }
end

#project_path(options = {}) ⇒ Object



295
296
297
298
# File 'lib/gcloud/pubsub/service.rb', line 295

##
# Returns the resource path "projects/<name>", where <name> is
# +options[:project]+ when truthy and #project otherwise.
def project_path options = {}
  "projects/#{options[:project] || project}"
end

#publish(topic, messages) ⇒ Object

Adds one or more messages to the topic. Raises GRPC status code 5 if the topic does not exist. The messages parameter is an array of arrays. The first element is the data, second is attributes hash.



115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/gcloud/pubsub/service.rb', line 115

##
# Adds one or more messages to the topic.
#
# +topic+ is resolved with #topic_path. +messages+ is a collection of
# [data, attributes] pairs: +data+ is converted with String() and sent as
# raw bytes, and +attributes+ is passed through unchanged. The RPC is
# issued on the publisher stub inside a +backoff+ block (helper defined
# outside this excerpt).
def publish topic, messages
  publish_req = Google::Pubsub::V1::PublishRequest.new(
    topic: topic_path(topic),
    messages: messages.map do |data, attributes|
      Google::Pubsub::V1::PubsubMessage.new(
        # String#b yields a binary (ASCII-8BIT) copy of the same bytes.
        # Transcoding with encode("ASCII-8BIT") raised
        # Encoding::UndefinedConversionError for any non-ASCII payload.
        data: String(data).b,
        attributes: attributes
      )
    end
  )

  backoff { publisher.publish publish_req }
end

#publisher ⇒ Object



49
50
51
52
# File 'lib/gcloud/pubsub/service.rb', line 49

##
# Returns the publisher client.
#
# A truthy #mocked_publisher takes precedence; otherwise a
# Google::Pubsub::V1::Publisher::Stub is built from #host and #creds and
# memoized in @publisher.
def publisher
  if mocked_publisher
    mocked_publisher
  else
    @publisher ||= Google::Pubsub::V1::Publisher::Stub.new(host, creds)
  end
end

#pull(subscription, options = {}) ⇒ Object

Pulls messages from the server (up to the :max option, 100 by default).



192
193
194
195
196
197
198
199
200
# File 'lib/gcloud/pubsub/service.rb', line 192

##
# Pulls messages from a subscription.
#
# Options read here:
# * +:immediate+ - coerced to a strict boolean and sent as
#   +return_immediately+ (defaults to true)
# * +:max+       - converted with #to_i and sent as +max_messages+
#   (defaults to 100)
#
# +options+ is also forwarded to #subscription_path. The RPC is issued on
# the subscriber stub inside a +backoff+ block (helper defined outside
# this excerpt).
def pull subscription, options = {}
  path = subscription_path(subscription, options)
  return_immediately = options.fetch(:immediate, true) ? true : false
  max_messages = options.fetch(:max, 100).to_i
  request = Google::Pubsub::V1::PullRequest.new(
    subscription: path,
    return_immediately: return_immediately,
    max_messages: max_messages
  )

  backoff { subscriber.pull request }
end

#set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object



276
277
278
279
280
281
282
283
# File 'lib/gcloud/pubsub/service.rb', line 276

##
# Sets the IAM policy for a subscription.
#
# +new_policy+ is serialized with JSON.dump and decoded into a
# Google::Iam::V1::Policy. The subscription path (from
# #subscription_path) is sent as the +resource+ of a SetIamPolicyRequest
# on the IAM stub, inside a +backoff+ block (helper defined outside this
# excerpt).
def set_subscription_policy subscription_name, new_policy, options = {}
  resource = subscription_path(subscription_name, options)
  policy_json = JSON.dump(new_policy)
  policy = Google::Iam::V1::Policy.decode_json(policy_json)
  request = Google::Iam::V1::SetIamPolicyRequest.new(
    resource: resource, policy: policy
  )

  backoff { iam.set_iam_policy request }
end

#set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object



250
251
252
253
254
255
256
257
# File 'lib/gcloud/pubsub/service.rb', line 250

##
# Sets the IAM policy for a topic.
#
# +new_policy+ is serialized with JSON.dump and decoded into a
# Google::Iam::V1::Policy. The topic path (from #topic_path) is sent as
# the +resource+ of a SetIamPolicyRequest on the IAM stub, inside a
# +backoff+ block (helper defined outside this excerpt).
def set_topic_policy topic_name, new_policy, options = {}
  resource = topic_path(topic_name, options)
  policy_json = JSON.dump(new_policy)
  policy = Google::Iam::V1::Policy.decode_json(policy_json)
  request = Google::Iam::V1::SetIamPolicyRequest.new(
    resource: resource, policy: policy
  )

  backoff { iam.set_iam_policy request }
end

#subscriber ⇒ Object



43
44
45
46
# File 'lib/gcloud/pubsub/service.rb', line 43

##
# Returns the subscriber client.
#
# A truthy #mocked_subscriber takes precedence; otherwise a
# Google::Pubsub::V1::Subscriber::Stub is built from #host and #creds and
# memoized in @subscriber.
def subscriber
  if mocked_subscriber
    mocked_subscriber
  else
    @subscriber ||= Google::Pubsub::V1::Subscriber::Stub.new(host, creds)
  end
end

#subscription_path(subscription_name, options = {}) ⇒ Object



305
306
307
308
# File 'lib/gcloud/pubsub/service.rb', line 305

##
# Returns the full resource path for a subscription.
#
# A name whose string form already contains "/" is returned unchanged;
# otherwise the name is appended to "<project path>/subscriptions/",
# with the project path taken from #project_path.
def subscription_path subscription_name, options = {}
  if subscription_name.to_s.include?("/")
    subscription_name
  else
    "#{project_path(options)}/subscriptions/#{subscription_name}"
  end
end

#test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object



285
286
287
288
289
290
291
292
293
# File 'lib/gcloud/pubsub/service.rb', line 285

##
# Tests the given permissions against a subscription.
#
# The subscription path (from #subscription_path) and +permissions+ are
# sent in a TestIamPermissionsRequest on the IAM stub, inside a +backoff+
# block (helper defined outside this excerpt).
def test_subscription_permissions subscription_name,
                                  permissions, options = {}
  resource = subscription_path(subscription_name, options)
  request = Google::Iam::V1::TestIamPermissionsRequest.new(
    resource: resource, permissions: permissions
  )

  backoff { iam.test_iam_permissions request }
end

#test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object



259
260
261
262
263
264
265
266
# File 'lib/gcloud/pubsub/service.rb', line 259

##
# Tests the given permissions against a topic.
#
# The topic path (from #topic_path) and +permissions+ are sent in a
# TestIamPermissionsRequest on the IAM stub, inside a +backoff+ block
# (helper defined outside this excerpt).
def test_topic_permissions topic_name, permissions, options = {}
  resource = topic_path(topic_name, options)
  request = Google::Iam::V1::TestIamPermissionsRequest.new(
    resource: resource, permissions: permissions
  )

  backoff { iam.test_iam_permissions request }
end

#topic_path(topic_name, options = {}) ⇒ Object



300
301
302
303
# File 'lib/gcloud/pubsub/service.rb', line 300

##
# Returns the full resource path for a topic.
#
# A name whose string form already contains "/" is returned unchanged;
# otherwise the name is appended to "<project path>/topics/", with the
# project path taken from #project_path.
def topic_path topic_name, options = {}
  if topic_name.to_s.include?("/")
    topic_name
  else
    "#{project_path(options)}/topics/#{topic_name}"
  end
end