Class: Sourced::Backends::TestBackend::Group

Inherits:
Object
  • Object
show all
Defined in:
lib/sourced/backends/test_backend.rb

Defined Under Namespace

Classes: Offset

Constant Summary collapse

NOOP_FILTER =
->(_) { true }

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(group_id, backend) ⇒ Group

Returns a new instance of Group.



13
14
15
16
17
18
# File 'lib/sourced/backends/test_backend.rb', line 13

# Builds the group's state: stores its identifier and backend, starts with an
# empty per-stream offset table, then seeds that table via #reindex.
#
# @param group_id [Object] identifier later returned by #group_id
# @param backend [Object] stored in @backend; presumably the object whose
#   #events are scanned by #reindex (the +backend+ reader is not shown here)
def initialize(group_id, backend)
  @group_id, @backend = group_id, backend
  @offsets = {}
  reindex
end

Instance Attribute Details

#group_id ⇒ Object (readonly)

Returns the value of attribute group_id.



9
10
11
# File 'lib/sourced/backends/test_backend.rb', line 9

# Identifier this group was constructed with.
#
# @return [Object] the value stored in @group_id
def group_id = @group_id

Instance Method Details

#ack_on(event_id) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/sourced/backends/test_backend.rb', line 35

# Acknowledges the event with +event_id+ for this group: marks the event's
# stream offset as locked, yields, and then records the event's position in
# +backend.events+ as the last index processed for that stream.
#
# When no event with +event_id+ exists in the backend, nothing happens and
# the block is not called.
#
# @param event_id [Object] compared against each event's #id
# @yield runs while the stream's offset is marked as locked
# @return [false, nil] +nil+ when the event is unknown, +false+ after a
#   successful acknowledgement (same falsy results as before)
# @raise [Sourced::ConcurrentAckError] if the stream's offset is already locked
def ack_on(event_id, &)
  global_seq = backend.events.find_index { |e| e.id == event_id }
  return unless global_seq

  offset = @offsets[backend.events[global_seq].stream_id]
  if offset.locked
    raise Sourced::ConcurrentAckError, "Stream for event #{event_id} is being concurrently processed by #{group_id}"
  end

  offset.locked = true
  begin
    yield
    # Only reached when the block succeeded, so a failed ack never advances the offset.
    offset.index = global_seq
  ensure
    # Release the lock even if the block raised; otherwise the stream would
    # stay locked and every later ack/reserve for it would be refused.
    offset.locked = false
  end
  false
end

#reindex ⇒ Object



29
30
31
32
33
# File 'lib/sourced/backends/test_backend.rb', line 29

# Ensures every stream that appears in +backend.events+ has an entry in the
# offset table. Streams already tracked keep their current offset; new ones
# start unlocked at index -1 (nothing processed yet).
#
# @return [Object] whatever +backend.events.each+ returns (the events collection)
def reindex
  backend.events.each do |event|
    stream_id = event.stream_id
    @offsets[stream_id] ||= Offset.new(stream_id, -1, false)
  end
end

#reserve_next(handled_events, time_window) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/sourced/backends/test_backend.rb', line 53

# Finds the first event, in +backend.events+ order, that this group can take
# next, and optionally processes it.
#
# An event qualifies when its stream's offset is not locked, its position is
# greater than the stream's last processed index, its class is included in
# +handled_events+, and it passes the time filter.
#
# With a block, the stream's offset is locked while the block runs and its
# index is moved to the event's position once the block returns. Without a
# block the event is only returned; no offset is changed.
#
# @param handled_events [#include?] event classes this consumer handles
# @param time_window [Time, Object] when a Time, only events whose
#   +created_at+ is strictly later are considered; any other value disables
#   the time filter
# @yieldparam evt [Object] the reserved event
# @return [Object, nil] the reserved event, or +nil+ when none qualifies
def reserve_next(handled_events, time_window, &)
  time_filter = time_window.is_a?(Time) ? ->(e) { e.created_at > time_window } : NOOP_FILTER

  evt, index = backend.events.each_with_index.find do |e, idx|
    offset = @offsets[e.stream_id]
    !offset.locked && idx > offset.index && handled_events.include?(e.class) && time_filter.call(e)
  end
  return unless evt
  return evt unless block_given?

  offset = @offsets[evt.stream_id]
  offset.locked = true
  begin
    yield(evt)
    # Only reached when the block succeeded, so a failed event stays pending.
    offset.index = index
  ensure
    # Release the lock even if the block raised; otherwise the stream would
    # be skipped as "locked" by every later call.
    offset.locked = false
  end
  evt
end

#to_h ⇒ Object



20
21
22
23
24
25
26
27
# File 'lib/sourced/backends/test_backend.rb', line 20

# Summarises the group's progress as a Hash.
#
# Only offsets whose index is >= 0 (streams with at least one processed
# event) are counted. The reported positions are the smallest and largest of
# those indexes plus one, or 0 when no stream has been processed.
#
# @return [Hash] with keys :group_id, :oldest_processed, :newest_processed
#   and :stream_count
def to_h
  processed = @offsets.each_value.map(&:index).select { |position| position >= 0 }
  lowest, highest = processed.minmax

  {
    group_id: group_id,
    oldest_processed: (lowest || -1) + 1,
    newest_processed: (highest || -1) + 1,
    stream_count: processed.size
  }
end