Class: PgEventstore::SubscriptionsManager

Inherits:
Object
  • Object
Extended by:
Forwardable
Defined in:
lib/pg_eventstore/subscriptions/subscriptions_manager.rb

Overview

The public Subscriptions API, available to the user.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false) ⇒ SubscriptionsManager

Returns a new instance of SubscriptionsManager.

Parameters:

  • config (PgEventstore::Config)
  • set_name (String)
  • max_retries (Integer, nil) (defaults to: nil)

    max number of retries of failed SubscriptionsSet

  • retries_interval (Integer, nil) (defaults to: nil)

    a delay between retries of failed SubscriptionsSet

  • force_lock (Boolean) (defaults to: false)

    whether to force-lock subscriptions



# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 52

def initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false)
  @config = config
  @set_name = set_name
  @subscriptions_set_lifecycle = SubscriptionsSetLifecycle.new(
    config_name,
    {
      name: set_name,
      max_restarts_number: max_retries || config.subscriptions_set_max_retries,
      time_between_restarts: retries_interval || config.subscriptions_set_retries_interval,
    }
  )
  @subscriptions_lifecycle = SubscriptionsLifecycle.new(
    config_name, @subscriptions_set_lifecycle, force_lock: force_lock
  )
  @subscription_feeder = SubscriptionFeeder.new(
    config_name: config_name,
    subscriptions_set_lifecycle: @subscriptions_set_lifecycle,
    subscriptions_lifecycle: @subscriptions_lifecycle
  )
end
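
The way the constructor falls back from per-manager overrides to configuration values can be sketched in isolation. `FakeConfig` and `resolve_retry_settings` are hypothetical stand-ins and are not part of pg_eventstore; only the `||` fallback mirrors the source above.

```ruby
# Hypothetical stand-in for PgEventstore::Config, holding only the two
# settings that the constructor reads.
FakeConfig = Struct.new(
  :subscriptions_set_max_retries,
  :subscriptions_set_retries_interval,
  keyword_init: true
)

# Mirrors the `explicit value || config default` fallback used in #initialize.
def resolve_retry_settings(config, max_retries: nil, retries_interval: nil)
  {
    max_restarts_number: max_retries || config.subscriptions_set_max_retries,
    time_between_restarts: retries_interval || config.subscriptions_set_retries_interval
  }
end

config = FakeConfig.new(subscriptions_set_max_retries: 10, subscriptions_set_retries_interval: 1)

defaults   = resolve_retry_settings(config)                 # both values come from config
overridden = resolve_retry_settings(config, max_retries: 3) # only max_retries is overridden
zero       = resolve_retry_settings(config, max_retries: 0) # 0 is truthy in Ruby, so it is kept
```

Because only `nil` and `false` are falsy in Ruby, an explicit `0` is kept rather than replaced by the configured default.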

Instance Attribute Details

#config ⇒ PgEventstore::Config



# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 41

def config
  @config
end

Instance Method Details

#config_name ⇒ Symbol

Returns:

  • (Symbol)


# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 152

def config_name
  @config.name
end

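#start ⇒ PgEventstore::BasicRunner?

Calls #start!. If PgEventstore::SubscriptionAlreadyLockedError is raised, logs its message as a warning and returns nil.

Returns:

  • (PgEventstore::BasicRunner, nil)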



# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 144

def start
  start!
rescue PgEventstore::SubscriptionAlreadyLockedError => e
  PgEventstore.logger&.warn(e.message)
  nil
end

#start! ⇒ PgEventstore::BasicRunner



# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 137

def start!
  run_cli_callbacks do
    @subscription_feeder.start
  end
end
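
The difference between #start and #start! comes down to who handles the lock error. The sketch below reproduces that pattern with stand-ins: `AlreadyLockedError`, `ManagerSketch` and the `locked:` flag are hypothetical and exist only for illustration; the real methods delegate to the subscription feeder as shown in the sources above.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for PgEventstore::SubscriptionAlreadyLockedError.
class AlreadyLockedError < StandardError; end

# Hypothetical stand-in for the manager; `locked` simulates a held lock.
class ManagerSketch
  def initialize(logger:, locked:)
    @logger = logger
    @locked = locked
  end

  # Bang variant: lets the lock error propagate to the caller.
  def start!
    raise AlreadyLockedError, 'subscription is already locked' if @locked

    :runner
  end

  # Non-bang variant: logs a warning and returns nil instead of raising.
  def start
    start!
  rescue AlreadyLockedError => e
    @logger&.warn(e.message)
    nil
  end
end

log = StringIO.new
locked_manager = ManagerSketch.new(logger: Logger.new(log), locked: true)
free_manager = ManagerSketch.new(logger: Logger.new(log), locked: false)

soft_result = locked_manager.start # nil, and a warning is written to the log
ok_result = free_manager.start     # the runner stand-in
```

A caller that wants to react to a held lock (for example, to exit with a non-zero status) would use the bang variant and rescue the error itself.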

#subscribe(subscription_name, handler:, options: {}, middlewares: nil, pull_interval: config.subscription_pull_interval, max_retries: config.subscription_max_retries, retries_interval: config.subscription_retries_interval, restart_terminator: config.subscription_restart_terminator, failed_subscription_notifier: config.failed_subscription_notifier, graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout) ⇒ void

This method returns an undefined value.

Parameters:

  • subscription_name (String)

    subscription’s name

  • handler (#call)

    subscription’s handler

  • options (Hash) (defaults to: {})

    request options

  • middlewares (Array<Symbol>, nil) (defaults to: nil)

    provide a list of middleware names to override a config’s middlewares

  • pull_interval (Integer, Float) (defaults to: config.subscription_pull_interval)

    an interval in seconds to determine how often to query new events of the given subscription.

  • max_retries (Integer) (defaults to: config.subscription_max_retries)

    max number of retries of failed Subscription

  • retries_interval (Integer, Float) (defaults to: config.subscription_retries_interval)

    a delay between retries of failed Subscription

  • restart_terminator (#call, nil) (defaults to: config.subscription_restart_terminator)

    a callable object invoked with the PgEventstore::Subscription instance to decide whether restarts should stop (true stops restarts, false continues them)

  • failed_subscription_notifier (#call, nil) (defaults to: config.failed_subscription_notifier)

    a callable object invoked with the PgEventstore::Subscription instance and the error instance after the subscription has died from an error and can no longer be restarted automatically because the maximum number of retries was reached. Use this hook to send a notification about the failed subscription.

  • graceful_shutdown_timeout (Integer, Float) (defaults to: config.subscription_graceful_shutdown_timeout)

    the number of seconds to wait before forcing the subscription to shut down during the stop process

Options Hash (options:):

  • :from_position (Integer, Symbol)

    a starting subscription position

  • :resolve_link_tos (Boolean)

    When using projections to create new events, you can set whether the generated events are pointers to existing events. Setting this option to true tells PgEventstore to return the original event instead of a link event.

  • :filter (Hash)

    filters events. It works the same way as the :filter option of the Client#read method. Filtering by event types, by streams, or by both is available.



# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 96

def subscribe(subscription_name, handler:, options: {}, middlewares: nil,
              pull_interval: config.subscription_pull_interval,
              max_retries: config.subscription_max_retries,
              retries_interval: config.subscription_retries_interval,
              restart_terminator: config.subscription_restart_terminator,
              failed_subscription_notifier: config.failed_subscription_notifier,
              graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout)
  subscription = Subscription.using_connection(config.name).new(
    set: @set_name, name: subscription_name, options: options, chunk_query_interval: pull_interval,
    max_restarts_number: max_retries, time_between_restarts: retries_interval
  )
  runner = SubscriptionRunner.new(
    stats: SubscriptionHandlerPerformance.new,
    events_processor: EventsProcessor.new(
      create_raw_event_handler(middlewares, handler),
      graceful_shutdown_timeout: graceful_shutdown_timeout,
      recovery_strategies: recovery_strategies(subscription, restart_terminator, failed_subscription_notifier)
    ),
    subscription: subscription,
    position_evaluation: SubscriptionPositionEvaluation.new(
      config_name: config.name,
      filter_options: options[:filter] || {}
    )
  )

  @subscriptions_lifecycle.runners.push(runner)
  true
end
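
The callable arguments of #subscribe can be sketched without a database. `SubscriptionStub` and its `restart_count` attribute are hypothetical stand-ins for PgEventstore::Subscription, and the `:filter` shape is an assumption based on the statement that it works like the `:filter` option of Client#read; check both against the library before relying on them.

```ruby
# Hypothetical stand-in for PgEventstore::Subscription; only the attributes
# read by the callbacks below are modelled.
SubscriptionStub = Struct.new(:name, :restart_count, keyword_init: true)

# handler: any object responding to #call. Here it just collects events.
processed = []
handler = ->(event) { processed << event }

# restart_terminator: returns true to stop restarts, false to continue them.
restart_terminator = ->(subscription) { subscription.restart_count >= 3 }

# failed_subscription_notifier: receives the subscription and the error.
notifications = []
failed_subscription_notifier = lambda do |subscription, error|
  notifications << "#{subscription.name} failed: #{error.message}"
end

# Assumed option values; the filter keys are not confirmed by this page.
options = { from_position: 0, resolve_link_tos: true, filter: { event_types: ['OrderPlaced'] } }

# With a real manager the call would look roughly like this:
# manager.subscribe(
#   'orders', handler: handler, options: options,
#   restart_terminator: restart_terminator,
#   failed_subscription_notifier: failed_subscription_notifier
# )

# Exercising the callables directly with the stand-in:
stub = SubscriptionStub.new(name: 'orders', restart_count: 3)
handler.call(:some_event)
stop_restarts = restart_terminator.call(stub)
failed_subscription_notifier.call(stub, StandardError.new('boom'))
```

Plain lambdas are used because the documented contract only requires `#call`; a class with a `call` method would work the same way.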

#subscriptions ⇒ Array<PgEventstore::Subscription>

Returns:

  • (Array<PgEventstore::Subscription>)

    copies (via dup) of the registered subscriptions



# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 126

def subscriptions
  @subscriptions_lifecycle.subscriptions.map(&:dup)
end
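
Returning `map(&:dup)` hands callers copies rather than the objects held internally. A minimal sketch of the effect, using the hypothetical stand-ins `SubscriptionCopyStub` and `LifecycleSketch` (neither is part of pg_eventstore):

```ruby
# Hypothetical stand-in for PgEventstore::Subscription.
SubscriptionCopyStub = Struct.new(:name)

# Hypothetical stand-in for the object that owns the subscriptions.
class LifecycleSketch
  def initialize(subscriptions)
    @subscriptions = subscriptions
  end

  # Same pattern as SubscriptionsManager#subscriptions: return shallow copies.
  def subscriptions
    @subscriptions.map(&:dup)
  end
end

lifecycle = LifecycleSketch.new([SubscriptionCopyStub.new('orders')])

copy = lifecycle.subscriptions.first
copy.name = 'changed' # reassigns the attribute on the copy only

internal_name = lifecycle.subscriptions.first.name
```

Note that `dup` is shallow: reassigning an attribute on a copy leaves the original untouched, but mutating an object shared by both (for example, appending to the same String in place) would be visible through the original too.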

#subscriptions_set ⇒ PgEventstore::SubscriptionsSet?



# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 131

def subscriptions_set
  @subscriptions_set_lifecycle.subscriptions_set&.dup
end