Class: RubyEventStore::Sequel::EventRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/sequel/event_repository.rb

Constant Summary collapse

# Frozen list of event column names.
# NOTE(review): presumably the columns that get overwritten when an existing
# event row is upserted — confirm against the commit_* helpers.
UPSERT_COLUMNS = %i[event_type data metadata valid_at].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sequel:, serializer:) ⇒ EventRepository

Returns a new instance of EventRepository.



8
9
10
11
12
13
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 8

# Builds a repository on top of the given database handle.
#
# @param sequel [Object] database handle; kept as +@db+ and switched to the +:utc+ timezone
# @param serializer [Object] kept as +@serializer+ and handed to records when they are (de)serialized
def initialize(sequel:, serializer:)
  @serializer = serializer
  # The detector is told the names of the two tables this repository writes to.
  @index_violation_detector =
    IndexViolationDetector.new("event_store_events", "event_store_events_in_streams")
  @db = sequel
  sequel.timezone = :utc
end

Instance Attribute Details

#index_violation_detector ⇒ Object (readonly)

Returns the value of attribute index_violation_detector.



15
16
17
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 15

# Read-only accessor for the detector assigned in the constructor.
#
# @return [Object] the value of +@index_violation_detector+
def index_violation_detector
  @index_violation_detector
end

Instance Method Details

#append_to_stream(records, stream, expected_version) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 17

# Stores every record in +event_store_events+ and, unless the stream is the
# global one, links it to +stream+ in +event_store_events_in_streams+.
# All inserts run inside a single database transaction.
#
# @param records [Enumerable] objects responding to +serialize+, +timestamp+ and +valid_at+
# @param stream [Object] responds to +global?+ and +name+
# @param expected_version [Object] handed to +resolved_version+ to obtain the base position
# @return [self]
# @raise [EventDuplicatedInStream] when the index violation detector recognises
#   the unique-constraint error message
# @raise [WrongExpectedEventVersion] for any other unique-constraint violation
def append_to_stream(records, stream, expected_version)
  base_position = resolved_version(expected_version, stream)

  @db.transaction do
    records.each_with_index do |record, index|
      serialized_record = record.serialize(@serializer)

      @db[:event_store_events].insert(
        event_id: serialized_record.event_id,
        event_type: serialized_record.event_type,
        data: serialized_record.data,
        metadata: serialized_record.metadata,
        created_at: record.timestamp,
        valid_at: optimize_timestamp(record.valid_at, record.timestamp),
      )
      unless stream.global?
        @db[:event_store_events_in_streams].insert(
          event_id: serialized_record.event_id,
          stream: stream.name,
          created_at: Time.now.utc,
          # Without a resolved base position the link is stored with a nil position.
          position: base_position ? base_position + index + 1 : nil,
        )
      end
    end
  end
  self
rescue ::Sequel::UniqueConstraintViolation => ex
  raise EventDuplicatedInStream if index_violation_detector.detect(ex.message)
  raise WrongExpectedEventVersion
end

#count(specification) ⇒ Object



140
141
142
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 140

# Counts the rows selected by the specification.
#
# @param specification [Object] passed unchanged to +read_+
# @return [Object] whatever +count+ returns on the result of +read_+
def count(specification)
  matching = read_(specification)
  matching.count
end

#delete_stream(stream) ⇒ Object



102
103
104
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 102

# Removes every row of +event_store_events_in_streams+ whose +stream+ column
# equals +stream.name+. Only the link table is touched by this method.
#
# @param stream [Object] responds to +name+
# @return [Object] the result of the dataset's +delete+ call
def delete_stream(stream)
  links = @db[:event_store_events_in_streams]
  links.where(stream: stream.name).delete
end

#event_in_stream?(event_id, stream) ⇒ Boolean

Returns:

  • (Boolean)


98
99
100
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 98

# Tells whether +event_store_events_in_streams+ holds a row for the given
# event id within the stream named +stream.name+.
#
# @param event_id [Object] value matched against the +event_id+ column
# @param stream [Object] responds to +name+
# @return [Boolean]
def event_in_stream?(event_id, stream)
  links = @db[:event_store_events_in_streams]
  links.where(event_id: event_id, stream: stream.name).any?
end

#global_position(event_id) ⇒ Object

Raises:

  • (EventNotFound)


88
89
90
91
92
93
94
95
96
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 88

# Looks up the row of +event_store_events+ with the given event id and returns
# its +id+ column minus one.
#
# @param event_id [Object] value matched against the +event_id+ column
# @return [Integer] stored row id decremented by one
# @raise [EventNotFound] when no row matches
def global_position(event_id)
  events = ::Sequel[:event_store_events]
  row = @db[:event_store_events].select(events[:id]).where(events[:event_id] => event_id).first
  raise EventNotFound.new(event_id) if row.nil?
  row[:id] - 1
end

#has_event?(event_id) ⇒ Boolean

Returns:

  • (Boolean)


106
107
108
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 106

# Tells whether +event_store_events+ contains a row with the given event id.
#
# @param event_id [Object] value matched against the +event_id+ column
# @return [Boolean]
def has_event?(event_id)
  events = @db[:event_store_events]
  events.where(event_id: event_id).any?
end

#last_stream_event(stream) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 110

# Returns the deserialized event behind the last link of a stream, where links
# are ordered by +position+ and then +id+.
#
# @param stream [Object] responds to +name+
# @return [Object, nil] result of +SerializedRecord#deserialize+, or nil when
#   the stream has no links
def last_stream_event(stream)
  link = @db[:event_store_events_in_streams].where(stream: stream.name).order(:position, :id).last
  return if link.nil?

  stored = @db[:event_store_events].where(event_id: link[:event_id]).first
  created_at = stored[:created_at]
  # A missing valid_at falls back to the creation timestamp.
  valid_at = stored[:valid_at] || created_at

  serialized =
    SerializedRecord.new(
      event_id: stored[:event_id],
      event_type: stored[:event_type],
      data: stored[:data],
      metadata: stored[:metadata],
      timestamp: created_at.iso8601(TIMESTAMP_PRECISION),
      valid_at: valid_at.iso8601(TIMESTAMP_PRECISION),
    )
  serialized.deserialize(@serializer)
end


#link_to_stream(event_ids, stream, expected_version) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 48

# Links already stored events to +stream+ by inserting one row per event id
# into +event_store_events_in_streams+ inside a single transaction.
#
# @param event_ids [Array] ids that must all exist in +event_store_events+
# @param stream [Object] responds to +name+
# @param expected_version [Object] handed to +resolved_version+ to obtain the base position
# @return [self]
# @raise [EventNotFound] for the first requested id without a stored event
# @raise [EventDuplicatedInStream] when the index violation detector recognises
#   the unique-constraint error message
# @raise [WrongExpectedEventVersion] for any other unique-constraint violation
def link_to_stream(event_ids, stream, expected_version)
  events = ::Sequel[:event_store_events]
  stored_ids =
    @db[:event_store_events]
      .select(events[:event_id])
      .where(events[:event_id] => event_ids)
      .map { |row| row[:event_id] }
  missing_ids = event_ids - stored_ids
  missing_ids.each { |missing_id| raise EventNotFound.new(missing_id) }

  base_position = resolved_version(expected_version, stream)

  @db.transaction do
    event_ids.each_with_index do |event_id, index|
      # Without a resolved base position the link is stored with a nil position.
      position = base_position ? base_position + index + 1 : nil
      @db[:event_store_events_in_streams].insert(
        event_id: event_id,
        stream: stream.name,
        created_at: Time.now.utc,
        position: position,
      )
    end
  end
  self
rescue ::Sequel::UniqueConstraintViolation => ex
  raise EventDuplicatedInStream if index_violation_detector.detect(ex.message)
  raise WrongExpectedEventVersion
end

#position_in_stream(event_id, stream) ⇒ Object

Raises:

  • (EventNotFoundInStream)


75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 75

# Reads the +position+ column of the link between an event and a stream.
#
# @param event_id [Object] value matched against the +event_id+ column
# @param stream [Object] responds to +name+
# @return [Object] the stored +position+ value of the matching link row
# @raise [EventNotFoundInStream] when no link row matches
def position_in_stream(event_id, stream)
  links = ::Sequel[:event_store_events_in_streams]
  row =
    @db[:event_store_events_in_streams]
      .select(links[:position])
      .where(links[:event_id] => event_id, links[:stream] => stream.name)
      .first
  raise EventNotFoundInStream.new if row.nil?
  row[:position]
end

#read(specification) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 124

# Reads events according to the specification. The result shape depends on
# which predicate of the specification answers true first:
#
# * +batched?+ - the enumerator returned by +RubyEventStore::BatchEnumerator#each+,
#   fed by a reader that loads one offset/limit window at a time
# * +first?+ / +last?+ - a single record, or nil when nothing matched
# * otherwise - an enumerator over all matching records
#
# @param specification [Object] passed to +read_+ to build the dataset
# @return [Enumerator, Object, nil]
def read(specification)
  if specification.batched?
    dataset = read_(specification)
    fetch_batch = lambda do |offset, limit|
      dataset.offset(offset).limit(limit).map { |row| record(row) }
    end
    return RubyEventStore::BatchEnumerator.new(specification.batch_size, specification.limit, fetch_batch).each
  end

  if specification.first?
    row = read_(specification).first
    return row ? record(row) : nil
  end

  if specification.last?
    row = read_(specification).last
    return row ? record(row) : nil
  end

  read_(specification).map { |row| record(row) }.each
end

#streams_of(event_id) ⇒ Object



171
172
173
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 171

# Lists the streams an event is linked to, one +Stream+ per matching row of
# +event_store_events_in_streams+.
#
# @param event_id [Object] value matched against the +event_id+ column
# @return [Array<Stream>] streams built from each row's +stream+ column
def streams_of(event_id)
  links = @db[:event_store_events_in_streams].where(event_id: event_id)
  links.map { |link| Stream.new(link[:stream]) }
end

#update_messages(records) ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/ruby_event_store/sequel/event_repository.rb', line 144

# Rewrites already stored events from the given records.
#
# Each record is turned into a row hash via +upsert_hash+. The stored +id+,
# +created_at+ and +valid_at+ of the matching row are then copied onto that
# hash before the batch is committed, so those three values always come from
# the database, not from the incoming record.
#
# @param records [Enumerable] objects responding to +event_id+ and +serialize+
# @return [Object] result of the commit helper that was selected
# @raise [EventNotFound] for the first record whose event id is not stored
def update_messages(records)
  rows = records.map { |record| upsert_hash(record, record.serialize(@serializer)) }
  requested_ids = records.map(&:event_id)

  @db.transaction do
    # event_id => [id, created_at, valid_at] for every event that already exists
    stored =
      @db[:event_store_events]
        .where(event_id: requested_ids)
        .select(:event_id, :id, :created_at, :valid_at)
        .each_with_object({}) do |row, by_event_id|
          by_event_id[row[:event_id]] = [row[:id], row[:created_at], row[:valid_at]]
        end

    missing_ids = requested_ids - stored.keys
    missing_ids.each { |missing_id| raise EventNotFound.new(missing_id) }

    rows.each do |row|
      id, created_at, valid_at = stored.fetch(row.fetch(:event_id))
      row[:id] = id
      row[:created_at] = created_at
      row[:valid_at] = valid_at
    end

    if supports_on_duplicate_key_update?
      commit_on_duplicate_key_update(rows)
    else
      commit_insert_conflict_update(rows)
    end
  end
end