Class: RubyEventStore::ROM::Relations::StreamEntries

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/rom/relations/stream_entries.rb

Constant Summary collapse

# Per read direction: the ordering method name, the comparison operator
# applied to the offset entry id, and the comparison operator applied to
# the stop entry id (in that order).
DIRECTION_MAP = {
  forward: [:asc, :>, :<],
  backward: [:desc, :<, :>]
}.freeze

Instance Method Summary collapse

Instance Method Details

#by_event_id(event_id) ⇒ Object



21
22
23
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 21

# Restricts the relation with an +event_id+ condition.
#
# @param event_id [Object] value passed to +where+ for the +event_id+ column
# @return [Object] whatever +where+ returns for that condition
def by_event_id(event_id)
  condition = { event_id: event_id }
  where(**condition)
end

#by_event_type(types) ⇒ Object



25
26
27
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 25

# Joins the events relation (via +join_events+) and restricts the result
# with an +event_type+ condition.
#
# @param types [Object] value passed to +where+ for the +event_type+ column
# @return [Object] whatever the joined relation's +where+ returns
def by_event_type(types)
  entries_with_events = join_events
  entries_with_events.where(event_type: types)
end

#by_stream(stream) ⇒ Object



17
18
19
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 17

# Restricts the relation to entries whose +stream+ column equals the name
# of the given stream.
#
# @param stream [#name] object whose +name+ is used as the condition value
# @return [Object] whatever +where+ returns for that condition
def by_stream(stream)
  stream_name = stream.name
  where(stream: stream_name)
end

#by_stream_and_event_id(stream, event_id) ⇒ Object



29
30
31
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 29

# Looks up the entry for one event within one stream and calls +one!+ on
# the restricted relation.
#
# @param stream [#name] object whose +name+ is matched against +stream+
# @param event_id [Object] value matched against +event_id+
# @return [Object] the result of +one!+ on the restricted relation
def by_stream_and_event_id(stream, event_id)
  matching = where(stream: stream.name, event_id: event_id)
  matching.one!
end

#create_changeset(tuples) ⇒ Object



13
14
15
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 13

# Builds a changeset of type ROM::Changesets::CreateStreamEntries for the
# given tuples.
#
# @param tuples [Object] data handed to +changeset+ unchanged
# @return [Object] whatever +changeset+ returns
def create_changeset(tuples)
  changeset_class = ROM::Changesets::CreateStreamEntries
  changeset(changeset_class, tuples)
end

#join_events ⇒ Object



97
98
99
100
101
102
103
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 97

# Returns a relation joined with +events+, without joining twice.
#
# The dataset's existing join clauses (+dataset.opts[:join]+) are checked
# for the events dataset's first source table; when it is already present
# the receiver is returned unchanged, otherwise +join(:events)+ is applied.
#
# @return [Object] +self+ or the result of +join(:events)+
def join_events
  existing_joins = dataset.opts[:join]
  # The events table is only looked up when there are join clauses to inspect.
  already_joined =
    !existing_joins.nil? &&
    existing_joins.map(&:table).include?(events.dataset.first_source_table)

  already_joined ? self : join(:events)
end

#max_position(stream) ⇒ Object



33
34
35
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 33

# Fetches the first entry of the given stream when ordered by +position+
# descending, selecting only the +position+ column.
#
# @param stream [#name] stream passed to +by_stream+
# @return [Object] the result of +first+ on the ordered query
#   (NOTE(review): presumably nil for a stream without entries — confirm)
def max_position(stream)
  positions = by_stream(stream).select(:position)
  positions.order(Sequel.desc(:position)).first
end

#newer_than(time, time_sort_by) ⇒ Object



37
38
39
40
41
42
43
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 37

# Restricts entries to those whose event timestamp is strictly after +time+.
#
# With +time_sort_by+ equal to +:as_of+ the comparison uses
# COALESCE(events.valid_at, events.created_at); for any other value it uses
# events.created_at.
#
# @param time [Time] boundary; it is compared in local time
#   (assumes a Time-like object responding to +getlocal+)
# @param time_sort_by [Symbol, nil] +:as_of+ selects the valid_at-based comparison
# @return [Object] the joined relation restricted by the comparison
def newer_than(time, time_sort_by)
  # Time#localtime converts the receiver in place (and raises on a frozen
  # UTC Time); getlocal yields the same local value without touching the
  # caller's object.
  boundary = time.getlocal

  if time_sort_by == :as_of
    join_events.where { |r| string::coalesce(r.events[:valid_at], r.events[:created_at]) > boundary }
  else
    join_events.where { |r| r.events[:created_at] > boundary }
  end
end

#newer_than_or_equal(time, time_sort_by) ⇒ Object



45
46
47
48
49
50
51
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 45

# Restricts entries to those whose event timestamp is at or after +time+.
#
# With +time_sort_by+ equal to +:as_of+ the comparison uses
# COALESCE(events.valid_at, events.created_at); for any other value it uses
# events.created_at.
#
# @param time [Time] boundary; it is compared in local time
#   (assumes a Time-like object responding to +getlocal+)
# @param time_sort_by [Symbol, nil] +:as_of+ selects the valid_at-based comparison
# @return [Object] the joined relation restricted by the comparison
def newer_than_or_equal(time, time_sort_by)
  # Time#localtime converts the receiver in place (and raises on a frozen
  # UTC Time); getlocal yields the same local value without touching the
  # caller's object.
  boundary = time.getlocal

  if time_sort_by == :as_of
    join_events.where { |r| string::coalesce(r.events[:valid_at], r.events[:created_at]) >= boundary }
  else
    join_events.where { |r| r.events[:created_at] >= boundary }
  end
end

#older_than(time, time_sort_by) ⇒ Object



53
54
55
56
57
58
59
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 53

# Restricts entries to those whose event timestamp is strictly before +time+.
#
# With +time_sort_by+ equal to +:as_of+ the comparison uses
# COALESCE(events.valid_at, events.created_at); for any other value it uses
# events.created_at.
#
# @param time [Time] boundary; it is compared in local time
#   (assumes a Time-like object responding to +getlocal+)
# @param time_sort_by [Symbol, nil] +:as_of+ selects the valid_at-based comparison
# @return [Object] the joined relation restricted by the comparison
def older_than(time, time_sort_by)
  # Time#localtime converts the receiver in place (and raises on a frozen
  # UTC Time); getlocal yields the same local value without touching the
  # caller's object.
  boundary = time.getlocal

  if time_sort_by == :as_of
    join_events.where { |r| string::coalesce(r.events[:valid_at], r.events[:created_at]) < boundary }
  else
    join_events.where { |r| r.events[:created_at] < boundary }
  end
end

#older_than_or_equal(time, time_sort_by) ⇒ Object



61
62
63
64
65
66
67
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 61

# Restricts entries to those whose event timestamp is at or before +time+.
#
# With +time_sort_by+ equal to +:as_of+ the comparison uses
# COALESCE(events.valid_at, events.created_at); for any other value it uses
# events.created_at.
#
# @param time [Time] boundary; it is compared in local time
#   (assumes a Time-like object responding to +getlocal+)
# @param time_sort_by [Symbol, nil] +:as_of+ selects the valid_at-based comparison
# @return [Object] the joined relation restricted by the comparison
def older_than_or_equal(time, time_sort_by)
  # Time#localtime converts the receiver in place (and raises on a frozen
  # UTC Time); getlocal yields the same local value without touching the
  # caller's object.
  boundary = time.getlocal

  if time_sort_by == :as_of
    join_events.where { |r| string::coalesce(r.events[:valid_at], r.events[:created_at]) <= boundary }
  else
    join_events.where { |r| r.events[:created_at] <= boundary }
  end
end

#ordered(direction, stream, offset_entry_id = nil, stop_entry_id = nil, time_sort_by = nil) ⇒ Object

Raises:

  • (ArgumentError)


71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 71

# Builds the ordered query over one stream's entries.
#
# The direction picks, through DIRECTION_MAP, the ordering method and the
# comparison operators used for the optional offset/stop entry ids. Without
# a recognised +time_sort_by+ the result is ordered by the stream entry id;
# with +:as_at+ it is joined to events and ordered by events.created_at,
# with +:as_of+ by events.valid_at.
#
# @param direction [Symbol] +:forward+ or +:backward+
# @param stream [#name] stream passed to +by_stream+
# @param offset_entry_id [Object, nil] when truthy, entry ids are compared against it
# @param stop_entry_id [Object, nil] when truthy, entry ids are compared against it
# @param time_sort_by [Symbol, nil] +:as_at+, +:as_of+, or anything else for id ordering
# @return [Object] the ordered relation
# @raise [ArgumentError] when +direction+ has no DIRECTION_MAP entry
def ordered(direction, stream, offset_entry_id = nil, stop_entry_id = nil, time_sort_by = nil)
  sort_method, offset_operator, stop_operator = DIRECTION_MAP[direction]
  raise ArgumentError, "Direction must be :forward or :backward" if sort_method.nil?

  # Events column to sort by, or nil to sort by the stream entry id.
  time_column =
    case time_sort_by
    when :as_at then :created_at
    when :as_of then :valid_at
    end

  scoped = by_stream(stream)
  # These blocks take no parameter, so +id+ is resolved by whatever context
  # +where+ evaluates them in.
  scoped = scoped.where { id.public_send(offset_operator, offset_entry_id) } if offset_entry_id
  scoped = scoped.where { id.public_send(stop_operator, stop_entry_id) } if stop_entry_id

  if time_column
    scoped.join_events.order { |rel| [rel.events[time_column].public_send(sort_method)] }
  else
    scoped.order { |rel| [rel[:stream_entries][:id].public_send(sort_method)] }
  end
end