Class: RubyEventStore::Outbox::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/outbox/consumer.rb

Constant Summary collapse

MAXIMUM_BATCH_FETCHES_IN_ONE_LOCK =
10

Instance Method Summary collapse

Constructor Details

#initialize(consumer_uuid, configuration, clock: Time, logger:, metrics:) ⇒ Consumer

Returns a new instance of Consumer.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/ruby_event_store/outbox/consumer.rb', line 19

# Wires up a consumer from the given configuration.
#
# @param consumer_uuid identifier stored for this consumer instance
# @param configuration object supplying split_keys, batch_size, locking,
#   message_format, redis_url and database_url
# @param clock clock-like object stored for later use (defaults to Time)
# @param logger logger handed to the repository
# @param metrics metrics sink handed to the repository
# @raise [RuntimeError] when configuration.message_format differs from SIDEKIQ5_FORMAT
def initialize(consumer_uuid, configuration, clock: Time, logger:, metrics:)
  # Collaborators supplied directly by the caller.
  @consumer_uuid = consumer_uuid
  @clock = clock
  @logger = logger
  @metrics = metrics

  # State derived from the configuration.
  @split_keys = configuration.split_keys
  @tempo = Tempo.new(configuration.batch_size)
  @locking = configuration.locking

  # Only the Sidekiq 5 message format has a processor; reject anything else
  # before any Redis client is created.
  raise "Unknown format" unless configuration.message_format == SIDEKIQ5_FORMAT

  redis_client = RedisClient.config(url: configuration.redis_url).new_client
  @processor = SidekiqProcessor.new(redis_client)

  # The cleanup strategy is built from the repository, so the repository comes first.
  @repository = Repository.new(configuration.database_url, logger, metrics)
  @cleanup_strategy = CleanupStrategies.build(configuration, repository)
end

Instance Method Details

#handle_split(fetch_specification) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/ruby_event_store/outbox/consumer.rb', line 46

# Handles one batch for the given fetch specification.
#
# Every record yielded by the repository is passed to the processor and then
# marked as enqueued, both with the same UTC timestamp taken from the clock.
# Once the batch call returns, cleanup runs for the specification and the
# processor's after_batch hook is invoked.
#
# @param fetch_specification specification forwarded to the repository and to cleanup
# @return [Boolean] true when the batch result reports a success_count above zero
def handle_split(fetch_specification)
  batch_result =
    repository.with_next_batch(fetch_specification, tempo.batch_size, consumer_uuid, locking, @clock) do |record|
      # One timestamp per record, shared by the processor and the repository.
      enqueued_at = @clock.now.utc
      processor.process(record, enqueued_at)
      repository.mark_as_enqueued(record, enqueued_at)
    end

  # Post-batch housekeeping runs regardless of how many records were handled.
  cleanup(fetch_specification)
  processor.after_batch

  batch_result.success_count > 0
end

#process ⇒ Object



36
37
38
39
40
41
42
43
44
# File 'lib/ruby_event_store/outbox/consumer.rb', line 36

# Runs handle_split once for each configured split key, in order.
#
# Iteration stops at the first nil/false key, and split_keys itself is never
# mutated. Every remaining key is handled even after an earlier one has
# already reported a change (the results are combined with a non-short-circuit OR).
#
# @return [Boolean] true when at least one handle_split call returned a truthy value
def process
  # Keys up to (not including) the first falsy entry.
  keys_to_handle = split_keys.take_while { |key| key }

  keys_to_handle.reduce(false) do |anything_changed, key|
    specification = FetchSpecification.new(processor.message_format, key)
    anything_changed | handle_split(specification)
  end
end