Class: Sourced::Backends::TestBackend::Group
- Inherits:
-
Object
- Object
- Sourced::Backends::TestBackend::Group
- Defined in:
- lib/sourced/backends/test_backend.rb
Defined Under Namespace
Classes: Offset
Constant Summary collapse
- NOOP_FILTER =
->(_) { true }
Instance Attribute Summary collapse
-
#group_id ⇒ Object
readonly
Returns the value of attribute group_id.
Instance Method Summary collapse
- #ack_on(event_id) ⇒ Object
-
#initialize(group_id, backend) ⇒ Group
constructor
A new instance of Group.
- #reindex ⇒ Object
- #reserve_next(handled_events, time_window) ⇒ Object
- #to_h ⇒ Object
Constructor Details
#initialize(group_id, backend) ⇒ Group
Returns a new instance of Group.
13 14 15 16 17 18 |
# File 'lib/sourced/backends/test_backend.rb', line 13
# Builds a consumer group bound to a backend.
#
# Stores the group id and the backend, starts with an empty per-stream
# offsets table, then calls #reindex to populate it.
#
# @param group_id [Object] identifier of this group
# @param backend [Object] backend this group reads events from
def initialize(group_id, backend)
  @group_id, @backend = group_id, backend
  @offsets = {}
  reindex
end
Instance Attribute Details
#group_id ⇒ Object (readonly)
Returns the value of attribute group_id.
9 10 11 |
# File 'lib/sourced/backends/test_backend.rb', line 9
# Read-only accessor for the group's identifier.
#
# @return [Object] the value of the @group_id instance variable
def group_id = @group_id
Instance Method Details
#ack_on(event_id) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/sourced/backends/test_backend.rb', line 35
# Acknowledges an event on behalf of this group.
#
# Looks the event up by id in the backend's event list. When found, the
# offset of the event's stream is locked, the given block is run, and the
# offset's index is moved to the event's position in the list.
#
# @param event_id [Object] id of the event to acknowledge
# @yield runs while the stream's offset is locked
# @return [nil] when no event has the given id
# @return [false] after a successful acknowledgement
# @raise [Sourced::ConcurrentAckError] if the stream's offset is already locked
def ack_on(event_id, &)
  global_seq = backend.events.find_index { |e| e.id == event_id }
  return unless global_seq

  evt = backend.events[global_seq]
  offset = @offsets[evt.stream_id]
  if offset.locked
    raise Sourced::ConcurrentAckError, "Stream for event #{event_id} is being concurrently processed by #{group_id}"
  end

  offset.locked = true
  begin
    yield
    # Only reached when the block did not raise, so a failed block never advances the offset.
    offset.index = global_seq
  ensure
    # Always release the lock; otherwise a raising block would leave the stream locked for good.
    offset.locked = false
  end
  # Keep the previous success value (the original's last expression evaluated to false).
  false
end
#reindex ⇒ Object
29 30 31 32 33 |
# File 'lib/sourced/backends/test_backend.rb', line 29
# Ensures every stream present in the backend's events has an offset entry.
#
# Streams seen for the first time get an Offset with index -1 and unlocked;
# existing offsets are left untouched.
#
# @return [Object] the backend's events collection, as returned by #each
def reindex
  backend.events.each do |e|
    @offsets[e.stream_id] ||= Offset.new(e.stream_id, -1, false)
  end
end
#reserve_next(handled_events, time_window) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/sourced/backends/test_backend.rb', line 53
# Finds the next unprocessed event this group can handle and hands it to the block.
#
# Events are scanned in backend order. An event is picked when its stream's
# offset is not locked, its position is past the offset's index, its class is
# in +handled_events+, and it passes the time filter. The stream's offset is
# locked while the block runs; the offset's index only advances when a block
# was given and it returned without raising.
#
# @param handled_events [#include?] event classes this consumer handles
# @param time_window [Time, Object] when a Time, only events with
#   +created_at+ later than it are considered; any other value disables the filter
# @yieldparam evt [Object] the reserved event
# @return [Object, nil] the reserved event, or nil when none qualifies
def reserve_next(handled_events, time_window, &)
  time_filter = time_window.is_a?(Time) ? ->(e) { e.created_at > time_window } : NOOP_FILTER

  evt = nil
  offset = nil
  index = -1

  backend.events.each_with_index do |e, idx|
    candidate = @offsets[e.stream_id]
    # Stream locked by another consumer in the group.
    next if candidate.locked
    # Skip events already consumed, not handled, or outside the time window.
    next unless idx > candidate.index && handled_events.include?(e.class) && time_filter.call(e)

    # New event for the stream: reserve it.
    candidate.locked = true
    evt = e
    offset = candidate
    index = idx
    break
  end

  return evt unless evt

  begin
    if block_given?
      yield(evt)
      # Only reached when the block did not raise.
      offset.index = index
    end
  ensure
    # Always release the lock; otherwise a raising block would leave the stream unreservable.
    offset.locked = false
  end

  evt
end
#to_h ⇒ Object
20 21 22 23 24 25 26 27 |
# File 'lib/sourced/backends/test_backend.rb', line 20
# Summarises this group's progress as a Hash.
#
# Only offsets whose index is zero or greater are counted. The oldest and
# newest values are the smallest and largest such index plus one, or 0 when
# there are none.
#
# @return [Hash] with keys :group_id, :oldest_processed, :newest_processed, :stream_count
def to_h
  processed = @offsets.each_value.map(&:index).select { |i| i >= 0 }

  {
    group_id:,
    oldest_processed: (processed.min || -1) + 1,
    newest_processed: (processed.max || -1) + 1,
    stream_count: processed.size
  }
end