Class: Journaled::Outbox::BatchProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/journaled/outbox/batch_processor.rb

Overview

Processes batches of outbox events

This class handles the core business logic of:

  • Fetching events from the database (with FOR UPDATE)

  • Sending them to Kinesis (batch API or sequential)

  • Handling successful deliveries (deleting events)

  • Handling permanent failures (marking with failed_at)

  • Handling transient failures (leaving unlocked for retry)

Supports two modes based on Journaled.outbox_processing_mode:

  • :batch - Uses put_records API for high throughput with parallel workers

  • :guaranteed_order - Uses put_record API for sequential processing

All operations happen within a single database transaction for consistency. The Worker class delegates to this for actual event processing.

Instance Method Summary collapse

Constructor Details

#initializeBatchProcessor

Returns a new instance of BatchProcessor.



21
22
23
24
25
26
27
# File 'lib/journaled/outbox/batch_processor.rb', line 21

def initialize
  @batch_sender = if Journaled.outbox_processing_mode == :guaranteed_order
    KinesisSequentialSender.new
  else
    KinesisBatchSender.new
  end
end

Instance Method Details

#process_batchHash

Process a single batch of events

Wraps the entire batch processing in a single transaction:

  1. SELECT FOR UPDATE (claim events)

  2. Send to Kinesis (batch API or sequential, based on mode)

  3. Delete successful events

  4. Mark permanently failed events

  5. Leave transient failures untouched (will be retried)

Returns:

  • (Hash)

    Statistics with :succeeded, :failed_permanently, :failed_transiently counts



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/journaled/outbox/batch_processor.rb', line 39

def process_batch
  ActiveRecord::Base.transaction do
    events = Event.fetch_batch_for_update
    Rails.logger.info("[journaled] Processing batch of #{events.count} events")

    result = batch_sender.send_batch(events)

    Event.where(id: result[:succeeded].map(&:id)).delete_all if result[:succeeded].any?

    permanent_failures = result[:failed].select(&:permanent?)
    transient_failures = result[:failed].select(&:transient?)

    mark_events_as_failed(permanent_failures) if permanent_failures.any?

    Rails.logger.info(
      "[journaled] Batch complete: #{result[:succeeded].count} succeeded, " \
      "#{permanent_failures.count} permanently failed, " \
      "#{transient_failures.count} transiently failed (will retry) " \
      "(batch size: #{events.count})",
    )

    {
      succeeded: result[:succeeded].count,
      failed_permanently: permanent_failures.count,
      failed_transiently: transient_failures.count,
    }
  end
end