Class: Journaled::Outbox::BatchProcessor
- Inherits: Object
- Defined in: lib/journaled/outbox/batch_processor.rb
Overview
Processes batches of outbox events
This class handles the core business logic of:
- Fetching events from the database (with FOR UPDATE)
- Sending them to Kinesis (batch API or sequential)
- Handling successful deliveries (deleting events)
- Handling permanent failures (marking with failed_at)
- Handling transient failures (leaving unlocked for retry)
Supports two modes based on Journaled.outbox_processing_mode:
- :batch - Uses put_records API for high throughput with parallel workers
- :guaranteed_order - Uses put_record API for sequential processing
All operations happen within a single database transaction for consistency. The Worker class delegates to this for actual event processing.
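The mode switch described above can be sketched in isolation. This is a minimal illustration, not the library's code: SketchSequentialSender, SketchBatchSender, and sketch_sender_for are hypothetical stand-ins, and the only behavior taken from the documented constructor is that :guaranteed_order selects the sequential sender while any other value falls through to the batch sender.

```ruby
# Hypothetical stand-ins for the real sender classes.
class SketchSequentialSender; end
class SketchBatchSender; end

# Mirrors the branch in #initialize: :guaranteed_order selects the
# sequential sender; every other value selects the batch sender.
def sketch_sender_for(mode)
  if mode == :guaranteed_order
    SketchSequentialSender.new
  else
    SketchBatchSender.new
  end
end

ordered_sender = sketch_sender_for(:guaranteed_order)
batch_sender   = sketch_sender_for(:batch)
```

Because the branch only tests for :guaranteed_order, an unrecognized mode value would also end up on the batch path in this sketch.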
Instance Method Summary
- #initialize ⇒ BatchProcessor (constructor)
  A new instance of BatchProcessor.
- #process_batch ⇒ Hash
  Process a single batch of events.
Constructor Details
#initialize ⇒ BatchProcessor
Returns a new instance of BatchProcessor.
    # File 'lib/journaled/outbox/batch_processor.rb', line 21

    def initialize
      @batch_sender = if Journaled.outbox_processing_mode == :guaranteed_order
        KinesisSequentialSender.new
      else
        KinesisBatchSender.new
      end
    end
Instance Method Details
#process_batch ⇒ Hash
Process a single batch of events
Wraps the entire batch processing in a single transaction:
- SELECT FOR UPDATE (claim events)
- Send to Kinesis (batch API or sequential, based on mode)
- Delete successful events
- Mark permanently failed events
- Leave transient failures untouched (will be retried)
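The effect of the last three steps on stored rows can be sketched with an in-memory stand-in. Everything here is an assumption for illustration: the rows hash plays the role of the outbox table, and the outcomes hash is an invented result of one send attempt. No database or Kinesis call is involved.

```ruby
# Hypothetical in-memory stand-in for the outbox table: id => row.
rows = {
  1 => { failed_at: nil },
  2 => { failed_at: nil },
  3 => { failed_at: nil },
}

# Assumed outcome of one send attempt, keyed by event id.
outcomes = { 1 => :succeeded, 2 => :permanent_failure, 3 => :transient_failure }

now = Time.now
outcomes.each do |id, outcome|
  case outcome
  when :succeeded
    rows.delete(id)            # delivered events are deleted
  when :permanent_failure
    rows[id][:failed_at] = now # permanent failures are marked with failed_at
  when :transient_failure
    # left untouched, so a later batch can pick the row up again
  end
end
```

After the loop, only rows 2 and 3 remain: row 2 carries a failed_at timestamp, and row 3 is unchanged and still eligible for a retry.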
    # File 'lib/journaled/outbox/batch_processor.rb', line 39

    def process_batch
      ActiveRecord::Base.transaction do
        events = Event.fetch_batch_for_update
        Rails.logger.info("[journaled] Processing batch of #{events.count} events")

        result = batch_sender.send_batch(events)

        Event.where(id: result[:succeeded].map(&:id)).delete_all if result[:succeeded].any?

        permanent_failures = result[:failed].select(&:permanent?)
        transient_failures = result[:failed].select(&:transient?)

        mark_events_as_failed(permanent_failures) if permanent_failures.any?

        Rails.logger.info(
          "[journaled] Batch complete: #{result[:succeeded].count} succeeded, " \
          "#{permanent_failures.count} permanently failed, " \
          "#{transient_failures.count} transiently failed (will retry) " \
          "(batch size: #{events.count})",
        )

        {
          succeeded: result[:succeeded].count,
          failed_permanently: permanent_failures.count,
          failed_transiently: transient_failures.count,
        }
      end
    end
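The shape of the returned Hash can be reproduced without ActiveRecord or Kinesis. The sketch below assumes only what the listing shows: the sender result has :succeeded and :failed keys, and each failure responds to permanent? and transient?. SketchFailure and sketch_summarize are hypothetical names introduced for illustration.

```ruby
# Hypothetical failure record; the real failure objects only need to
# respond to permanent? and transient?.
SketchFailure = Struct.new(:event_id, :kind) do
  def permanent?
    kind == :permanent
  end

  def transient?
    kind == :transient
  end
end

# Builds the same three counters that #process_batch returns.
def sketch_summarize(result)
  permanent = result[:failed].select(&:permanent?)
  transient = result[:failed].select(&:transient?)

  {
    succeeded: result[:succeeded].count,
    failed_permanently: permanent.count,
    failed_transiently: transient.count,
  }
end

result = {
  succeeded: [1, 2, 3],
  failed: [SketchFailure.new(4, :permanent), SketchFailure.new(5, :transient)],
}

summary = sketch_summarize(result)
# summary is { succeeded: 3, failed_permanently: 1, failed_transiently: 1 }
```

A caller could use these counters for metrics or to decide whether to poll again immediately, although how the Worker class actually consumes them is not shown in this page.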