Class: ActionPubsub::ActiveRecord::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/action_pubsub/active_record/subscriber.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(record, event: nil) ⇒ Subscriber

Instance Methods ###



87
88
89
90
# File 'lib/action_pubsub/active_record/subscriber.rb', line 87

# Builds a subscriber around the record it should react to.
#
# @param record [Object] stored as the subscriber's resource
# @param event [Object, nil] stored as the current event only when truthy;
#   otherwise @current_event is left unassigned
def initialize(record, event: nil)
  @resource = record
  return unless event

  @current_event = event
end

Instance Attribute Details

#current_event ⇒ Object

Returns the value of attribute current_event.



17
18
19
# File 'lib/action_pubsub/active_record/subscriber.rb', line 17

# Reader for the @current_event instance variable.
#
# @return [Object, nil] whatever is currently held in @current_event; nil when
#   nothing has been assigned to it
def current_event
  @current_event
end

#resource ⇒ Object

Returns the value of attribute resource.



17
18
19
# File 'lib/action_pubsub/active_record/subscriber.rb', line 17

# Reader for the @resource instance variable.
#
# @return [Object, nil] whatever is currently held in @resource; nil when
#   nothing has been assigned to it
def resource
  @resource
end

Class Method Details

.disable_all! ⇒ Object



33
34
# File 'lib/action_pubsub/active_record/subscriber.rb', line 33

# NOTE(review): the body is empty, so calling this currently does nothing and
# returns nil. The name suggests it should disable every subscription —
# confirm whether this is an intentional placeholder or an unfinished feature.
#
# @return [nil]
def self.disable_all!
end

.increment_event_failed_count! ⇒ Object



45
46
47
# File 'lib/action_pubsub/active_record/subscriber.rb', line 45

# Bumps this subscriber class's failed-event counter by calling `increment`
# on the object returned by `event_failed_count`.
#
# @return [Object] whatever the counter's `increment` returns
def self.increment_event_failed_count!
  event_failed_count.increment
end

.increment_event_processed_count! ⇒ Object



49
50
51
# File 'lib/action_pubsub/active_record/subscriber.rb', line 49

# Bumps this subscriber class's processed-event counter by calling `increment`
# on the object returned by `event_processed_count`.
#
# @return [Object] whatever the counter's `increment` returns
def self.increment_event_processed_count!
  event_processed_count.increment
end

.increment_event_triggered_count! ⇒ Object



53
54
55
# File 'lib/action_pubsub/active_record/subscriber.rb', line 53

# Bumps this subscriber class's triggered-event counter by calling `increment`
# on the object returned by `event_triggered_count`.
#
# @return [Object] whatever the counter's `increment` returns
def self.increment_event_triggered_count!
  event_triggered_count.increment
end

.inherited(subklass) ⇒ Object

The indirection through the dynamically created “Subscription” class exists to keep subscribers immutable and free of stored instance state; i.e., the subscription is the actual actor, which simply instantiates this subscriber class and performs the task it needs to.



23
24
25
26
27
28
29
30
31
# File 'lib/action_pubsub/active_record/subscriber.rb', line 23

# Inheritance hook: gives every new subscriber subclass its own state.
#
# For each subclass this:
# * creates an anonymous subclass of ::ActionPubsub::ActiveRecord::Subscription,
#   registers it as the constant `Subscription` under the subclass, and links
#   the two through `subklass.subscription` / `subscription.subscriber`;
# * resets `reactions` to an empty Hash and `observed_exchanges` to an empty Set;
# * installs three fresh ::Concurrent::AtomicFixnum counters starting at 0.
#
# The indirection through the generated Subscription class exists so that
# subscribers stay immutable and hold no instance state; the subscription is
# the actual actor, which instantiates the subscriber to perform the work.
#
# @param subklass [Class] the class that has just inherited from this one
# @return [Object] the last counter assigned (the return value of a hook is
#   ignored by Ruby)
def self.inherited(subklass)
  # Chain to any `inherited` hook defined further up the ancestor chain so
  # that this override does not silently disable it.
  super

  subscription_class = ::Class.new(::ActionPubsub::ActiveRecord::Subscription)
  subklass.subscription = subklass.const_set("Subscription", subscription_class)
  subklass.subscription.subscriber = subklass

  subklass.reactions = {}
  subklass.observed_exchanges = ::Set.new

  subklass.event_triggered_count = ::Concurrent::AtomicFixnum.new(0)
  subklass.event_failed_count = ::Concurrent::AtomicFixnum.new(0)
  subklass.event_processed_count = ::Concurrent::AtomicFixnum.new(0)
end

.on(event_name, **options, &block) ⇒ Object



36
37
38
39
40
41
42
43
# File 'lib/action_pubsub/active_record/subscriber.rb', line 36

# Declares a reaction to +event_name+ and registers it.
#
# The given block is stored under :block, and the :if / :unless entries pulled
# out of +options+ are stored under :conditions; any other option is not
# recorded. Afterwards the event is handed to `register_reaction_to_event`.
#
# @param event_name [Object] key under which the reaction is stored
# @param options [Hash] keyword options; only :if and :unless are kept
# @param block [Proc, nil] handler stored as the reaction's :block
# @return [Object] whatever `register_reaction_to_event` returns
def self.on(event_name, **options, &block)
  reactions[event_name] = {
    block: block,
    conditions: options.extract!(:if, :unless)
  }

  register_reaction_to_event(event_name)
end

.react?(event_name, reaction, record) ⇒ Boolean

Returns:

  • (Boolean)


61
62
63
64
65
66
67
68
# File 'lib/action_pubsub/active_record/subscriber.rb', line 61

# Decides whether +reaction+ should run for +record+.
#
# Rules, in order:
# 1. no :block stored            -> false
# 2. no :conditions stored       -> true
# 3. :unless present and truthy  -> false (the :if callable is not invoked)
# 4. :if absent                  -> true
# 5. otherwise the truthiness of the :if callable's result
#
# The result is always strictly +true+ or +false+, never the raw value
# returned by a condition callable.
#
# @param event_name [Object] not used by the decision; kept for the interface
# @param reaction [Hash] entry with :block and :conditions keys
# @param record [Object] passed to each condition callable
# @return [Boolean]
def self.react?(event_name, reaction, record)
  return false if reaction[:block].blank?

  conditions = reaction[:conditions]
  return true if conditions.blank?

  # A truthy :unless vetoes the reaction before :if is consulted.
  return false if conditions.key?(:unless) && conditions[:unless].call(record)
  return true unless conditions.key?(:if)

  # Coerce so the predicate honours its Boolean contract.
  !!conditions[:if].call(record)
end

.register_reaction_to_event(event_name) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/action_pubsub/active_record/subscriber.rb', line 70

# Wires +event_name+ up on every exchange this subscriber observes.
#
# For each observed exchange prefix, the exchange "<prefix>/<event_name>" is
# registered as a queue for this subscriber (keyed by the underscored class
# name). Then, for every index below `concurrency`, a subscription is spawned
# at "<exchange>/<subscriber key>/<index>" unless ::ActionPubsub.subscriptions
# already holds a truthy entry for that address.
#
# @param event_name [Object] event segment appended to each exchange prefix
# @return [Object] the `observed_exchanges` collection that was iterated
def self.register_reaction_to_event(event_name)
  observed_exchanges.each do |prefix|
    exchange = [prefix, event_name].join("/")
    subscriber_id = name.underscore
    queue_root = [exchange, subscriber_id].join("/")
    ::ActionPubsub.register_queue(exchange, subscriber_id)

    concurrency.times do |slot|
      address = "#{queue_root}/#{slot}"
      # Only spawn when no subscription is registered at this address yet.
      ::ActionPubsub.subscriptions[address] ||= subscription.spawn(address) do
        subscription.bind_subscription(exchange, subscriber_id)
      end
    end
  end
end

.subscribe_to(*exchanges) ⇒ Object



57
58
59
# File 'lib/action_pubsub/active_record/subscriber.rb', line 57

# Adds each given exchange to this subscriber's `observed_exchanges`.
#
# @param exchanges [Array<Object>] exchange prefixes to observe
# @return [Array<Object>] the list of exchanges that was passed in
def self.subscribe_to(*exchanges)
  exchanges.each do |exchange|
    observed_exchanges << exchange
  end
end