Class: Journaled::KinesisBatchSender
- Inherits: Object
  - Object
  - Journaled::KinesisBatchSender
- Defined in: lib/journaled/kinesis_batch_sender.rb
Overview
Sends batches of events to Kinesis using the PutRecords batch API.
This class handles:
- Sending events in batches to improve throughput
- Handling failures on a per-event basis
- Classifying errors as transient vs permanent
Returns structured results for the caller to handle event state management.
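The per-event failure handling described above can be sketched as follows. PutRecords returns one response entry per submitted record, in the same order as the request, and a failed entry carries an error code. The helper name `partition_by_response`, the plain-hash response entries, and the shape of each `:failed` entry are assumptions made for illustration; this page does not show how the class does it internally.

```ruby
# Hypothetical helper (not part of the documented class): pairs each event with
# the response entry at the same index and splits them by whether that entry
# carries an error code.
def partition_by_response(events, response_records)
  result = { succeeded: [], failed: [] }
  events.zip(response_records).each do |event, record|
    if record[:error_code]
      # Assumed shape for a failed entry: the event plus the reported error code.
      result[:failed] << { event: event, error_code: record[:error_code] }
    else
      result[:succeeded] << event
    end
  end
  result
end

# Plain hashes stand in for the SDK's response entries in this sketch.
response_records = [
  { sequence_number: '1', shard_id: 'shardId-000000000000' },
  { error_code: 'InternalFailure', error_message: 'Internal service failure.' },
]
result = partition_by_response(%w[event_a event_b], response_records)
```

Matching by index means the caller never has to search the response for a given event.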
Constant Summary
- PERMANENT_ERROR_CODES =
  Per-record error codes that indicate permanent failures (bad event data)
  ['ValidationException'].freeze
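A minimal sketch of how a list like this could drive the transient vs permanent classification. The constant mirrors the documented value, but the method name `classify_error` and the `:permanent` / `:transient` symbols are hypothetical, and treating every unlisted code as retryable is an assumption rather than confirmed behavior of this class.

```ruby
# Mirrors the documented constant; everything else here is illustrative.
PERMANENT_ERROR_CODES = ['ValidationException'].freeze

# Hypothetical classifier: returns :permanent when the per-record error code
# signals bad event data, :transient for any other error code (assumed to be
# retryable), and nil when the record carried no error code.
def classify_error(error_code)
  return nil if error_code.nil?

  PERMANENT_ERROR_CODES.include?(error_code) ? :permanent : :transient
end
```

Under this assumption, a throttling code such as `ProvisionedThroughputExceededException` would be classified as transient and left for a later retry.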
Instance Method Summary
- #send_batch(events) ⇒ Hash
  Send a batch of database events to Kinesis.
Instance Method Details
#send_batch(events) ⇒ Hash
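Send a batch of database events to Kinesis.
Uses the put_records batch API. Groups events by stream and sends each group as its own batch, then merges the per-stream results into a single Hash with :succeeded and :failed lists.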
# File 'lib/journaled/kinesis_batch_sender.rb', line 26

def send_batch(events)
  # Group events by stream since put_records requires all records to go to the same stream
  events.group_by(&:stream_name).each_with_object({ succeeded: [], failed: [] }) do |(stream_name, stream_events), result|
    batch_result = send_stream_batch(stream_name, stream_events)
    result[:succeeded].concat(batch_result[:succeeded])
    result[:failed].concat(batch_result[:failed])
  end
end
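To see the group-and-merge behavior in isolation, the sketch below runs the same `send_batch` body against a stubbed `send_stream_batch`. The `Event` struct and the stub's even/odd success rule are hypothetical and exist only to make the merged result visible; the real private method talks to Kinesis and is not shown on this page.

```ruby
# Hypothetical stand-in for a database event; only stream_name comes from the source.
Event = Struct.new(:id, :stream_name)

# Hypothetical stub for the per-stream sender: treats events with even ids as
# delivered and events with odd ids as failed.
def send_stream_batch(_stream_name, stream_events)
  succeeded, failed = stream_events.partition { |event| event.id.even? }
  { succeeded: succeeded, failed: failed }
end

# Same grouping and merging shape as the documented method.
def send_batch(events)
  events.group_by(&:stream_name).each_with_object({ succeeded: [], failed: [] }) do |(stream_name, stream_events), result|
    batch_result = send_stream_batch(stream_name, stream_events)
    result[:succeeded].concat(batch_result[:succeeded])
    result[:failed].concat(batch_result[:failed])
  end
end

events = [Event.new(1, 'a'), Event.new(2, 'b'), Event.new(3, 'a'), Event.new(4, 'b')]
result = send_batch(events)
```

Each stream gets one call to the sender, and the caller receives a single Hash regardless of how many streams were involved.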