Class: Gcloud::Pubsub::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/gcloud/pubsub/connection.rb

Overview

Represents the connection to Pub/Sub, as well as expose the API calls.

Constant Summary collapse

API_VERSION =

:nodoc:

"v1"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(project, credentials) ⇒ Connection

Creates a new Connection instance.



32
33
34
35
36
37
38
39
# File 'lib/gcloud/pubsub/connection.rb', line 32

def initialize project, credentials
  @project = project
  @credentials = credentials
  @client = Google::APIClient.new application_name:    "gcloud-ruby",
                                  application_version: Gcloud::VERSION
  @client.authorization = @credentials.client
  @pubsub = @client.discovered_api "pubsub", API_VERSION
end

Instance Attribute Details

#credentialsObject

:nodoc:



28
29
30
# File 'lib/gcloud/pubsub/connection.rb', line 28

def credentials
  @credentials
end

#projectObject

Returns the value of attribute project.



27
28
29
# File 'lib/gcloud/pubsub/connection.rb', line 27

def project
  @project
end

Instance Method Details

#acknowledge(subscription, *ack_ids) ⇒ Object

Acknowledges receipt of a message.



229
230
231
232
233
234
235
# File 'lib/gcloud/pubsub/connection.rb', line 229

def acknowledge subscription, *ack_ids
  @client.execute(
    api_method:  @pubsub.projects.subscriptions.acknowledge,
    parameters:  { subscription: subscription },
    body_object: { ackIds: ack_ids }
  )
end

#create_subscription(topic, subscription_name, options = {}) ⇒ Object

Creates a subscription on a given topic for a given subscriber.



114
115
116
117
118
119
120
121
# File 'lib/gcloud/pubsub/connection.rb', line 114

def create_subscription topic, subscription_name, options = {}
  data = subscription_data topic, options
  @client.execute(
    api_method: @pubsub.projects.subscriptions.create,
    parameters: { name: subscription_path(subscription_name, options) },
    body_object: data
  )
end

#create_topic(topic_name, options = {}) ⇒ Object

Creates the given topic with the given name.



56
57
58
59
60
61
# File 'lib/gcloud/pubsub/connection.rb', line 56

def create_topic topic_name, options = {}
  @client.execute(
    api_method: @pubsub.projects.topics.create,
    parameters: { name: topic_path(topic_name, options) }
  )
end

#delete_subscription(subscription) ⇒ Object

Deletes an existing subscription. All pending messages in the subscription are immediately dropped.



164
165
166
167
168
169
# File 'lib/gcloud/pubsub/connection.rb', line 164

def delete_subscription subscription
  @client.execute(
    api_method: @pubsub.projects.subscriptions.delete,
    parameters: { subscription: subscription }
  )
end

#delete_topic(topic) ⇒ Object

Deletes the topic with the given name. All subscriptions to this topic are also deleted. Returns NOT_FOUND if the topic does not exist. After a topic is deleted, a new topic may be created with the same name.



82
83
84
85
86
87
# File 'lib/gcloud/pubsub/connection.rb', line 82

def delete_topic topic
  @client.execute(
    api_method: @pubsub.projects.topics.delete,
    parameters: { topic: topic }
  )
end

#get_subscription(subscription_name, options = {}) ⇒ Object

Gets the details of a subscription.



125
126
127
128
129
130
131
# File 'lib/gcloud/pubsub/connection.rb', line 125

def get_subscription subscription_name, options = {}
  @client.execute(
    api_method: @pubsub.projects.subscriptions.get,
    parameters: {
      subscription: subscription_path(subscription_name, options) }
  )
end

#get_subscription_policy(subscription_name, options = {}) ⇒ Object



171
172
173
174
175
176
177
# File 'lib/gcloud/pubsub/connection.rb', line 171

def get_subscription_policy subscription_name, options = {}
  @client.execute(
    api_method: @pubsub.projects.subscriptions.get_iam_policy,
    parameters: {
      resource: subscription_path(subscription_name, options) }
  )
end

#get_topic(topic_name, options = {}) ⇒ Object

Gets the configuration of a topic. Since the topic only has the name attribute, this method is only useful to check the existence of a topic. If other attributes are added in the future, they will be returned here.



47
48
49
50
51
52
# File 'lib/gcloud/pubsub/connection.rb', line 47

def get_topic topic_name, options = {}
  @client.execute(
    api_method: @pubsub.projects.topics.get,
    parameters: { topic: topic_path(topic_name, options) }
  )
end

#get_topic_policy(topic_name, options = {}) ⇒ Object



89
90
91
92
93
94
# File 'lib/gcloud/pubsub/connection.rb', line 89

def get_topic_policy topic_name, options = {}
  @client.execute(
    api_method: @pubsub.projects.topics.get_iam_policy,
    parameters: { resource: topic_path(topic_name, options) }
  )
end

#inspectObject

:nodoc:



274
275
276
# File 'lib/gcloud/pubsub/connection.rb', line 274

def inspect #:nodoc:
  "#{self.class}(#{@project})"
end

#list_subscriptions(options = {}) ⇒ Object

Lists matching subscriptions by project.



135
136
137
138
139
140
141
142
143
144
145
# File 'lib/gcloud/pubsub/connection.rb', line 135

def list_subscriptions options = {}
  params = { project: project_path(options),
             pageToken: options.delete(:token),
             pageSize: options.delete(:max)
           }.delete_if { |_, v| v.nil? }

  @client.execute(
    api_method: @pubsub.projects.subscriptions.list,
    parameters: params
  )
end

#list_topics(options = {}) ⇒ Object

Lists matching topics.



65
66
67
68
69
70
71
72
73
74
75
# File 'lib/gcloud/pubsub/connection.rb', line 65

def list_topics options = {}
  params = { project: project_path(options),
             pageToken: options.delete(:token),
             pageSize: options.delete(:max)
           }.delete_if { |_, v| v.nil? }

  @client.execute(
    api_method: @pubsub.projects.topics.list,
    parameters: params
  )
end

#list_topics_subscriptions(topic, options = {}) ⇒ Object

Lists matching subscriptions by project and topic.



149
150
151
152
153
154
155
156
157
158
159
# File 'lib/gcloud/pubsub/connection.rb', line 149

def list_topics_subscriptions topic, options = {}
  params = { topic: topic,
             pageToken: options.delete(:token),
             pageSize: options.delete(:max)
           }.delete_if { |_, v| v.nil? }

  @client.execute(
    api_method: @pubsub.projects.topics.subscriptions.list,
    parameters: params
  )
end

#modify_ack_deadline(subscription, ids, deadline) ⇒ Object

Modifies the ack deadline for a specific message.



250
251
252
253
254
255
256
257
# File 'lib/gcloud/pubsub/connection.rb', line 250

def modify_ack_deadline subscription, ids, deadline
  ids = Array ids
  @client.execute(
    api_method:  @pubsub.projects.subscriptions.modify_ack_deadline,
    parameters:  { subscription: subscription },
    body_object: { ackIds: ids, ackDeadlineSeconds: deadline }
  )
end

#modify_push_config(subscription, endpoint, attributes) ⇒ Object

Modifies the PushConfig for a specified subscription.



239
240
241
242
243
244
245
246
# File 'lib/gcloud/pubsub/connection.rb', line 239

def modify_push_config subscription, endpoint, attributes
  @client.execute(
    api_method:  @pubsub.projects.subscriptions.modify_push_config,
    parameters:  { subscription: subscription },
    body_object: { pushConfig: { pushEndpoint: endpoint,
                                 attributes: attributes } }
  )
end

#project_path(options = {}) ⇒ Object



259
260
261
262
# File 'lib/gcloud/pubsub/connection.rb', line 259

def project_path options = {}
  project_name = options[:project] || project
  "projects/#{project_name}"
end

#publish(topic, messages) ⇒ Object

Adds one or more messages to the topic. Returns NOT_FOUND if the topic does not exist. The messages parameter is an array of arrays. The first element is the data, second is attributes hash.



203
204
205
206
207
208
209
210
211
212
# File 'lib/gcloud/pubsub/connection.rb', line 203

def publish topic, messages
  gapi_msgs = messages.map do |data, attributes|
    { data: [data].pack("m"), attributes: attributes }
  end
  @client.execute(
    api_method:  @pubsub.projects.topics.publish,
    parameters:  { topic: topic_path(topic) },
    body_object: { messages: gapi_msgs }
  )
end

#pull(subscription, options = {}) ⇒ Object

Pulls a single message from the server.



216
217
218
219
220
221
222
223
224
225
# File 'lib/gcloud/pubsub/connection.rb', line 216

def pull subscription, options = {}
  body = { returnImmediately: !(!options.fetch(:immediate, true)),
           maxMessages:          options.fetch(:max, 100).to_i }

  @client.execute(
    api_method:  @pubsub.projects.subscriptions.pull,
    parameters:  { subscription: subscription },
    body_object: body
  )
end

#set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object



179
180
181
182
183
184
185
186
# File 'lib/gcloud/pubsub/connection.rb', line 179

def set_subscription_policy subscription_name, new_policy, options = {}
  @client.execute(
    api_method: @pubsub.projects.subscriptions.set_iam_policy,
    parameters: {
      resource: subscription_path(subscription_name, options) },
    body_object: { policy: new_policy }
  )
end

#set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object



96
97
98
99
100
101
102
# File 'lib/gcloud/pubsub/connection.rb', line 96

def set_topic_policy topic_name, new_policy, options = {}
  @client.execute(
    api_method: @pubsub.projects.topics.set_iam_policy,
    parameters: { resource: topic_path(topic_name, options) },
    body_object: { policy: new_policy }
  )
end

#subscription_path(subscription_name, options = {}) ⇒ Object



269
270
271
272
# File 'lib/gcloud/pubsub/connection.rb', line 269

def subscription_path subscription_name, options = {}
  return subscription_name if subscription_name.to_s.include? "/"
  "#{project_path(options)}/subscriptions/#{subscription_name}"
end

#test_subscription_permissions(subscription_name, permissions, options = {}) ⇒ Object



188
189
190
191
192
193
194
195
196
# File 'lib/gcloud/pubsub/connection.rb', line 188

def test_subscription_permissions subscription_name,
                                  permissions, options = {}
  @client.execute(
    api_method: @pubsub.projects.subscriptions.test_iam_permissions,
    parameters: {
      resource: subscription_path(subscription_name, options) },
    body_object: { permissions: permissions }
  )
end

#test_topic_permissions(topic_name, permissions, options = {}) ⇒ Object



104
105
106
107
108
109
110
# File 'lib/gcloud/pubsub/connection.rb', line 104

def test_topic_permissions topic_name, permissions, options = {}
  @client.execute(
    api_method: @pubsub.projects.topics.test_iam_permissions,
    parameters: { resource: topic_path(topic_name, options) },
    body_object: { permissions: permissions }
  )
end

#topic_path(topic_name, options = {}) ⇒ Object



264
265
266
267
# File 'lib/gcloud/pubsub/connection.rb', line 264

def topic_path topic_name, options = {}
  return topic_name if topic_name.to_s.include? "/"
  "#{project_path(options)}/topics/#{topic_name}"
end