Class: PgEventstore::SubscriptionsManager
- Inherits:
- Object
- Inheritance chain: Object → PgEventstore::SubscriptionsManager
- Extended by:
- Forwardable
- Defined in:
- lib/pg_eventstore/subscriptions/subscriptions_manager.rb
Overview
The public Subscriptions API, available to the user.
Instance Attribute Summary collapse
- #config=(value) ⇒ PgEventstore::Config (write-only)
Instance Method Summary collapse
- #config_name ⇒ Symbol
- #initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false) ⇒ SubscriptionsManager (constructor): Returns a new instance of SubscriptionsManager.
- #start ⇒ PgEventstore::BasicRunner?
- #start! ⇒ PgEventstore::BasicRunner
- #subscribe(subscription_name, handler:, options: {}, middlewares: nil, pull_interval: config.subscription_pull_interval, max_retries: config.subscription_max_retries, retries_interval: config.subscription_retries_interval, restart_terminator: config.subscription_restart_terminator, failed_subscription_notifier: config.failed_subscription_notifier, graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout) ⇒ void
- #subscriptions ⇒ Array<PgEventstore::Subscription>
- #subscriptions_set ⇒ PgEventstore::SubscriptionsSet?
Constructor Details
#initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false) ⇒ SubscriptionsManager
Returns a new instance of SubscriptionsManager.
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 49 def initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false) @config = config @set_name = set_name @subscriptions_set_lifecycle = SubscriptionsSetLifecycle.new( config_name, { name: set_name, max_restarts_number: max_retries || config.subscriptions_set_max_retries, time_between_restarts: retries_interval || config.subscriptions_set_retries_interval, } ) @subscriptions_lifecycle = SubscriptionsLifecycle.new( config_name, @subscriptions_set_lifecycle, force_lock: force_lock ) @subscription_feeder = SubscriptionFeeder.new( config_name: config_name, subscriptions_set_lifecycle: @subscriptions_set_lifecycle, subscriptions_lifecycle: @subscriptions_lifecycle ) end |
Instance Attribute Details
#config=(value) ⇒ PgEventstore::Config
38 39 40 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 38 def config @config end |
Instance Method Details
#config_name ⇒ Symbol
145 146 147 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 145 def config_name @config.name end |
#start ⇒ PgEventstore::BasicRunner?
137 138 139 140 141 142 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 137 def start start! rescue PgEventstore::SubscriptionAlreadyLockedError => e PgEventstore.logger&.warn(e.message) nil end |
#start! ⇒ PgEventstore::BasicRunner
130 131 132 133 134 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 130 def start! run_cli_callbacks do @subscription_feeder.start end end |
#subscribe(subscription_name, handler:, options: {}, middlewares: nil, pull_interval: config.subscription_pull_interval, max_retries: config.subscription_max_retries, retries_interval: config.subscription_retries_interval, restart_terminator: config.subscription_restart_terminator, failed_subscription_notifier: config.failed_subscription_notifier, graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout) ⇒ void
This method returns an undefined value.
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 93 def subscribe(subscription_name, handler:, options: {}, middlewares: nil, pull_interval: config.subscription_pull_interval, max_retries: config.subscription_max_retries, retries_interval: config.subscription_retries_interval, restart_terminator: config.subscription_restart_terminator, failed_subscription_notifier: config.failed_subscription_notifier, graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout) subscription = Subscription.using_connection(config.name).new( set: @set_name, name: subscription_name, options: options, chunk_query_interval: pull_interval, max_restarts_number: max_retries, time_between_restarts: retries_interval ) runner = SubscriptionRunner.new( stats: SubscriptionHandlerPerformance.new, events_processor: EventsProcessor.new( create_raw_event_handler(middlewares, handler), graceful_shutdown_timeout: graceful_shutdown_timeout, recovery_strategies: recovery_strategies(subscription, restart_terminator, failed_subscription_notifier) ), subscription: subscription ) @subscriptions_lifecycle.runners.push(runner) true end |
#subscriptions ⇒ Array<PgEventstore::Subscription>
119 120 121 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 119 def subscriptions @subscriptions_lifecycle.subscriptions.map(&:dup) end |
#subscriptions_set ⇒ PgEventstore::SubscriptionsSet?
124 125 126 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 124 def subscriptions_set @subscriptions_set_lifecycle.subscriptions_set&.dup end |