Class: EventStoreClient::CatchUpSubscriptions

Inherits:
Object
  • Object
show all
Includes:
Configuration
Defined in:
lib/event_store_client/catch_up_subscriptions.rb

Constant Summary collapse

FILTER_DEFAULT_MAX =
32
FILTER_DEFAULT_CHECKPOINT_INTERVAL_MULTIPLIER =
10000

Instance Method Summary collapse

Methods included from Configuration

#config

Instance Method Details

#clean_unused ⇒ Object



53
54
55
# File 'lib/event_store_client/catch_up_subscriptions.rb', line 53

# Hands the names of every subscription currently tracked by this object to
# the subscription store's +clean_unused+.
#
# NOTE(review): presumably the store drops persisted entries whose names are
# not in the list — confirm against the store implementation.
#
# @return [Object] whatever +subscription_store.clean_unused+ returns
def clean_unused
  store = subscription_store
  tracked_names = subscriptions.map { |subscription| subscription.name }
  store.clean_unused(tracked_names)
end

#create_or_load(subscriber, filter: {}) ⇒ Object



10
11
12
13
14
15
16
17
18
19
# File 'lib/event_store_client/catch_up_subscriptions.rb', line 10

# Builds a CatchUpSubscription for +subscriber+, resuming from the position
# the subscription store has recorded under the subscription's name (if any).
#
# When the store has no position for that name, the new subscription is also
# added to the store. The subscription is appended to the in-memory list only
# if no tracked subscription already has the same name.
#
# @param subscriber [Object] passed to +CatchUpSubscription.name+ and
#   +CatchUpSubscription.new+
# @param filter [Hash] raw filter options; normalized through
#   +prepare_filter_options+ before being given to the subscription
# @return [CatchUpSubscription] the subscription built by this call (a new
#   object even when one with the same name is already tracked)
def create_or_load(subscriber, filter: {})
  filter_options = prepare_filter_options(filter)
  position = subscription_store.load_all_position(CatchUpSubscription.name(subscriber))

  subscription = CatchUpSubscription.new(subscriber, position: position, filter: filter_options)
  subscription_store.add(subscription) unless position

  # Use the reader for both the membership test and the append so the two can
  # never look at different collections; +none?+ states the intent directly.
  already_tracked = subscriptions.any? { |tracked| tracked.name == subscription.name }
  subscriptions << subscription unless already_tracked
  subscription
end

#each ⇒ Object



21
22
23
# File 'lib/event_store_client/catch_up_subscriptions.rb', line 21

# Iterates over the tracked subscriptions, yielding each one in turn.
#
# Without a block an Enumerator over the same iteration is returned instead
# of failing on the first yield.
#
# @yieldparam subscription [Object] one tracked subscription
# @return [Object, Enumerator] the result of +subscriptions.each+ when a
#   block is given, otherwise an Enumerator
def each(&block)
  return enum_for(:each) unless block

  subscriptions.each(&block)
end

#listen(subscription) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/event_store_client/catch_up_subscriptions.rb', line 25

# Subscribes through +connection+ with +subscription.options+ and handles
# every item the connection yields.
#
# For each item that is neither a recorded event nor a confirmation (as
# decided by +recorded_event?+ / +confirmation?+):
# 1. +event_data[0]+ becomes the subscription's new position and is persisted
#    via +subscription_store.update_position+;
# 2. +event_data[1]+, when present, is passed to +subscription.subscriber+;
# 3. if the current thread has the +:terminate+ thread variable set, the
#    listener logs and breaks out of the subscribe block.
#
# Any StandardError raised while handling an item restores the position the
# subscription had when that item arrived, persists it, and reports the error
# to +config.error_handler+ (if one is configured). Listening then continues
# with the next item.
#
# @param subscription [Object] must respond to +options+, +position+,
#   +position=+ and +subscriber+ (the latter responding to +call+)
# @return [Object, nil] the result of +connection.subscribe+, or nil when the
#   block was left via +break+
def listen(subscription)
  connection.subscribe(subscription.options) do |event_data|
    # Capture the rollback point before anything can raise. If this were done
    # later, a failure in the checks below would leave it nil and the rescue
    # would overwrite the stored position with nil.
    old_position = subscription.position

    next if recorded_event?(event_data)
    next if confirmation?(event_data)

    new_position = event_data[0]
    event = event_data[1]

    subscription.position = new_position
    subscription_store.update_position(subscription)
    # Items without an event only advance the position.
    next unless event

    subscription.subscriber.call(event)

    if Thread.current.thread_variable_get(:terminate)
      msg =
        "CatchUpSubscriptions: Terminating subscription listener for #{subscription.subscriber}"
      logger&.info(msg)
      break
    end
  rescue StandardError => e
    # Roll back to the position held before this item and persist it.
    subscription.position = old_position
    subscription_store.update_position(subscription)
    config.error_handler&.call(e)
  end
end

#reset ⇒ Object



57
58
59
# File 'lib/event_store_client/catch_up_subscriptions.rb', line 57

# Passes the full list of tracked subscriptions to the subscription store's
# +reset+.
#
# NOTE(review): what "reset" means for the persisted state is defined by the
# store, not visible here — confirm before relying on it.
#
# @return [Object] whatever +subscription_store.reset+ returns
def reset
  store = subscription_store
  tracked = subscriptions
  store.reset(tracked)
end