Module: EventSourcery::EventProcessing::EventStreamProcessor

Defined in:
lib/event_sourcery/event_processing/event_stream_processor.rb

Defined Under Namespace

Modules: ClassMethods, InstanceMethods, ProcessHandler

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#trackerObject



137
138
139
# File 'lib/event_sourcery/event_processing/event_stream_processor.rb', line 137

def tracker
  @tracker
end

Class Method Details

.included(base) ⇒ Object



4
5
6
7
8
9
10
11
12
13
# File 'lib/event_sourcery/event_processing/event_stream_processor.rb', line 4

def self.included(base)
  base.extend(ClassMethods)
  base.include(InstanceMethods)
  base.prepend(ProcessHandler)
  EventSourcery.event_stream_processor_registry.register(base)
  base.class_eval do
    @event_handlers = Hash.new { |hash, key| hash[key] = [] }
    @all_event_handler = nil
  end
end

Instance Method Details

#last_processed_event_idInt

Return the last processed event id

Returns:

  • (Int)

    the id of the last processed event



109
110
111
# File 'lib/event_sourcery/event_processing/event_stream_processor.rb', line 109

def last_processed_event_id
  tracker.last_processed_event_id(processor_name)
end

#processes?(event_type) ⇒ Boolean

Calls processes? method on the instance class

Returns:

  • (Boolean)


119
120
121
# File 'lib/event_sourcery/event_processing/event_stream_processor.rb', line 119

def processes?(event_type)
  self.class.processes?(event_type)
end

#processes_event_typesObject

Calls processes_event_types method on the instance class



92
93
94
# File 'lib/event_sourcery/event_processing/event_stream_processor.rb', line 92

def processes_event_types
  self.class.processes_event_types
end

#processor_nameObject

Calls processor_name method on the instance class



114
115
116
# File 'lib/event_sourcery/event_processing/event_stream_processor.rb', line 114

def processor_name
  self.class.processor_name
end

#resetObject

Reset the event tracker



102
103
104
# File 'lib/event_sourcery/event_processing/event_stream_processor.rb', line 102

def reset
  tracker.reset_last_processed_event_id(processor_name)
end

#setupObject

Set up the event tracker



97
98
99
# File 'lib/event_sourcery/event_processing/event_stream_processor.rb', line 97

def setup
  tracker.setup(processor_name)
end

#subscribe_to(event_source, subscription_master: EventStore::SignalHandlingSubscriptionMaster.new) ⇒ Object

Subscribe to the given event source.

Parameters:

  • event_source

    the event source to subscribe to

  • subscription_master (SignalHandlingSubscriptionMaster) (defaults to: EventStore::SignalHandlingSubscriptionMaster.new)


127
128
129
130
131
132
133
134
# File 'lib/event_sourcery/event_processing/event_stream_processor.rb', line 127

def subscribe_to(event_source, subscription_master: EventStore::SignalHandlingSubscriptionMaster.new)
  setup
  event_source.subscribe(from_id: last_processed_event_id + 1,
                        event_types: processes_event_types,
                        subscription_master: subscription_master) do |events|
    process_events(events, subscription_master)
  end
end