Class: Journaled::KinesisBatchSender

Inherits:
Object
Defined in:
lib/journaled/kinesis_batch_sender.rb

Overview

Sends batches of events to Kinesis using the PutRecords batch API.

This class handles:

  • Sending events in batches to improve throughput

  • Handling failures on a per-event basis

  • Classifying errors as transient vs permanent

Returns structured results for the caller to handle event state management.

Constant Summary

PERMANENT_ERROR_CODES =

Per-record error codes that indicate permanent failures (bad event data)

[
  'ValidationException',
].freeze
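
As an illustration of how this constant can drive the transient-versus-permanent split, here is a minimal sketch. The `ClassificationSketch` module and the `permanent_error?` helper are hypothetical names introduced for this example; they are not part of the documented class. Any code not listed in the constant is treated as transient here, which is an assumption about the intended behavior.

```ruby
# Hypothetical sketch: wraps the documented constant in a throwaway module.
module ClassificationSketch
  # Mirrors the documented value of PERMANENT_ERROR_CODES.
  PERMANENT_ERROR_CODES = [
    'ValidationException',
  ].freeze

  # Assumed helper: a per-record error code counts as permanent only when
  # it appears in the list; everything else is considered retryable.
  def self.permanent_error?(error_code)
    PERMANENT_ERROR_CODES.include?(error_code)
  end
end

ClassificationSketch.permanent_error?('ValidationException')
ClassificationSketch.permanent_error?('ProvisionedThroughputExceededException')
```

Keeping the list frozen and checking membership by exact string match makes it cheap to add further permanent codes later without touching the classification logic.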

Instance Method Summary

  • #send_batch(events) ⇒ Hash

    Send a batch of database events to Kinesis.

Instance Method Details

#send_batch(events) ⇒ Hash

Send a batch of database events to Kinesis.

Uses the put_records batch API. Because put_records requires all records in a call to go to the same stream, events are grouped by stream and each group is sent as its own batch.

Parameters:

  • events

    Database events to send; each must respond to stream_name.

Returns:

  • (Hash)

    Result with:

    • succeeded: Array of successfully sent events

    • failed: Array of FailedEvent structs (both transient and permanent failures)
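
A caller might consume this result along the following lines. This is only a sketch: the `FailedEvent` fields shown (`event`, `error_code`, `error_message`, `transient`) and the `transient?` predicate are assumptions made for the example, since the struct's actual definition is not shown on this page, and the symbols stand in for real event records.

```ruby
# Hypothetical stand-in for the FailedEvent struct; field names are assumed.
FailedEvent = Struct.new(:event, :error_code, :error_message, :transient, keyword_init: true) do
  def transient?
    transient
  end
end

# A hand-built result in the documented shape: succeeded and failed arrays.
result = {
  succeeded: [:event_a],
  failed: [
    FailedEvent.new(event: :event_b, error_code: 'ProvisionedThroughputExceededException',
                    error_message: 'Rate exceeded', transient: true),
    FailedEvent.new(event: :event_c, error_code: 'ValidationException',
                    error_message: 'Bad data', transient: false),
  ],
}

# Split failures so transient ones can be retried and permanent ones set aside.
retryable, permanent = result[:failed].partition(&:transient?)
```

Because the sender only reports outcomes, decisions such as retrying, marking events as failed, or deleting sent events stay with the caller.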



# File 'lib/journaled/kinesis_batch_sender.rb', line 26

def send_batch(events)
  # Group events by stream since put_records requires all records to go to the same stream
  events.group_by(&:stream_name).each_with_object({ succeeded: [], failed: [] }) do |(stream_name, stream_events), result|
    batch_result = send_stream_batch(stream_name, stream_events)
    result[:succeeded].concat(batch_result[:succeeded])
    result[:failed].concat(batch_result[:failed])
  end
end
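
The grouping and merging performed by send_batch can be exercised in isolation. In the sketch below, `Event`, `fake_send_stream_batch`, and `send_batch_sketch` are hypothetical stand-ins: the real send_stream_batch is not shown on this page, so the fake simply treats events with even ids as successes and makes no call to Kinesis.

```ruby
# Hypothetical event type; only stream_name is required by the grouping step.
Event = Struct.new(:id, :stream_name)

# Stand-in for send_stream_batch: even ids succeed, odd ids fail.
def fake_send_stream_batch(_stream_name, stream_events)
  succeeded, failed = stream_events.partition { |event| event.id.even? }
  { succeeded: succeeded, failed: failed }
end

# Same structure as send_batch: one batch per stream, results merged into
# a single hash with succeeded and failed arrays.
def send_batch_sketch(events)
  events.group_by(&:stream_name).each_with_object({ succeeded: [], failed: [] }) do |(stream_name, stream_events), result|
    batch_result = fake_send_stream_batch(stream_name, stream_events)
    result[:succeeded].concat(batch_result[:succeeded])
    result[:failed].concat(batch_result[:failed])
  end
end

events = [Event.new(1, 'a'), Event.new(2, 'b'), Event.new(3, 'a'), Event.new(4, 'b')]
result = send_batch_sketch(events)
```

Note that the merged arrays follow the order of the groups, not the order of the input: events for the first stream encountered come before those of later streams.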