Class: PgEventstore::SubscriptionsManager
- Inherits: Object
  - Object
    - PgEventstore::SubscriptionsManager
- Extended by:
- Forwardable
- Defined in:
- lib/pg_eventstore/subscriptions/subscriptions_manager.rb
Overview
The public Subscriptions API, available to the user.
Instance Attribute Summary collapse
- #config ⇒ PgEventstore::Config writeonly
Instance Method Summary collapse
- #config_name ⇒ Symbol
- #initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false) ⇒ SubscriptionsManager constructor
  A new instance of SubscriptionsManager.
- #start ⇒ PgEventstore::BasicRunner?
- #start! ⇒ PgEventstore::BasicRunner
- #subscribe(subscription_name, handler:, options: {}, middlewares: nil, pull_interval: config.subscription_pull_interval, max_retries: config.subscription_max_retries, retries_interval: config.subscription_retries_interval, restart_terminator: config.subscription_restart_terminator, failed_subscription_notifier: config.failed_subscription_notifier, graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout) ⇒ void
- #subscriptions ⇒ Array<PgEventstore::Subscription>
- #subscriptions_set ⇒ PgEventstore::SubscriptionsSet?
Constructor Details
#initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false) ⇒ SubscriptionsManager
Returns a new instance of SubscriptionsManager.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 52 def initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false) @config = config @set_name = set_name @subscriptions_set_lifecycle = SubscriptionsSetLifecycle.new( config_name, { name: set_name, max_restarts_number: max_retries || config.subscriptions_set_max_retries, time_between_restarts: retries_interval || config.subscriptions_set_retries_interval, } ) @subscriptions_lifecycle = SubscriptionsLifecycle.new( config_name, @subscriptions_set_lifecycle, force_lock: force_lock ) @subscription_feeder = SubscriptionFeeder.new( config_name: config_name, subscriptions_set_lifecycle: @subscriptions_set_lifecycle, subscriptions_lifecycle: @subscriptions_lifecycle ) end |
Instance Attribute Details
#config=(value) ⇒ PgEventstore::Config
41 42 43 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 41 def config @config end |
Instance Method Details
#config_name ⇒ Symbol
152 153 154 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 152 def config_name @config.name end |
#start ⇒ PgEventstore::BasicRunner?
144 145 146 147 148 149 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 144 def start start! rescue PgEventstore::SubscriptionAlreadyLockedError => e PgEventstore.logger&.warn(e.message) nil end |
#start! ⇒ PgEventstore::BasicRunner
137 138 139 140 141 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 137 def start! run_cli_callbacks do @subscription_feeder.start end end |
#subscribe(subscription_name, handler:, options: {}, middlewares: nil, pull_interval: config.subscription_pull_interval, max_retries: config.subscription_max_retries, retries_interval: config.subscription_retries_interval, restart_terminator: config.subscription_restart_terminator, failed_subscription_notifier: config.failed_subscription_notifier, graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout) ⇒ void
This method returns an undefined value.
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 96 def subscribe(subscription_name, handler:, options: {}, middlewares: nil, pull_interval: config.subscription_pull_interval, max_retries: config.subscription_max_retries, retries_interval: config.subscription_retries_interval, restart_terminator: config.subscription_restart_terminator, failed_subscription_notifier: config.failed_subscription_notifier, graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout) subscription = Subscription.using_connection(config.name).new( set: @set_name, name: subscription_name, options: options, chunk_query_interval: pull_interval, max_restarts_number: max_retries, time_between_restarts: retries_interval ) runner = SubscriptionRunner.new( stats: SubscriptionHandlerPerformance.new, events_processor: EventsProcessor.new( create_raw_event_handler(middlewares, handler), graceful_shutdown_timeout: graceful_shutdown_timeout, recovery_strategies: recovery_strategies(subscription, restart_terminator, failed_subscription_notifier) ), subscription: subscription, position_evaluation: SubscriptionPositionEvaluation.new( config_name: config.name, filter_options: options[:filter] || {} ) ) @subscriptions_lifecycle.runners.push(runner) true end |
#subscriptions ⇒ Array<PgEventstore::Subscription>
126 127 128 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 126 def subscriptions @subscriptions_lifecycle.subscriptions.map(&:dup) end |
#subscriptions_set ⇒ PgEventstore::SubscriptionsSet?
131 132 133 |
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 131 def subscriptions_set @subscriptions_set_lifecycle.subscriptions_set&.dup end |