Module: Cloudenvoy::Backend::GooglePubSub

Defined in:
lib/cloudenvoy/backend/google_pub_sub.rb

Overview

Interface to GCP Pub/Sub and Pub/Sub local emulator

Class Method Details

.backend ⇒ Google::Cloud::PubSub::Project

Return the backend to use for sending messages.

Returns:

  • (Google::Cloud::PubSub::Project)

    The low level client instance.



# File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 34

def backend
  @backend ||= Google::Cloud::PubSub.new(**{
    project_id: config.gcp_project_id,
    emulator_host: development? ? Cloudenvoy::Config::EMULATOR_HOST : nil
  }.compact)
end
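
The `.compact` call is what keeps `emulator_host: nil` from being passed to the client outside development mode. A minimal sketch of that option-building step in plain Ruby, with no Pub/Sub dependency; the `client_options` helper and the `localhost:8085` host value are illustrative assumptions, not part of Cloudenvoy:

```ruby
# Hypothetical stand-in for Cloudenvoy::Config::EMULATOR_HOST (value assumed).
EMULATOR_HOST = 'localhost:8085'

# Build the keyword arguments the way `backend` does: nil entries are
# dropped by Hash#compact so they are never handed to the client.
def client_options(project_id:, development:)
  {
    project_id: project_id,
    emulator_host: development ? EMULATOR_HOST : nil
  }.compact
end

DEV_OPTIONS = client_options(project_id: 'my-project', development: true)
PROD_OPTIONS = client_options(project_id: 'my-project', development: false)
```

In this sketch only the development hash carries an `emulator_host` key; the production hash contains `project_id` alone.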

.config ⇒ Cloudenvoy::Config

Return the cloudenvoy configuration. See Cloudenvoy#configure.

Returns:

  • (Cloudenvoy::Config)

# File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 16

def config
  Cloudenvoy.config
end

.development? ⇒ Boolean

Return true if the current config mode is development.

Returns:

  • (Boolean)

    True if Cloudenvoy is run in development mode.



# File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 25

def development?
  config.mode == :development
end

.publish(topic, payload, metadata = {}) ⇒ Cloudenvoy::Message

Publish a message to a topic.

Parameters:

  • topic (String)

    The name of the topic.

  • payload (Hash, String)

    The message content.

  • metadata (Hash) (defaults to: {})

    The message attributes.

Returns:

  • (Cloudenvoy::Message)

# File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 59

def publish(topic, payload, metadata = {})
  # Retrieve the topic
  ps_topic = backend.topic(topic, skip_lookup: true)

  # Publish the message
  ps_msg = ps_topic.publish(payload.to_json, metadata.to_h)

  # Return formatted message
  Message.new(
    id: ps_msg.message_id,
    payload: payload,
    metadata: metadata,
    topic: topic
  )
end
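
Because `publish` always calls `payload.to_json`, a String payload is JSON-encoded as well (it ends up wrapped in quotes), so anything reading the raw message data would need to JSON-parse it in both cases. A small sketch of that behaviour using only the standard library; `encode_payload` is a hypothetical helper name introduced for illustration:

```ruby
require 'json'

# Mirrors the serialization step in `publish`: every payload goes
# through #to_json, whether it is a Hash or a String.
def encode_payload(payload)
  payload.to_json
end

hash_data = encode_payload({ 'user_id' => 1 })
string_data = encode_payload('hello')

# Both forms round-trip through JSON.parse on the reading side.
puts JSON.parse(hash_data).inspect
puts JSON.parse(string_data).inspect
```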

.publish_all(topic, msg_args) ⇒ Array<Cloudenvoy::Message>

Publish multiple messages to a topic.

Parameters:

  • topic (String)

    The name of the topic.

  • msg_args (Array<Array<[Hash, String]>>)

    A list of [payload, metadata] pairs, one per message.

Returns:

  • (Array<Cloudenvoy::Message>)

# File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 83

def publish_all(topic, msg_args)
  # Retrieve the topic
  ps_topic = backend.topic(topic, skip_lookup: true)

  # Publish the messages in a single batch
  ps_msgs = ps_topic.publish do |batch|
    msg_args.each do |(payload, metadata)|
      batch.publish(payload.to_json, metadata.to_h)
    end
  end

  # Return the formatted messages
  ps_msgs.each_with_index.map do |ps_msg, index|
    payload, metadata = msg_args[index]

    Message.new(
      id: ps_msg.message_id,
      payload: payload,
      metadata: metadata,
      topic: topic
    )
  end
end
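
`publish_all` pairs each published message with its input by index, which assumes the batch results come back in the same order as `msg_args`. A sketch of that pairing step with stand-in structs; `FakePubSubMessage`, `SketchMessage` and `format_messages` are hypothetical names, not Cloudenvoy or Google classes:

```ruby
# Stand-ins for the Pub/Sub result and for Cloudenvoy::Message (both hypothetical).
FakePubSubMessage = Struct.new(:message_id)
SketchMessage = Struct.new(:id, :payload, :metadata, :topic, keyword_init: true)

# Pair each published message with the [payload, metadata] it came from,
# assuming results are returned in publication order.
def format_messages(topic, ps_msgs, msg_args)
  ps_msgs.each_with_index.map do |ps_msg, index|
    payload, metadata = msg_args[index]
    SketchMessage.new(id: ps_msg.message_id, payload: payload, metadata: metadata, topic: topic)
  end
end

MSG_ARGS = [
  [{ 'event' => 'created' }, { 'kind' => 'audit' }],
  ['plain text payload', nil]
].freeze
PS_MSGS = [FakePubSubMessage.new('id-1'), FakePubSubMessage.new('id-2')].freeze

MESSAGES = format_messages('my-topic', PS_MSGS, MSG_ARGS)
```

Each entry in `msg_args` is a two-element array. In the source, a missing metadata element is tolerated at publish time because `nil.to_h` evaluates to `{}`.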

.upsert_subscription(topic, name, opts = {}) ⇒ Cloudenvoy::Subscription

Create or update a subscription for a specific topic.

Parameters:

  • topic (String)

    The name of the topic.

  • name (String)

    The name of the subscription.

  • opts (Hash) (defaults to: {})

    The subscription configuration options.

Options Hash (opts):

  • :deadline (Integer)

    The maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.

  • :retain_acked (Boolean)

    Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription’s backlog, even if they are acknowledged, until they fall out of the retention window. Default is false.

  • :retention (Numeric)

    How long to retain unacknowledged messages in the subscription’s backlog, from the moment a message is published. If retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a Subscription#seek can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes). Default is 604,800 seconds (7 days).

  • :filter (String)

    An expression written in the Cloud Pub/Sub filter language. If non-empty, then only Message instances whose attributes field matches the filter are delivered on this subscription. If empty, then no messages are filtered out. Optional.
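
As an illustration of the options listed above, the following sketch builds an `opts` hash and checks `:retention` against the documented bounds. The `valid_retention?` helper is a hypothetical pre-flight check, Cloudenvoy is not shown performing it, and all values are illustrative:

```ruby
# Bounds quoted in the :retention description above, in seconds.
RETENTION_RANGE = (600..604_800).freeze

# Hypothetical pre-flight check; not part of Cloudenvoy.
def valid_retention?(seconds)
  RETENTION_RANGE.cover?(seconds)
end

# Example options hash using the documented keys (values are illustrative).
SUBSCRIPTION_OPTS = {
  deadline: 30,
  retain_acked: true,
  retention: 86_400,
  filter: 'attributes.kind = "audit"'
}.freeze
```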

Returns:

  • (Cloudenvoy::Subscription)

# File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 129

def upsert_subscription(topic, name, opts = {})
  sub_config = opts.to_h.merge(endpoint: webhook_url)

  # Auto-create topic in development. In non-development environments
  # the create subscription action raises an error if the topic does
  # not exist
  upsert_topic(topic) if development?

  # Create subscription
  ps_sub =
    begin
      # Retrieve the topic
      ps_topic = backend.topic(topic, skip_lookup: true)

      # Attempt to create the subscription
      ps_topic.subscribe(name, **sub_config)
    rescue Google::Cloud::AlreadyExistsError
      # Update endpoint on subscription
      # Topic is not updated as it is name-dependent
      backend.subscription(name).tap do |e|
        sub_config.each do |k, v|
          e.send("#{k}=", v)
        end
      end
    end

  # Return formatted subscription
  Subscription.new(name: ps_sub.name, original: ps_sub)
end
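
The method follows a "create first, update on conflict" pattern: it attempts the creation and, if the resource already exists, fetches it and assigns each option through its setter. A self-contained sketch of that control flow with in-memory stand-ins; `AlreadyExistsSketchError`, `FakeSubscription`, `FakeRegistry` and `upsert` are all hypothetical names, not Google or Cloudenvoy APIs:

```ruby
# Hypothetical error standing in for Google::Cloud::AlreadyExistsError.
class AlreadyExistsSketchError < StandardError; end

# Hypothetical subscription object exposing setters for its options.
class FakeSubscription
  attr_accessor :endpoint, :deadline
  attr_reader :name

  def initialize(name)
    @name = name
  end
end

# In-memory registry playing the role of the Pub/Sub backend.
class FakeRegistry
  def initialize
    @subs = {}
  end

  def create(name, **config)
    raise AlreadyExistsSketchError, name if @subs.key?(name)

    @subs[name] = apply(FakeSubscription.new(name), config)
  end

  def find(name)
    @subs.fetch(name)
  end

  # Assign each option through its setter, as the rescue branch does.
  def apply(sub, config)
    config.each { |key, value| sub.public_send("#{key}=", value) }
    sub
  end
end

# Try to create first; on conflict, fetch the existing record and update it.
def upsert(registry, name, config)
  registry.create(name, **config)
rescue AlreadyExistsSketchError
  registry.apply(registry.find(name), config)
end
```

One consequence of assigning options through `key=` setters is that, in the update path, any option without a matching setter on the subscription object would raise `NoMethodError`.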

.upsert_topic(topic) ⇒ Cloudenvoy::Topic

Create or update a topic.

Parameters:

  • topic (String)

    The topic name.

Returns:

  • (Cloudenvoy::Topic)

# File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 166

def upsert_topic(topic)
  ps_topic = begin
    backend.create_topic(topic)
  rescue Google::Cloud::AlreadyExistsError
    backend.topic(topic)
  end

  # Return formatted topic
  Topic.new(name: ps_topic.name, original: ps_topic)
end

.webhook_url ⇒ String

Return an authenticated endpoint for processing Pub/Sub webhooks.

Returns:

  • (String)

    An authenticated endpoint.



# File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 46

def webhook_url
  "#{config.processor_url}?token=#{Authenticator.verification_token}"
end
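
The endpoint is authenticated by carrying the verification token as a `token` query parameter. A sketch of the same interpolation with placeholder values; `PROCESSOR_URL`, `VERIFICATION_TOKEN` and `build_webhook_url` are hypothetical stand-ins for `config.processor_url`, `Authenticator.verification_token` and `webhook_url`:

```ruby
require 'uri'

# Hypothetical values standing in for config.processor_url and
# Authenticator.verification_token.
PROCESSOR_URL = 'https://app.example.test/cloudenvoy/receive'
VERIFICATION_TOKEN = 'abc123'

# Same interpolation as `webhook_url`: the token travels as a query parameter.
def build_webhook_url(processor_url, token)
  "#{processor_url}?token=#{token}"
end

WEBHOOK_URL = build_webhook_url(PROCESSOR_URL, VERIFICATION_TOKEN)

# The receiving side can recover the token from the query string.
TOKEN_PARAM = URI.decode_www_form(URI.parse(WEBHOOK_URL).query).to_h['token']
```

Note that plain interpolation assumes the processor URL has no query string of its own and that the token contains only URL-safe characters.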