Class: RubyEventStore::Outbox::Consumer
- Inherits:
-
Object
- Object
- RubyEventStore::Outbox::Consumer
- Defined in:
- lib/ruby_event_store/outbox/consumer.rb
Constant Summary collapse
- MAXIMUM_BATCH_FETCHES_IN_ONE_LOCK =
10
Instance Method Summary collapse
- #handle_split(fetch_specification) ⇒ Object
-
#initialize(consumer_uuid, configuration, clock: Time, logger:, metrics:) ⇒ Consumer
constructor
A new instance of Consumer.
- #process ⇒ Object
Constructor Details
#initialize(consumer_uuid, configuration, clock: Time, logger:, metrics:) ⇒ Consumer
Returns a new instance of Consumer.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/ruby_event_store/outbox/consumer.rb', line 19

# Builds a consumer and wires up its collaborators from +configuration+.
#
# @param consumer_uuid [Object] identifier of this consumer, stored as-is
# @param configuration [Object] must respond to +split_keys+, +batch_size+,
#   +locking+, +message_format+, +redis_url+ and +database_url+
# @param clock [#now] time source; defaults to +Time+
# @param logger [Object] forwarded to the repository
# @param metrics [Object] stored and forwarded to the repository
# @raise [RuntimeError] "Unknown format" when the configured message format
#   is not SIDEKIQ5_FORMAT
def initialize(consumer_uuid, configuration, clock: Time, logger:, metrics:)
  @split_keys = configuration.split_keys
  @clock = clock
  @logger = logger
  @metrics = metrics
  @tempo = Tempo.new(configuration.batch_size)
  @consumer_uuid = consumer_uuid
  @locking = configuration.locking

  # NOTE(review): the rendered listing had lost the accessor name after
  # `configuration.`; `message_format` is restored here -- confirm against the
  # file on disk. Without it the comparison is made against the configuration
  # object itself and the guard always fires.
  raise "Unknown format" if configuration.message_format != SIDEKIQ5_FORMAT

  redis_config = RedisClient.config(url: configuration.redis_url)
  @processor = SidekiqProcessor.new(redis_config.new_client)
  @repository = Repository.new(configuration.database_url, logger, metrics)
  # `repository` is a reader defined outside this listing; it is called only
  # after @repository has been assigned on the line above.
  @cleanup_strategy = CleanupStrategies.build(configuration, repository)
end
Instance Method Details
#handle_split(fetch_specification) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/ruby_event_store/outbox/consumer.rb', line 46

# Processes the next batch of records matching +fetch_specification+.
#
# Each record yielded by the repository is handed to the processor and then
# marked as enqueued, both with the same UTC timestamp. Once the batch call
# has returned, cleanup and the processor's after-batch hook are run.
#
# @param fetch_specification [Object] passed to the repository and to +cleanup+
# @return [Boolean] true when the batch result reports a success count above zero
def handle_split(fetch_specification)
  batch_result =
    repository.with_next_batch(fetch_specification, tempo.batch_size, consumer_uuid, locking, @clock) do |record|
      # One timestamp per record, shared by processing and the enqueued mark.
      enqueued_at = @clock.now.utc
      processor.process(record, enqueued_at)
      repository.mark_as_enqueued(record, enqueued_at)
    end

  # Post-batch housekeeping runs before the result is inspected.
  cleanup(fetch_specification)
  processor.after_batch

  batch_result.success_count > 0
end
#process ⇒ Object
36 37 38 39 40 41 42 43 44 |
# File 'lib/ruby_event_store/outbox/consumer.rb', line 36

# Runs one pass over every configured split key.
#
# @return [Boolean] true when +handle_split+ returned true for at least one
#   split key, false otherwise
def process
  # Work on a copy so that shifting does not drain the configured split keys.
  remaining_split_keys = split_keys.dup

  was_something_changed = false
  while (split_key = remaining_split_keys.shift)
    # NOTE(review): the rendered listing had lost the accessor name after
    # `processor.`; `message_format` is restored here -- confirm against the
    # file on disk.
    #
    # `|=` (not `||=`) is deliberate: every split key is handled even after an
    # earlier one has already reported a change.
    was_something_changed |= handle_split(FetchSpecification.new(processor.message_format, split_key))
  end
  was_something_changed
end