Class: Pakyow::Data::Subscribers Private
- Inherits:
-
Object
- Object
- Pakyow::Data::Subscribers
- Extended by:
- Support::DeepFreeze
- Defined in:
- lib/pakyow/data/subscribers.rb,
lib/pakyow/data/subscribers/adapters/redis.rb,
lib/pakyow/data/subscribers/adapters/memory.rb,
lib/pakyow/data/subscribers/adapters/redis/pipeliner.rb
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Defined Under Namespace
Modules: Adapters
Instance Attribute Summary collapse
- #adapter ⇒ Object private
- #lookup ⇒ Object private
Instance Method Summary collapse
- #did_mutate(source_name, changed_values = nil, result_source = nil) ⇒ Object private
- #expire(subscriber, seconds) ⇒ Object private
-
#initialize(app, adapter = :memory, adapter_config = {}) ⇒ Subscribers
constructor
private
A new instance of Subscribers.
- #persist(subscriber) ⇒ Object private
- #register_subscriptions(subscriptions, subscriber: nil, &block) ⇒ Object private
- #shutdown ⇒ Object private
- #unsubscribe(subscriber) ⇒ Object private
Constructor Details
#initialize(app, adapter = :memory, adapter_config = {}) ⇒ Subscribers
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of Subscribers.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/pakyow/data/subscribers.rb', line 19 def initialize(app, adapter = :memory, adapter_config = {}) @app = app require "pakyow/data/subscribers/adapters/#{adapter}" @adapter = Pakyow::Data::Subscribers::Adapters.const_get( adapter.to_s.capitalize ).new( adapter_config.to_h.merge( app.config.data.subscriptions.adapter_settings.to_h ) ) @executor = Concurrent::ThreadPoolExecutor.new( auto_terminate: false, min_threads: 1, max_threads: 10, max_queue: 0 ) rescue LoadError, NameError => error raise UnknownSubscriberAdapter.build(error, adapter: adapter) end |
Instance Attribute Details
#adapter ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
12 13 14 |
# File 'lib/pakyow/data/subscribers.rb', line 12 def adapter @adapter end |
#lookup ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
12 13 14 |
# File 'lib/pakyow/data/subscribers.rb', line 12 def lookup @lookup end |
Instance Method Details
#did_mutate(source_name, changed_values = nil, result_source = nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/pakyow/data/subscribers.rb', line 56 def did_mutate(source_name, changed_values = nil, result_source = nil) @executor.post(source_name, changed_values, result_source, Pakyow.logger.target) do |source_name, changed_values, result_source, logger| logger.internal { "[Pakyow::Data::Subscribers] did mutate #{source_name}" } subscriptions = @adapter.subscriptions_for_source(source_name) logger.internal { "[Pakyow::Data::Subscribers] fetched #{subscriptions.count} subscriptions" } subscriptions.uniq { |subscription| subscription.dig(:payload, :id) || subscription }.select { |subscription| process?(subscription, changed_values, result_source) }.each do |subscription| if subscription[:version] == @app.config.data.subscriptions.version begin logger.internal { "[Pakyow::Data::Subscribers] processing subscription #{subscription[:id]}" } process(subscription, result_source) logger.internal { "[Pakyow::Data::Subscribers] finished processing subscription #{subscription[:id]}" } rescue => error logger.error { "[Pakyow::Data::Subscribers] did_mutate failed: #{error}" } end end end logger.internal { "[Pakyow::Data::Subscribers] finished mutate for #{source_name}" } end end |
#expire(subscriber, seconds) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
102 103 104 |
# File 'lib/pakyow/data/subscribers.rb', line 102 def expire(subscriber, seconds) @adapter.expire(subscriber, seconds) end |
#persist(subscriber) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
106 107 108 |
# File 'lib/pakyow/data/subscribers.rb', line 106 def persist(subscriber) @adapter.persist(subscriber) end |
#register_subscriptions(subscriptions, subscriber: nil, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
46 47 48 49 50 51 52 53 54 |
# File 'lib/pakyow/data/subscribers.rb', line 46 def register_subscriptions(subscriptions, subscriber: nil, &block) subscriptions.each do |subscription| subscription[:version] = @app.config.data.subscriptions.version end @adapter.register_subscriptions(subscriptions, subscriber: subscriber).tap do |ids| yield ids if block_given? end end |
#shutdown ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
41 42 43 44 |
# File 'lib/pakyow/data/subscribers.rb', line 41 def shutdown @executor.shutdown @executor.wait_for_termination(30) end |
#unsubscribe(subscriber) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
98 99 100 |
# File 'lib/pakyow/data/subscribers.rb', line 98 def unsubscribe(subscriber) @adapter.unsubscribe(subscriber) end |