Class: PgEventstore::SubscriptionsManager

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/pg_eventstore/subscriptions/subscriptions_manager.rb

Overview

The public Subscriptions API, available to the user.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false) ⇒ SubscriptionsManager

Returns a new instance of SubscriptionsManager.

Parameters:

  • config (PgEventstore::Config)
  • set_name (String)
  • max_retries (Integer, nil) (defaults to: nil)

    max number of retries of failed SubscriptionsSet

  • retries_interval (Integer, nil) (defaults to: nil)

    a delay between retries of failed SubscriptionsSet

  • force_lock (Boolean) (defaults to: false)

    whether to force-lock subscriptions



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 51

def initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false)
  @config = config
  @set_name = set_name
  @subscriptions_set_lifecycle = SubscriptionsSetLifecycle.new(
    config_name,
    {
      name: set_name,
      max_restarts_number: max_retries || config.subscriptions_set_max_retries,
      time_between_restarts: retries_interval || config.subscriptions_set_retries_interval,
    }
  )
  @subscriptions_lifecycle = SubscriptionsLifecycle.new(
    config_name, @subscriptions_set_lifecycle, force_lock:
  )
  @subscription_feeder = SubscriptionFeeder.new(
    config_name:,
    subscriptions_set_lifecycle: @subscriptions_set_lifecycle,
    subscriptions_lifecycle: @subscriptions_lifecycle
  )
end

Instance Attribute Details

#config=(value) ⇒ PgEventstore::Config



40
41
42
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 40

def config
  @config
end

Instance Method Details

#config_nameSymbol

Returns:

  • (Symbol)


147
148
149
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 147

def config_name
  @config.name
end

#startPgEventstore::BasicRunner?

Returns:



139
140
141
142
143
144
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 139

def start
  start!
rescue PgEventstore::SubscriptionAlreadyLockedError => e
  PgEventstore.logger&.warn(e.message)
  nil
end

#start!PgEventstore::BasicRunner



132
133
134
135
136
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 132

def start!
  run_cli_callbacks do
    @subscription_feeder.start
  end
end

#subscribe(subscription_name, handler:, options: {}, middlewares: nil, pull_interval: config.subscription_pull_interval, max_retries: config.subscription_max_retries, retries_interval: config.subscription_retries_interval, restart_terminator: config.subscription_restart_terminator, failed_subscription_notifier: config.failed_subscription_notifier, graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout) ⇒ void

This method returns an undefined value.

Parameters:

  • subscription_name (String)

    subscription’s name

  • handler (#call)

    subscription’s handler

  • options (Hash) (defaults to: {})

    request options

  • middlewares (Array<Symbol>, nil) (defaults to: nil)

    provide a list of middleware names to override a config’s middlewares

  • pull_interval (Integer, Float) (defaults to: config.subscription_pull_interval)

    an interval in seconds to determine how often to query new events of the given subscription.

  • max_retries (Integer) (defaults to: config.subscription_max_retries)

    max number of retries of failed Subscription

  • retries_interval (Integer, Float) (defaults to: config.subscription_retries_interval)

    a delay between retries of failed Subscription

  • restart_terminator (#call, nil) (defaults to: config.subscription_restart_terminator)

    a callable object which is invoked with PgEventstore::Subscription instance to determine whether restarts should be stopped(true - stops restarts, false - continues restarts)

  • failed_subscription_notifier (#call, nil) (defaults to: config.failed_subscription_notifier)

    a callable object which is invoked with PgEventstore::Subscription instance and error instance after the related subscription died due to error and no longer can be automatically restarted due to max retries number reached. You can use this hook to send a notification about failed subscription.

  • graceful_shutdown_timeout (integer, Float) (defaults to: config.subscription_graceful_shutdown_timeout)

    the number of seconds to wait until force-shutdown the subscription during the stop process

Options Hash (options:):

  • :from_position (Integer, Symbol)

    a starting subscription position

  • :resolve_link_tos (Boolean)

    When using projections to create new events you can set whether the generated events are pointers to existing events. Setting this option to true tells PgEventstore to return the original event instead a link event.

  • :filter (Hash)

    provide it to filter events. It works the same way as a :filter option of Client#read method. Filtering by both - event types and streams are available.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 95

def subscribe(subscription_name, handler:, options: {}, middlewares: nil,
              pull_interval: config.subscription_pull_interval,
              max_retries: config.subscription_max_retries,
              retries_interval: config.subscription_retries_interval,
              restart_terminator: config.subscription_restart_terminator,
              failed_subscription_notifier: config.failed_subscription_notifier,
              graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout)
  subscription = Subscription.using_connection(config.name).new(
    set: @set_name, name: subscription_name, options:, chunk_query_interval: pull_interval,
    max_restarts_number: max_retries, time_between_restarts: retries_interval
  )
  runner = SubscriptionRunner.new(
    stats: SubscriptionHandlerPerformance.new,
    events_processor: EventsProcessor.new(
      create_raw_event_handler(middlewares, handler),
      graceful_shutdown_timeout:,
      recovery_strategies: recovery_strategies(subscription, restart_terminator, failed_subscription_notifier)
    ),
    subscription:
  )

  @subscriptions_lifecycle.runners.push(runner)
  true
end

#subscriptionsArray<PgEventstore::Subscription>

Returns:



121
122
123
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 121

def subscriptions
  @subscriptions_lifecycle.subscriptions.map(&:dup)
end

#subscriptions_setPgEventstore::SubscriptionsSet?



126
127
128
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 126

def subscriptions_set
  @subscriptions_set_lifecycle.subscriptions_set&.dup
end