Class: Trabox::Subscriber::Google::Cloud::PubSub

Inherits:
Object
  • Object
show all
Includes:
Trabox::Subscriber
Defined in:
lib/trabox/subscriber/google/cloud_pubsub.rb

Instance Method Summary collapse

Constructor Details

#initialize(subscription_id, listen_options: {}, before_listen_acknowledge_callbacks: [], after_listen_acknowledge_callbacks: [], error_listen_callbacks: []) ⇒ PubSub

Returns a new instance of PubSub.

Parameters:

  • subscription_id (String)
  • listen_options (Hash) (defaults to: {})

    listen method options

  • before_listen_acknowledge_callbacks (Array<Proc>) (defaults to: [])
  • after_listen_acknowledge_callbacks (Array<Proc>) (defaults to: [])
  • error_listen_callbacks (Array<Proc>) (defaults to: [])


13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/trabox/subscriber/google/cloud_pubsub.rb', line 13

def initialize(subscription_id,
               listen_options: {},
               before_listen_acknowledge_callbacks: [],
               after_listen_acknowledge_callbacks: [],
               error_listen_callbacks: [])

  @listen_options = listen_options
  @before_listen_acknowledge_callbacks = before_listen_acknowledge_callbacks
  @after_listen_acknowledge_callbacks = after_listen_acknowledge_callbacks
  @error_listen_callbacks = error_listen_callbacks

  @pubsub = ::Google::Cloud::PubSub.new

  @subscription = @pubsub.subscription subscription_id

  raise "Subscription-ID='#{subscription_id}' does not exist." if @subscription.nil?

  Rails.logger.info "Subscription '#{subscription_id}': message ordering is #{@subscription.message_ordering?}."
end

Instance Method Details

#subscribeObject



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/trabox/subscriber/google/cloud_pubsub.rb', line 33

def subscribe
  subscriber = @subscription.listen(**@listen_options) do |received_message|
    @before_listen_acknowledge_callbacks.each do |cb|
      cb.call(received_message)
    end

    received_message.acknowledge!

    @after_listen_acknowledge_callbacks.each do |cb|
      cb.call(received_message)
    end

    Metric.service_check('subscribe.service.check', Metric::SERVICE_OK)
  end

  subscriber.on_error do |_|
    Metric.service_check('subscribe.service.check', Metric::SERVICE_CRITICAL)
  end

  @error_listen_callbacks.each do |cb|
    subscriber.on_error(&cb)
  end

  Rails.logger.info 'Listening subscrition...'
  subscriber.start.wait!
end