Class: ActionPubsub::ActiveRecord::Subscriber
- Inherits:
-
Object
- Object
- ActionPubsub::ActiveRecord::Subscriber
- Defined in:
- lib/action_pubsub/active_record/subscriber.rb
Instance Attribute Summary collapse
-
#current_event ⇒ Object
Returns the value of attribute current_event.
-
#resource ⇒ Object
Returns the value of attribute resource.
Class Method Summary collapse
- .disable_all! ⇒ Object
- .increment_event_failed_count! ⇒ Object
- .increment_event_processed_count! ⇒ Object
- .increment_event_triggered_count! ⇒ Object
-
.inherited(subklass) ⇒ Object
The indirection through the dynamically created “Subscription” class keeps subscribers immutable and free of stored instance state.
- .on(event_name, **options, &block) ⇒ Object
- .react?(event_name, reaction, record) ⇒ Boolean
- .register_reaction_to_event(event_name) ⇒ Object
- .subscribe_to(*exchanges) ⇒ Object
Instance Method Summary collapse
-
#initialize(record, event: nil) ⇒ Subscriber
constructor
Stores the given record as the resource and, when an event is given, stores it as the current event.
Constructor Details
#initialize(record, event: nil) ⇒ Subscriber
Stores the given record as the resource and, when an event is given, stores it as the current event.
87 88 89 90 |
# File 'lib/action_pubsub/active_record/subscriber.rb', line 87

# Builds a subscriber around a record.
#
# @param record [Object] stored in @resource
# @param event [Object, nil] stored in @current_event only when truthy
def initialize(record, event: nil)
  @resource = record
  # When no event is passed the instance variable is left unassigned
  # rather than being set to nil/false.
  @current_event = event if event
end
Instance Attribute Details
#current_event ⇒ Object
Returns the value of attribute current_event.
17 18 19 |
# File 'lib/action_pubsub/active_record/subscriber.rb', line 17

# Reader for the current_event attribute.
#
# @return [Object, nil] the value of @current_event; nil when it was never assigned
def current_event
  @current_event
end
#resource ⇒ Object
Returns the value of attribute resource.
17 18 19 |
# File 'lib/action_pubsub/active_record/subscriber.rb', line 17

# Reader for the resource attribute.
#
# @return [Object, nil] the value of @resource; nil when it was never assigned
def resource
  @resource
end
Class Method Details
.disable_all! ⇒ Object
33 34 |
# File 'lib/action_pubsub/active_record/subscriber.rb', line 33

# No-op: the body is empty, so calling this changes nothing and returns nil.
# NOTE(review): presumably a placeholder for disabling subscribers — confirm.
#
# @return [nil]
def self.disable_all!
end
.increment_event_failed_count! ⇒ Object
45 46 47 |
# File 'lib/action_pubsub/active_record/subscriber.rb', line 45

# Calls +increment+ on this class's event_failed_count counter.
#
# @return [Object] whatever the counter's +increment+ returns
def self.increment_event_failed_count!
  event_failed_count.increment
end
.increment_event_processed_count! ⇒ Object
49 50 51 |
# File 'lib/action_pubsub/active_record/subscriber.rb', line 49

# Calls +increment+ on this class's event_processed_count counter.
#
# @return [Object] whatever the counter's +increment+ returns
def self.increment_event_processed_count!
  event_processed_count.increment
end
.increment_event_triggered_count! ⇒ Object
53 54 55 |
# File 'lib/action_pubsub/active_record/subscriber.rb', line 53

# Calls +increment+ on this class's event_triggered_count counter.
#
# @return [Object] whatever the counter's +increment+ returns
def self.increment_event_triggered_count!
  event_triggered_count.increment
end
.inherited(subklass) ⇒ Object
The indirection through the dynamically created “Subscription” class keeps subscribers immutable and free of stored instance state; i.e. the subscription is the actual actor, which simply instantiates this subscriber class and performs the task it needs to.
23 24 25 26 27 28 29 30 31 |
# File 'lib/action_pubsub/active_record/subscriber.rb', line 23

# Hook run when a class inherits from this one; gives the new subclass its
# own per-class state.
#
# The indirection through the dynamically created "Subscription" class is
# for the sake of making subscribers immutable and not storing instance
# state, i.e. the subscription is the actual actor, which just instantiates
# this subscriber class and performs the task it needs to.
#
# @param subklass [Class] the newly defined subclass
def self.inherited(subklass)
  # Keep any other inherited hooks in the ancestor chain working.
  super

  # A fresh Subscription subclass, registered as <Subclass>::Subscription
  # and linked back to the subscriber it serves.
  subscription_class = ::Class.new(::ActionPubsub::ActiveRecord::Subscription)
  subklass.subscription = subklass.const_set("Subscription", subscription_class)
  subklass.subscription.subscriber = subklass

  # Fresh containers and counters, assigned per subclass.
  subklass.reactions = {}
  subklass.observed_exchanges = ::Set.new
  subklass.event_triggered_count = ::Concurrent::AtomicFixnum.new(0)
  subklass.event_failed_count = ::Concurrent::AtomicFixnum.new(0)
  subklass.event_processed_count = ::Concurrent::AtomicFixnum.new(0)
end
.on(event_name, **options, &block) ⇒ Object
36 37 38 39 40 41 42 43 |
# File 'lib/action_pubsub/active_record/subscriber.rb', line 36

# Declares a reaction to an event and registers it.
#
# @param event_name [Object] key under which the reaction is stored in +reactions+
# @param options [Hash] only the :if and :unless entries are kept (as the
#   reaction's conditions); any other keys are ignored
# @param block [Proc, nil] stored as the reaction's :block
# @return [Object] whatever +register_reaction_to_event+ returns
def self.on(event_name, **options, &block)
  reactions[event_name] = {
    block: block,
    # Only keys actually passed end up in the conditions hash.
    conditions: options.extract!(:if, :unless)
  }

  register_reaction_to_event(event_name)
end
.react?(event_name, reaction, record) ⇒ Boolean
61 62 63 64 65 66 67 68 |
# File 'lib/action_pubsub/active_record/subscriber.rb', line 61

# Decides whether a stored reaction should run for a record.
#
# @param event_name [Object] not used by this method
# @param reaction [Hash] reads the :block and :conditions entries
# @param record [Object] passed to the :unless / :if callables
# @return [Boolean, Object] false when there is no block; true when there are
#   no conditions; otherwise the combined result of the conditions (the :if
#   callable's own return value when it is the last one evaluated)
def self.react?(event_name, reaction, record)
  return false if reaction[:block].blank?
  return true if reaction[:conditions].blank?

  conditions = reaction[:conditions]
  result = true
  # :unless is evaluated first; once result is falsy, &&= skips the :if call.
  result &&= !conditions[:unless].call(record) if conditions.key?(:unless)
  result &&= conditions[:if].call(record) if conditions.key?(:if)
  result
end
.register_reaction_to_event(event_name) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/action_pubsub/active_record/subscriber.rb', line 70

# For every exchange currently in +observed_exchanges+, registers a queue for
# this subscriber on "<exchange>/<event_name>" and spawns +concurrency+
# subscriptions for it, skipping queue addresses that already hold one.
#
# @param event_name [Object] joined to each exchange prefix with "/"
# @return [Object] whatever +observed_exchanges.each+ returns
def self.register_reaction_to_event(event_name)
  observed_exchanges.each do |exchange_prefix|
    target_exchange = [exchange_prefix, event_name].join("/")
    subscriber_key = name.underscore
    queue_key = [target_exchange, subscriber_key].join("/")

    ::ActionPubsub.register_queue(target_exchange, subscriber_key)

    self.concurrency.times do |i|
      # One address per worker index: "<exchange>/<event>/<subscriber>/<i>".
      queue_address = "#{queue_key}/#{i}"

      # ||= leaves an entry that already exists for this address untouched.
      ::ActionPubsub.subscriptions[queue_address] ||= self.subscription.spawn(queue_address) do
        self.subscription.bind_subscription(target_exchange, subscriber_key)
      end
    end
  end
end
.subscribe_to(*exchanges) ⇒ Object
57 58 59 |
# File 'lib/action_pubsub/active_record/subscriber.rb', line 57

# Adds each given exchange to +observed_exchanges+.
#
# @param exchanges [Array<Object>] exchanges to observe
# @return [Array<Object>] the +exchanges+ array that was passed in
def self.subscribe_to(*exchanges)
  exchanges.each { |exchange| observed_exchanges << exchange }
end