Class: Gcloud::Pubsub::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/gcloud/pubsub/connection.rb

Overview

Represents the connection to Pub/Sub, as well as expose the API calls.

Constant Summary collapse

API_VERSION =

:nodoc:

"v1"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(project, credentials) ⇒ Connection

Creates a new Connection instance.



32
33
34
35
36
37
38
39
# File 'lib/gcloud/pubsub/connection.rb', line 32

def initialize project, credentials
  @project = project
  @credentials = credentials
  @client = Google::APIClient.new application_name:    "gcloud-ruby",
                                  application_version: Gcloud::VERSION
  @client.authorization = @credentials.client
  @pubsub = @client.discovered_api "pubsub", API_VERSION
end

Instance Attribute Details

#credentialsObject

:nodoc:



28
29
30
# File 'lib/gcloud/pubsub/connection.rb', line 28

def credentials
  @credentials
end

#projectObject

Returns the value of attribute project.



27
28
29
# File 'lib/gcloud/pubsub/connection.rb', line 27

def project
  @project
end

Instance Method Details

#acknowledge(subscription, *ack_ids) ⇒ Object

Acknowledges receipt of a message.



211
212
213
214
215
216
217
# File 'lib/gcloud/pubsub/connection.rb', line 211

def acknowledge subscription, *ack_ids
  @client.execute(
    api_method:  @pubsub.projects.subscriptions.acknowledge,
    parameters:  { subscription: subscription },
    body_object: { ackIds: ack_ids }
  )
end

#create_subscription(topic, subscription_name, options = {}) ⇒ Object

Creates a subscription on a given topic for a given subscriber.



106
107
108
109
110
111
112
113
# File 'lib/gcloud/pubsub/connection.rb', line 106

def create_subscription topic, subscription_name, options = {}
  data = subscription_data topic, options
  @client.execute(
    api_method: @pubsub.projects.subscriptions.create,
    parameters: { name: subscription_path(subscription_name, options) },
    body_object: data
  )
end

#create_topic(topic_name, options = {}) ⇒ Object

Creates the given topic with the given name.



56
57
58
59
60
61
# File 'lib/gcloud/pubsub/connection.rb', line 56

def create_topic topic_name, options = {}
  @client.execute(
    api_method: @pubsub.projects.topics.create,
    parameters: { name: topic_path(topic_name, options) }
  )
end

#delete_subscription(subscription) ⇒ Object

Deletes an existing subscription. All pending messages in the subscription are immediately dropped.



156
157
158
159
160
161
# File 'lib/gcloud/pubsub/connection.rb', line 156

def delete_subscription subscription
  @client.execute(
    api_method: @pubsub.projects.subscriptions.delete,
    parameters: { subscription: subscription }
  )
end

#delete_topic(topic) ⇒ Object

Deletes the topic with the given name. All subscriptions to this topic are also deleted. Returns NOT_FOUND if the topic does not exist. After a topic is deleted, a new topic may be created with the same name.



82
83
84
85
86
87
# File 'lib/gcloud/pubsub/connection.rb', line 82

def delete_topic topic
  @client.execute(
    api_method: @pubsub.projects.topics.delete,
    parameters: { topic: topic }
  )
end

#get_subscription(subscription_name, options = {}) ⇒ Object

Gets the details of a subscription.



117
118
119
120
121
122
123
# File 'lib/gcloud/pubsub/connection.rb', line 117

def get_subscription subscription_name, options = {}
  @client.execute(
    api_method: @pubsub.projects.subscriptions.get,
    parameters: {
      subscription: subscription_path(subscription_name, options) }
  )
end

#get_subscription_policy(subscription_name, options = {}) ⇒ Object



163
164
165
166
167
168
169
# File 'lib/gcloud/pubsub/connection.rb', line 163

def get_subscription_policy subscription_name, options = {}
  @client.execute(
    api_method: @pubsub.projects.subscriptions.get_iam_policy,
    parameters: {
      resource: subscription_path(subscription_name, options) }
  )
end

#get_topic(topic_name, options = {}) ⇒ Object

Gets the configuration of a topic. Since the topic only has the name attribute, this method is only useful to check the existence of a topic. If other attributes are added in the future, they will be returned here.



47
48
49
50
51
52
# File 'lib/gcloud/pubsub/connection.rb', line 47

def get_topic topic_name, options = {}
  @client.execute(
    api_method: @pubsub.projects.topics.get,
    parameters: { topic: topic_path(topic_name, options) }
  )
end

#get_topic_policy(topic_name, options = {}) ⇒ Object



89
90
91
92
93
94
# File 'lib/gcloud/pubsub/connection.rb', line 89

def get_topic_policy topic_name, options = {}
  @client.execute(
    api_method: @pubsub.projects.topics.get_iam_policy,
    parameters: { resource: topic_path(topic_name, options) }
  )
end

#list_subscriptions(options = {}) ⇒ Object

Lists matching subscriptions by project.



127
128
129
130
131
132
133
134
135
136
137
# File 'lib/gcloud/pubsub/connection.rb', line 127

def list_subscriptions options = {}
  params = { project: project_path(options),
             pageToken: options.delete(:token),
             pageSize: options.delete(:max)
           }.delete_if { |_, v| v.nil? }

  @client.execute(
    api_method: @pubsub.projects.subscriptions.list,
    parameters: params
  )
end

#list_topics(options = {}) ⇒ Object

Lists matching topics.



65
66
67
68
69
70
71
72
73
74
75
# File 'lib/gcloud/pubsub/connection.rb', line 65

def list_topics options = {}
  params = { project: project_path(options),
             pageToken: options.delete(:token),
             pageSize: options.delete(:max)
           }.delete_if { |_, v| v.nil? }

  @client.execute(
    api_method: @pubsub.projects.topics.list,
    parameters: params
  )
end

#list_topics_subscriptions(topic, options = {}) ⇒ Object

Lists matching subscriptions by project and topic.



141
142
143
144
145
146
147
148
149
150
151
# File 'lib/gcloud/pubsub/connection.rb', line 141

def list_topics_subscriptions topic, options = {}
  params = { topic: topic,
             pageToken: options.delete(:token),
             pageSize: options.delete(:max)
           }.delete_if { |_, v| v.nil? }

  @client.execute(
    api_method: @pubsub.projects.topics.subscriptions.list,
    parameters: params
  )
end

#modify_ack_deadline(subscription, ids, deadline) ⇒ Object

Modifies the ack deadline for a specific message.



232
233
234
235
236
237
238
239
# File 'lib/gcloud/pubsub/connection.rb', line 232

def modify_ack_deadline subscription, ids, deadline
  ids = Array ids
  @client.execute(
    api_method:  @pubsub.projects.subscriptions.modify_ack_deadline,
    parameters:  { subscription: subscription },
    body_object: { ackIds: ids, ackDeadlineSeconds: deadline }
  )
end

#modify_push_config(subscription, endpoint, attributes) ⇒ Object

Modifies the PushConfig for a specified subscription.



221
222
223
224
225
226
227
228
# File 'lib/gcloud/pubsub/connection.rb', line 221

def modify_push_config subscription, endpoint, attributes
  @client.execute(
    api_method:  @pubsub.projects.subscriptions.modify_push_config,
    parameters:  { subscription: subscription },
    body_object: { pushConfig: { pushEndpoint: endpoint,
                                 attributes: attributes } }
  )
end

#project_path(options = {}) ⇒ Object



241
242
243
244
# File 'lib/gcloud/pubsub/connection.rb', line 241

def project_path options = {}
  project_name = options[:project] || project
  "projects/#{project_name}"
end

#publish(topic, messages) ⇒ Object

Adds one or more messages to the topic. Returns NOT_FOUND if the topic does not exist. The messages parameter is an array of arrays. The first element is the data, second is attributes hash.



185
186
187
188
189
190
191
192
193
194
# File 'lib/gcloud/pubsub/connection.rb', line 185

def publish topic, messages
  gapi_msgs = messages.map do |data, attributes|
    { data: [data].pack("m"), attributes: attributes }
  end
  @client.execute(
    api_method:  @pubsub.projects.topics.publish,
    parameters:  { topic: topic },
    body_object: { messages: gapi_msgs }
  )
end

#pull(subscription, options = {}) ⇒ Object

Pulls a single message from the server.



198
199
200
201
202
203
204
205
206
207
# File 'lib/gcloud/pubsub/connection.rb', line 198

def pull subscription, options = {}
  body = { returnImmediately: !(!options.fetch(:immediate, true)),
           maxMessages:          options.fetch(:max, 100).to_i }

  @client.execute(
    api_method:  @pubsub.projects.subscriptions.pull,
    parameters:  { subscription: subscription },
    body_object: body
  )
end

#set_subscription_policy(subscription_name, new_policy, options = {}) ⇒ Object



171
172
173
174
175
176
177
178
# File 'lib/gcloud/pubsub/connection.rb', line 171

def set_subscription_policy subscription_name, new_policy, options = {}
  @client.execute(
    api_method: @pubsub.projects.subscriptions.set_iam_policy,
    parameters: {
      resource: subscription_path(subscription_name, options) },
    body_object: { policy: new_policy }
  )
end

#set_topic_policy(topic_name, new_policy, options = {}) ⇒ Object



96
97
98
99
100
101
102
# File 'lib/gcloud/pubsub/connection.rb', line 96

def set_topic_policy topic_name, new_policy, options = {}
  @client.execute(
    api_method: @pubsub.projects.topics.set_iam_policy,
    parameters: { resource: topic_path(topic_name, options) },
    body_object: { policy: new_policy }
  )
end

#subscription_path(subscription_name, options = {}) ⇒ Object



251
252
253
254
# File 'lib/gcloud/pubsub/connection.rb', line 251

def subscription_path subscription_name, options = {}
  return subscription_name if subscription_name.to_s.include? "/"
  "#{project_path(options)}/subscriptions/#{subscription_name}"
end

#topic_path(topic_name, options = {}) ⇒ Object



246
247
248
249
# File 'lib/gcloud/pubsub/connection.rb', line 246

def topic_path topic_name, options = {}
  return topic_name if topic_name.to_s.include? "/"
  "#{project_path(options)}/topics/#{topic_name}"
end