Class: PgEventstore::SubscriptionsManager

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/pg_eventstore/subscriptions/subscriptions_manager.rb

Overview

The public Subscriptions API, available to the user.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false) ⇒ SubscriptionsManager

Returns a new instance of SubscriptionsManager.

Parameters:

  • config (PgEventstore::Config)
  • set_name (String)
  • max_retries (Integer, nil) (defaults to: nil)

    max number of retries of failed SubscriptionsSet

  • retries_interval (Integer, nil) (defaults to: nil)

    a delay between retries of failed SubscriptionsSet

  • force_lock (Boolean) (defaults to: false)

    whether to force-lock subscriptions



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 49

def initialize(config:, set_name:, max_retries: nil, retries_interval: nil, force_lock: false)
  @config = config
  @set_name = set_name
  @subscriptions_set_lifecycle = SubscriptionsSetLifecycle.new(
    config_name,
    {
      name: set_name,
      max_restarts_number: max_retries || config.subscriptions_set_max_retries,
      time_between_restarts: retries_interval || config.subscriptions_set_retries_interval,
    }
  )
  @subscriptions_lifecycle = SubscriptionsLifecycle.new(
    config_name, @subscriptions_set_lifecycle, force_lock: force_lock
  )
  @subscription_feeder = SubscriptionFeeder.new(
    config_name: config_name,
    subscriptions_set_lifecycle: @subscriptions_set_lifecycle,
    subscriptions_lifecycle: @subscriptions_lifecycle
  )
end

Instance Attribute Details

#config=(value) ⇒ PgEventstore::Config



38
39
40
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 38

def config
  @config
end

Instance Method Details

#config_nameSymbol

Returns:

  • (Symbol)


145
146
147
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 145

def config_name
  @config.name
end

#startPgEventstore::BasicRunner?

Returns:



137
138
139
140
141
142
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 137

def start
  start!
rescue PgEventstore::SubscriptionAlreadyLockedError => e
  PgEventstore.logger&.warn(e.message)
  nil
end

#start!PgEventstore::BasicRunner



130
131
132
133
134
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 130

def start!
  run_cli_callbacks do
    @subscription_feeder.start
  end
end

#subscribe(subscription_name, handler:, options: {}, middlewares: nil, pull_interval: config.subscription_pull_interval, max_retries: config.subscription_max_retries, retries_interval: config.subscription_retries_interval, restart_terminator: config.subscription_restart_terminator, failed_subscription_notifier: config.failed_subscription_notifier, graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout) ⇒ void

This method returns an undefined value.

Parameters:

  • subscription_name (String)

    subscription’s name

  • handler (#call)

    subscription’s handler

  • options (Hash) (defaults to: {})

    request options

  • middlewares (Array<Symbol>, nil) (defaults to: nil)

    provide a list of middleware names to override a config’s middlewares

  • pull_interval (Integer, Float) (defaults to: config.subscription_pull_interval)

    an interval in seconds to determine how often to query new events of the given subscription.

  • max_retries (Integer) (defaults to: config.subscription_max_retries)

    max number of retries of failed Subscription

  • retries_interval (Integer, Float) (defaults to: config.subscription_retries_interval)

    a delay between retries of failed Subscription

  • restart_terminator (#call, nil) (defaults to: config.subscription_restart_terminator)

    a callable object which is invoked with PgEventstore::Subscription instance to determine whether restarts should be stopped(true - stops restarts, false - continues restarts)

  • failed_subscription_notifier (#call, nil) (defaults to: config.failed_subscription_notifier)

    a callable object which is invoked with PgEventstore::Subscription instance and error instance after the related subscription died due to error and no longer can be automatically restarted due to max retries number reached. You can use this hook to send a notification about failed subscription.

  • graceful_shutdown_timeout (integer, Float) (defaults to: config.subscription_graceful_shutdown_timeout)

    the number of seconds to wait until force-shutdown the subscription during the stop process

Options Hash (options:):

  • :from_position (Integer, Symbol)

    a starting subscription position

  • :resolve_link_tos (Boolean)

    When using projections to create new events you can set whether the generated events are pointers to existing events. Setting this option to true tells PgEventstore to return the original event instead a link event.

  • :filter (Hash)

    provide it to filter events. It works the same way as a :filter option of Client#read method. Filtering by both - event types and streams are available.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 93

def subscribe(subscription_name, handler:, options: {}, middlewares: nil,
              pull_interval: config.subscription_pull_interval,
              max_retries: config.subscription_max_retries,
              retries_interval: config.subscription_retries_interval,
              restart_terminator: config.subscription_restart_terminator,
              failed_subscription_notifier: config.failed_subscription_notifier,
              graceful_shutdown_timeout: config.subscription_graceful_shutdown_timeout)
  subscription = Subscription.using_connection(config.name).new(
    set: @set_name, name: subscription_name, options: options, chunk_query_interval: pull_interval,
    max_restarts_number: max_retries, time_between_restarts: retries_interval
  )
  runner = SubscriptionRunner.new(
    stats: SubscriptionHandlerPerformance.new,
    events_processor: EventsProcessor.new(
      create_raw_event_handler(middlewares, handler),
      graceful_shutdown_timeout: graceful_shutdown_timeout,
      recovery_strategies: recovery_strategies(subscription, restart_terminator, failed_subscription_notifier)
    ),
    subscription: subscription
  )

  @subscriptions_lifecycle.runners.push(runner)
  true
end

#subscriptionsArray<PgEventstore::Subscription>

Returns:



119
120
121
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 119

def subscriptions
  @subscriptions_lifecycle.subscriptions.map(&:dup)
end

#subscriptions_setPgEventstore::SubscriptionsSet?



124
125
126
# File 'lib/pg_eventstore/subscriptions/subscriptions_manager.rb', line 124

def subscriptions_set
  @subscriptions_set_lifecycle.subscriptions_set&.dup
end