Class: ActionPubsub::ActiveRecord::Subscription

Inherits:
Concurrent::Actor::Utils::AdHoc
  • Object
show all
Defined in:
lib/action_pubsub/active_record/subscription.rb

Class Method Summary collapse

Class Method Details

.bind_subscription(target_exchange, subscriber_key) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/action_pubsub/active_record/subscription.rb', line 6

def self.bind_subscription(target_exchange, subscriber_key)
  ::ActionPubsub.exchanges[target_exchange][subscriber_key] << :subscribe
  -> message {
    ::ActiveRecord::Base.connection_pool.with_connection do
      begin
        message = ::ActionPubsub.deserialize_event(message)
        reaction = self.class.subscriber.reactions[message["action"]]
        record = message["record"]

        if self.class.subscriber.react?(message["action"], reaction, record)
          self.class.subscriber.increment_event_triggered_count!
          subscriber_instance = self.class.subscriber.new(record)
          subscriber_instance.instance_exec(record, &reaction[:block])
        end

        self.class.bind_subscription(target_exchange, subscriber_key)
      rescue => e
        #ensure we rebind subscription regardless
        self.class.bind_subscription(target_exchange, subscriber_key) unless message.is_a?(Symbol)
        message = ::ActionPubsub.deserialize_event(message)

        failure_message = ::ActionPubsub::Errors::SubscriptionReactionErrorMessage.new(
          :target_exchange => target_exchange,
          :subscriber_key => subscriber_key,
          :error => e,
          :message => message
        )

        ::ActionPubsub.config._on_error_block.call(failure_message) if ::ActionPubsub.config._on_error_block
      end
    end
  }
end