Class: RubyEventStore::ActiveRecord::EventRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/active_record/event_repository.rb

Direct Known Subclasses

PgLinearizedEventRepository

Constant Summary collapse

POSITION_SHIFT =
1

Instance Method Summary collapse

Constructor Details

#initialize(model_factory: WithDefaultModels.new, serializer:) ⇒ EventRepository

Returns a new instance of EventRepository.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 8

def initialize(model_factory: WithDefaultModels.new, serializer:)
  @serializer = serializer
  @event_klass, @stream_klass = model_factory.call
  if serializer == NULL && json_data_type?
    warn <<~MSG
      The data or metadata column is of a JSON/B type and expects a JSON string. 

      Yet the repository serializer is configured as #{serializer} and it would not 
      produce the expected JSON string. 

      In ActiveRecord there's an implicit serialization to JSON for JSON/B column types 
      that made it work so far. This behaviour is unfortunately also a source of undesired 
      double serialization — first in the EventRepository, second in the ActiveRecord.
      
      In the past we've advised workarounds that introduced configuration incosistency 
      with other data types and serialization formats, i.e. explicitly passing NULL serializer 
      just for the JSON/B data types.

      As of now this special ActiveRecord behaviour is disabled. You should be using JSON 
      serializer back again:

      RubyEventStore::ActiveRecord::EventRepository.new(serializer: JSON)
    MSG
  else
    @event_klass.include(SkipJsonSerialization)
  end
  @repo_reader = EventRepositoryReader.new(@event_klass, @stream_klass, serializer)
  @index_violation_detector = IndexViolationDetector.new(@event_klass.table_name, @stream_klass.table_name)
end

Instance Method Details

#append_to_stream(records, stream, expected_version) ⇒ Object



66
67
68
69
70
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 66

def append_to_stream(records, stream, expected_version)
  return if records.empty?

  append_to_stream_(records, stream, expected_version)
end

#count(specification) ⇒ Object



94
95
96
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 94

def count(specification)
  @repo_reader.count(specification)
end

#delete_stream(stream) ⇒ Object



78
79
80
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 78

def delete_stream(stream)
  @stream_klass.where(stream: stream.name).delete_all
end

#event_in_stream?(event_id, stream) ⇒ Boolean

Returns:

  • (Boolean)


128
129
130
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 128

def event_in_stream?(event_id, stream)
  @repo_reader.event_in_stream?(event_id, stream)
end

#global_position(event_id) ⇒ Object



124
125
126
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 124

def global_position(event_id)
  @repo_reader.global_position(event_id)
end

#has_event?(event_id) ⇒ Boolean

Returns:

  • (Boolean)


82
83
84
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 82

def has_event?(event_id)
  @repo_reader.has_event?(event_id)
end

#last_stream_event(stream) ⇒ Object



86
87
88
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 86

def last_stream_event(stream)
  @repo_reader.last_stream_event(stream)
end


72
73
74
75
76
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 72

def link_to_stream(event_ids, stream, expected_version)
  return if event_ids.empty?

  link_to_stream_(event_ids, stream, expected_version)
end

#position_in_stream(event_id, stream) ⇒ Object



120
121
122
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 120

def position_in_stream(event_id, stream)
  @repo_reader.position_in_stream(event_id, stream)
end

#read(specification) ⇒ Object



90
91
92
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 90

def read(specification)
  @repo_reader.read(specification)
end

#rescue_from_double_json_serialization!Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 38

def rescue_from_double_json_serialization!
  if serializer == JSON && json_data_type?
    @repo_reader.instance_eval { alias __record__ record }

    @repo_reader.define_singleton_method :unwrap do |column_name, payload|
      if String === payload && payload.start_with?("\{")
        warn "Double serialization of #{column_name} column detected"
        serializer.load(payload)
      else
        payload
      end
    end

    @repo_reader.define_singleton_method :record do |record|
      r = __record__(record)

      Record.new(
        event_id: r.event_id,
        metadata: unwrap("metadata", r.),
        data: unwrap("data", r.data),
        event_type: r.event_type,
        timestamp: r.timestamp,
        valid_at: r.valid_at,
      )
    end
  end
end

#streams_of(event_id) ⇒ Object



116
117
118
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 116

def streams_of(event_id)
  @repo_reader.streams_of(event_id)
end

#update_messages(records) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/ruby_event_store/active_record/event_repository.rb', line 98

def update_messages(records)
  hashes = records.map { |record| upsert_hash(record, record.serialize(serializer)) }
  for_update = records.map(&:event_id)
  start_transaction do
    existing =
      @event_klass
        .where(event_id: for_update)
        .pluck(:event_id, :id, :created_at)
        .reduce({}) { |acc, (event_id, id, created_at)| acc.merge(event_id => [id, created_at]) }
    (for_update - existing.keys).each { |id| raise EventNotFound.new(id) }
    hashes.each do |h|
      h[:id] = existing.fetch(h.fetch(:event_id)).at(0)
      h[:created_at] = existing.fetch(h.fetch(:event_id)).at(1)
    end
    @event_klass.upsert_all(hashes)
  end
end