Class: RubyKafkaRetry::RetryFailedEvent
- Inherits: Object
- Inheritance chain: Object, RubyKafkaRetry::RetryFailedEvent
- Defined in: lib/ruby_kafka_retry.rb
Instance Method Summary
- #can_add_dlq_topic?(curr_retry_attempt) ⇒ Boolean
- #format_message ⇒ Object
- #initialize(retry_topic, dlq_topic, topic_message, max_retry_attempt = 3) ⇒ RetryFailedEvent (constructor): A new instance of RetryFailedEvent.
- #process_message ⇒ Object
- #retry ⇒ Object
- #validate_params ⇒ Object
Constructor Details
#initialize(retry_topic, dlq_topic, topic_message, max_retry_attempt = 3) ⇒ RetryFailedEvent
Returns a new instance of RetryFailedEvent.
# File 'lib/ruby_kafka_retry.rb', line 12
def initialize(retry_topic, dlq_topic, topic_message, max_retry_attempt = 3)
  @retry_topic = retry_topic
  @dlq_topic = dlq_topic
  @topic_message = topic_message
  @max_retry_attempt = max_retry_attempt
end
Instance Method Details
#can_add_dlq_topic?(curr_retry_attempt) ⇒ Boolean
# File 'lib/ruby_kafka_retry.rb', line 23
def can_add_dlq_topic?(curr_retry_attempt)
  # True once the attempt count exceeds the configured maximum.
  @max_retry_attempt < curr_retry_attempt
end
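The comparison is strict, so an attempt count equal to the maximum still goes back to the retry topic. The following is a minimal standalone sketch of that boundary, not the gem's code: it copies the comparison into a top-level method, and `MAX_RETRY_ATTEMPT` and the extra `max_retry_attempt` parameter are hypothetical names introduced only for illustration.

```ruby
# Standalone sketch: mirrors the comparison in can_add_dlq_topic? without loading the gem.
# MAX_RETRY_ATTEMPT is a hypothetical constant holding the constructor's default of 3.
MAX_RETRY_ATTEMPT = 3

def can_add_dlq_topic?(curr_retry_attempt, max_retry_attempt = MAX_RETRY_ATTEMPT)
  max_retry_attempt < curr_retry_attempt
end

# Map each attempt number to the routing decision it would produce.
decisions = (1..5).map { |attempt| [attempt, can_add_dlq_topic?(attempt)] }.to_h

decisions.each do |attempt, dlq|
  puts "attempt #{attempt}: #{dlq ? 'DLQ' : 'retry'}"
end
```

With the default of 3, attempts 1 through 3 are retried and attempt 4 is the first one routed to the DLQ topic.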
#format_message ⇒ Object
# File 'lib/ruby_kafka_retry.rb', line 27
def format_message
  # A missing counter is treated as 0 (nil.to_i), so the first failure records attempt 1.
  @topic_message['current_retry_attempt'] = @topic_message['current_retry_attempt'].to_i + 1
  @topic_message
end
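Because the counter is read with `to_i`, a message that has never been retried (no `current_retry_attempt` key) and a message whose counter arrived as a string are both handled. The sketch below illustrates this with a standalone copy of the increment; passing the hash as an argument and the sample keys (`'id'`) are assumptions made for the example, not part of the gem.

```ruby
# Standalone sketch of the increment performed by format_message.
# The hash is passed in as an argument here; the gem reads it from @topic_message.
def format_message(topic_message)
  topic_message['current_retry_attempt'] = topic_message['current_retry_attempt'].to_i + 1
  topic_message
end

# No counter yet: nil.to_i is 0, so the first failure becomes attempt 1.
fresh = { 'id' => 42 }
first = format_message(fresh)

# Counter stored as a string (for example after JSON round-tripping): '2'.to_i is 2.
again = format_message({ 'id' => 42, 'current_retry_attempt' => '2' })

puts first['current_retry_attempt']
puts again['current_retry_attempt']
```

Note that the hash is updated in place and the same object is returned, so callers holding a reference to the original message see the incremented counter.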
#process_message ⇒ Object
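# File 'lib/ruby_kafka_retry.rb', line 32
def process_message
  message = format_message
  if can_add_dlq_topic?(message['current_retry_attempt'])
    # Retry budget exhausted: publish to the dead-letter topic.
    KafkaProducer.new.publish_to_topic(@dlq_topic, message)
  else
    # Still within budget: hand the message to Sidekiq for another attempt.
    SidekiqPublisher.new.publish_to_sidekiq(@retry_topic, message)
  end
end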
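To see how the increment and the DLQ check combine over repeated failures, here is a minimal standalone sketch of the routing. It does not use the gem's `KafkaProducer` or `SidekiqPublisher`, whose interfaces are not shown on this page. `RecordingPublisher`, `route`, and the topic names are hypothetical stand-ins that only record what would have been published.

```ruby
# Hypothetical recording stub standing in for KafkaProducer / SidekiqPublisher.
class RecordingPublisher
  attr_reader :published

  def initialize
    @published = []
  end

  # Store a copy so each record keeps the counter value at publish time.
  def publish(topic, message)
    @published << [topic, message.dup]
  end
end

# Standalone version of the routing: increment the counter, then pick a destination.
def route(message, retry_topic:, dlq_topic:, retry_publisher:, dlq_publisher:, max_retry_attempt: 3)
  message['current_retry_attempt'] = message['current_retry_attempt'].to_i + 1
  if max_retry_attempt < message['current_retry_attempt']
    dlq_publisher.publish(dlq_topic, message)
  else
    retry_publisher.publish(retry_topic, message)
  end
end

retries = RecordingPublisher.new
dlq = RecordingPublisher.new
message = { 'order_id' => 7 }

# Simulate the same message failing four times in a row.
4.times do
  route(message,
        retry_topic: 'orders.retry', dlq_topic: 'orders.dlq',
        retry_publisher: retries, dlq_publisher: dlq)
end

puts "retried: #{retries.published.size}, dead-lettered: #{dlq.published.size}"
```

Under these assumptions the first three failures are sent to the retry topic and the fourth lands on the DLQ topic with `current_retry_attempt` set to 4.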
#retry ⇒ Object
# File 'lib/ruby_kafka_retry.rb', line 41
def retry
  validate_params
  process_message
  return true
end
#validate_params ⇒ Object
# File 'lib/ruby_kafka_retry.rb', line 19
def validate_params
  RubyKafkaRetryValidator.new(@retry_topic, @dlq_topic, @topic_message).validate
end