Class: Journaled::Delivery

Inherits:
Object
  • Object
show all
Defined in:
app/models/journaled/delivery.rb

Defined Under Namespace

Classes: KinesisTemporaryFailure

Constant Summary collapse

DEFAULT_REGION =
'us-east-1'.freeze

Instance Method Summary collapse

Constructor Details

#initialize(serialized_event:, partition_key:, app_name:) ⇒ Delivery

Returns a new instance of Delivery.



4
5
6
7
8
# File 'app/models/journaled/delivery.rb', line 4

# Builds a delivery from three required keyword arguments, keeping each
# one in the instance variable of the same name. No validation or
# transformation is applied to the values here.
#
# @param serialized_event value stored in @serialized_event
# @param partition_key value stored in @partition_key
# @param app_name value stored in @app_name
def initialize(serialized_event:, partition_key:, app_name:)
  @serialized_event, @partition_key, @app_name =
    serialized_event, partition_key, app_name
end

Instance Method Details

#kinesis_client_config ⇒ Object



25
26
27
28
29
30
# File 'app/models/journaled/delivery.rb', line 25

# Assembles the option hash for the Kinesis client.
#
# The region is read from the AWS_DEFAULT_REGION environment variable,
# falling back to DEFAULT_REGION when that variable is not set, and
# retry_limit is fixed at 0. The hash returned by #credentials is merged
# on top, so any key it shares with the base options takes precedence.
#
# @return [Hash] base options merged with the credentials hash
def kinesis_client_config
  base_options = {
    region: ENV.fetch('AWS_DEFAULT_REGION', DEFAULT_REGION),
    retry_limit: 0,
  }
  base_options.merge(credentials)
end

#perform ⇒ Object



10
11
12
13
14
15
16
17
18
# File 'app/models/journaled/delivery.rb', line 10

# Hands +record+ to +kinesis_client.put_record+ when Journaled is enabled;
# when it is disabled nothing is sent and nil is returned.
#
# The listed Kinesis server-side errors and Seahorse networking errors are
# logged through Rails.logger (only the exception class is written to the
# log) and then re-raised as KinesisTemporaryFailure. Any other exception
# propagates untouched.
#
# @return the result of put_record, or nil when Journaled is disabled
# @raise [KinesisTemporaryFailure] on the server or networking errors
#   rescued below
def perform
  # The guard sits inside the method body, so it is still covered by the
  # rescue clauses below.
  return unless Journaled.enabled?

  kinesis_client.put_record(record)
rescue Aws::Kinesis::Errors::InternalFailure,
       Aws::Kinesis::Errors::ServiceUnavailable,
       Aws::Kinesis::Errors::Http503Error => server_error
  Rails.logger.error("Kinesis Error - Server Error occurred - #{server_error.class}")
  raise KinesisTemporaryFailure
rescue Seahorse::Client::NetworkingError => network_error
  Rails.logger.error("Kinesis Error - Networking Error occurred - #{network_error.class}")
  raise KinesisTemporaryFailure
end

#stream_name ⇒ Object



20
21
22
23
# File 'app/models/journaled/delivery.rb', line 20

# Looks up the stream name in the environment.
#
# The variable consulted is JOURNALED_STREAM_NAME, prefixed with the
# upcased +app_name+ and an underscore when +app_name+ is not nil
# (e.g. an app_name of "foo" reads FOO_JOURNALED_STREAM_NAME).
#
# @return [String] the value of the environment variable
# @raise [KeyError] when the environment variable is not set
def stream_name
  segments = ['JOURNALED_STREAM_NAME']
  prefix = app_name&.upcase
  segments.unshift(prefix) unless prefix.nil?
  ENV.fetch(segments.join('_'))
end