Module: GoogleSubscriber::SubscriptionStarter

Included in:
BaseSubscriber
Defined in:
lib/google_subscriber/subscription_starter.rb

Constant Summary collapse

# Base layer of listen options used by build_listen_args. It is empty and
# frozen; build_listen_args merges the other option sources on top of it.
DEFAULT_SUBSCRIPTION_LISTEN_ARGS =
{}.freeze

Instance Method Summary collapse

Instance Method Details

#build_listen_argsObject



34
35
36
37
38
# File 'lib/google_subscriber/subscription_starter.rb', line 34

# Builds the options hash handed to the subscription's `listen` call.
#
# Layers are applied in order, later layers winning on key collisions:
#   1. DEFAULT_SUBSCRIPTION_LISTEN_ARGS
#   2. GoogleSubscriber.subscription_listen_args
#   3. g_subscription_listen_args
# A nil layer is treated as an empty hash.
#
# @return [Hash] a new hash; none of the source hashes is modified
def build_listen_args
  layers = [GoogleSubscriber.subscription_listen_args, g_subscription_listen_args]
  layers.reduce(DEFAULT_SUBSCRIPTION_LISTEN_ARGS) { |merged, layer| merged.merge(layer || {}) }
end

#create_subscriptionObject

Raises:

  • (ArgumentError)


40
41
42
43
44
45
# File 'lib/google_subscriber/subscription_starter.rb', line 40

# Looks up the Pub/Sub subscription configured for this subscriber.
#
# A client is built through GoogleSubscriber::PubSubFactory.new_pub_sub using
# g_project_id and g_credentials, then asked for the subscription whose id is
# g_subscription_id converted to a String.
#
# @raise [ArgumentError] when g_subscription_id is nil
# @return whatever the client's `subscription` call returns
def create_subscription
  raise ArgumentError, 'subscription_id is required!' if g_subscription_id.nil?

  pub_sub = GoogleSubscriber::PubSubFactory.new_pub_sub(project_id: g_project_id, credentials: g_credentials)
  # The nil case was rejected above, so a plain to_s is sufficient here.
  pub_sub.subscription(g_subscription_id.to_s)
end

#on_received_wrapperObject



21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/google_subscriber/subscription_starter.rb', line 21

# Runs the given block inside the framework wrappers that are actually
# available in the current process:
#
# * no usable Rails application -> the block is simply yielded to;
# * Rails application present   -> the block runs inside
#   Rails.application.reloader.wrap
#   (https://guides.rubyonrails.org/threading_and_code_execution.html#reloader);
# * ActiveRecord::Base defined  -> additionally inside
#   ActiveRecord::Base.connection_pool.with_connection.
#
# Each layer is checked on its own: the `Rails` constant being defined does
# not guarantee that an application has been set up, nor that ActiveRecord is
# loaded.
#
# @yield the work to perform; called with no arguments
# @return the block's value when no wrapper applies, otherwise whatever
#   `reloader.wrap` returns
def on_received_wrapper
  # The `Rails` namespace may exist without an application behind it.
  rails_app = Rails.application if defined?(Rails) && Rails.respond_to?(:application)
  return yield unless rails_app

  rails_app.reloader.wrap do
    if defined?(ActiveRecord::Base)
      # Explicit block (not &block) so the caller's block never receives the
      # connection that with_connection yields.
      ActiveRecord::Base.connection_pool.with_connection { yield }
    else
      yield
    end
  end
end

#startObject

Listens for new messages associated with the subscriber name. As messages come in, `on_received_message` is called, which must be overridden in subclasses.



7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/google_subscriber/subscription_starter.rb', line 7

# Starts listening on the subscription returned by create_subscription.
#
# One subscriber instance is created up front (via `new`) and reused for every
# message: each received message is passed to its `on_received_message`,
# executed inside on_received_wrapper.
#
# The options from build_listen_args are splatted as keyword arguments. On
# Ruby 3+ a positional Hash is no longer converted to keywords, so passing it
# positionally fails when `listen` declares keyword parameters
# (NOTE(review): assumes `listen` takes keywords, as the google-cloud-pubsub
# Subscription#listen does — confirm against the gem version in use).
#
# @raise [ArgumentError] propagated from create_subscription
# @return the object returned by `listen`, after `start` was called on it
def start
  subscriber = new
  listener = create_subscription.listen(**build_listen_args) do |received_message|
    on_received_wrapper { subscriber.on_received_message(received_message) }
  end

  listener.start
  GoogleSubscriber.logger.info("[#{name}.start] Started subscriber with subscription '#{g_subscription_id}'")

  listener
end