Class: PgEventstore::SubscriptionsManager
- Inherits:
- Object
- Object
- PgEventstore::SubscriptionsManager
- Extended by:
- Forwardable
- Defined in:
- lib/pg_eventstore/subscriptions/subscriptions_manager.rb
Overview
The public Subscriptions API, available to the user.
Instance Attribute Summary collapse
- #config ⇒ PgEventstore::Config writeonly
Instance Method Summary collapse
- #config_name ⇒ Symbol
- #initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false) ⇒ SubscriptionsManager
constructor
A new instance of SubscriptionsManager.
- #start ⇒ PgEventstore::BasicRunner?
- #start! ⇒ PgEventstore::BasicRunner
- #subscribe(subscription_name, handler:, options: {}, middlewares: nil, pull_interval: config.subscription_pull_interval, max_retries: config.subscription_max_retries, retries_interval: config.subscription_retries_interval, restart_terminator: config.subscription_restart_terminator, failed_subscription_notifier: config.failed_subscription_notifier, graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout) ⇒ void
- #subscriptions ⇒ Array<PgEventstore::Subscription>
- #subscriptions_set ⇒ PgEventstore::SubscriptionsSet?
Constructor Details
#initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false) ⇒ SubscriptionsManager
Returns a new instance of SubscriptionsManager.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 51 def initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false) @config = config @set_name = set_name @subscriptions_set_lifecycle = SubscriptionsSetLifecycle.new( config_name, { name: set_name, max_restarts_number: max_retries || config.subscriptions_set_max_retries, time_between_restarts: retries_interval || config.subscriptions_set_retries_interval, } ) @subscriptions_lifecycle = SubscriptionsLifecycle.new( config_name, @subscriptions_set_lifecycle, force_lock: ) @subscription_feeder = SubscriptionFeeder.new( config_name:, subscriptions_set_lifecycle: @subscriptions_set_lifecycle, subscriptions_lifecycle: @subscriptions_lifecycle ) end |
Instance Attribute Details
#config=(value) ⇒ PgEventstore::Config
40 41 42 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 40 def config @config end |
Instance Method Details
#config_name ⇒ Symbol
147 148 149 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 147 def config_name @config.name end |
#start ⇒ PgEventstore::BasicRunner?
139 140 141 142 143 144 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 139 def start start! rescue PgEventstore::SubscriptionAlreadyLockedError => e PgEventstore.logger&.warn(e.message) nil end |
#start! ⇒ PgEventstore::BasicRunner
132 133 134 135 136 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 132 def start! run_cli_callbacks do @subscription_feeder.start end end |
#subscribe(subscription_name, handler:, options: {}, middlewares: nil, pull_interval: config.subscription_pull_interval, max_retries: config.subscription_max_retries, retries_interval: config.subscription_retries_interval, restart_terminator: config.subscription_restart_terminator, failed_subscription_notifier: config.failed_subscription_notifier, graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout) ⇒ void
This method returns an undefined value.
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 95 def subscribe(subscription_name, handler:, options: {}, middlewares: nil, pull_interval: config.subscription_pull_interval, max_retries: config.subscription_max_retries, retries_interval: config.subscription_retries_interval, restart_terminator: config.subscription_restart_terminator, failed_subscription_notifier: config.failed_subscription_notifier, graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout) subscription = Subscription.using_connection(config.name).new( set: @set_name, name: subscription_name, options:, chunk_query_interval: pull_interval, max_restarts_number: max_retries, time_between_restarts: retries_interval ) runner = SubscriptionRunner.new( stats: SubscriptionHandlerPerformance.new, events_processor: EventsProcessor.new( create_raw_event_handler(middlewares, handler), graceful_shutdown_timeout:, recovery_strategies: recovery_strategies(subscription, restart_terminator, failed_subscription_notifier) ), subscription: ) @subscriptions_lifecycle.runners.push(runner) true end |
#subscriptions ⇒ Array<PgEventstore::Subscription>
121 122 123 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 121 def subscriptions @subscriptions_lifecycle.subscriptions.map(&:dup) end |
#subscriptions_set ⇒ PgEventstore::SubscriptionsSet?
126 127 128 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 126 def subscriptions_set @subscriptions_set_lifecycle.subscriptions_set&.dup end |