Class: EventSourcery::Postgres::Tracker

Inherits:
Object
  • Object
show all
Defined in:
lib/event_sourcery/postgres/tracker.rb

Overview

This will set up a persisted event id tracker for processors.

Instance Method Summary collapse

Constructor Details

#initialize(db_connection = EventSourcery::Postgres.config.projections_database, table_name: EventSourcery::Postgres.config.tracker_table_name, obtain_processor_lock: true) ⇒ Tracker

Returns a new instance of Tracker.



7
8
9
10
11
12
13
# File 'lib/event_sourcery/postgres/tracker.rb', line 7

def initialize(db_connection = EventSourcery::Postgres.config.projections_database,
               table_name: EventSourcery::Postgres.config.tracker_table_name,
               obtain_processor_lock: true)
  @db_connection = db_connection
  @table_name = table_name.to_sym
  @obtain_processor_lock = obtain_processor_lock
end

Instance Method Details

#last_processed_event_id(processor_name) ⇒ Int?

This will return the last processed event id for the given processor name.

Parameters:

  • processor_name

    the name of the processor you want to look up

Returns:

  • (Int, nil)

    the value of the last event_id processed



69
70
71
72
# File 'lib/event_sourcery/postgres/tracker.rb', line 69

def last_processed_event_id(processor_name)
  track_entry = table.where(name: processor_name.to_s).first
  track_entry[:last_processed_event_id] if track_entry
end

#processed_event(processor_name, event_id) ⇒ Object

This will updated the tracker table to the given event id value for the given processor name.

Parameters:

  • processor_name

    the name of the processor to update

  • event_id

    the event id number to update to



38
39
40
41
42
43
# File 'lib/event_sourcery/postgres/tracker.rb', line 38

def processed_event(processor_name, event_id)
  table
    .where(name: processor_name.to_s)
    .update(last_processed_event_id: event_id)
  true
end

#processing_event(processor_name, event_id) ⇒ Object

This allows you to process an event and update the tracker table in a single transaction. Will yield the given block first then update the the tracker table to the give event id for the given processor name.

Parameters:

  • processor_name

    the name of the processor to update

  • event_id

    the event id number to update to



51
52
53
54
55
56
# File 'lib/event_sourcery/postgres/tracker.rb', line 51

def processing_event(processor_name, event_id)
  @db_connection.transaction do
    yield
    processed_event(processor_name, event_id)
  end
end

#reset_last_processed_event_id(processor_name) ⇒ Object

This will reset the tracker to the start (0) for the given processor name.

Parameters:

  • processor_name

    the name of the processor to reset to 0



61
62
63
# File 'lib/event_sourcery/postgres/tracker.rb', line 61

def reset_last_processed_event_id(processor_name)
  table.where(name: processor_name.to_s).update(last_processed_event_id: 0)
end

#setup(processor_name = nil) ⇒ Object

Set up the given processor. This will create the projector tracker table if it does not exist. If given a processor_name it will then attempt to get a lock on the db.

Parameters:

  • processor_name (defaults to: nil)

    the name of the processor

Raises:

  • (UnableToLockProcessorError)


20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/event_sourcery/postgres/tracker.rb', line 20

def setup(processor_name = nil)
  create_table_if_not_exists if EventSourcery::Postgres.config.auto_create_projector_tracker

  raise UnableToLockProcessorError, 'Projector tracker table does not exist' unless tracker_table_exists?

  return unless processor_name

  create_track_entry_if_not_exists(processor_name)
  return unless @obtain_processor_lock

  obtain_global_lock_on_processor(processor_name)
end

#tracked_processorsArray

Will return an array of all known tracked processors.

Returns:

  • (Array)

    array of all known tracked processors



77
78
79
# File 'lib/event_sourcery/postgres/tracker.rb', line 77

def tracked_processors
  table.select_map(:name)
end