Class: RubyKafkaRetry::RetryFailedEvent

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_kafka_retry.rb

Instance Method Summary collapse

Constructor Details

#initialize(retry_topic, dlq_topic, topic_message, max_retry_attempt = 3) ⇒ RetryFailedEvent

Returns a new instance of RetryFailedEvent.



12
13
14
15
16
17
# File 'lib/ruby_kafka_retry.rb', line 12

def initialize(retry_topic, dlq_topic, topic_message, max_retry_attempt=3)
  @retry_topic = retry_topic
  @dlq_topic = dlq_topic
  @topic_message = topic_message
  @max_retry_attempt = max_retry_attempt
end

Instance Method Details

#can_add_dlq_topic?(curr_retry_attempt) ⇒ Boolean

Returns:

  • (Boolean)


23
24
25
# File 'lib/ruby_kafka_retry.rb', line 23

def can_add_dlq_topic?(curr_retry_attempt)
  @max_retry_attempt < curr_retry_attempt
end

#format_messageObject



27
28
29
30
# File 'lib/ruby_kafka_retry.rb', line 27

def format_message
  @topic_message['current_retry_attempt'] = @topic_message['current_retry_attempt'].to_i + 1
  @topic_message
end

#process_messageObject



32
33
34
35
36
37
38
39
# File 'lib/ruby_kafka_retry.rb', line 32

def process_message
  message = format_message
  if(can_add_dlq_topic?(message['current_retry_attempt']))
    KafkaProducer.new.publish_to_topic(@dlq_topic, message)
  else
    SidekiqPublisher.new.publish_to_sidekiq(@retry_topic, message)
  end
end

#retryObject



41
42
43
44
45
# File 'lib/ruby_kafka_retry.rb', line 41

def retry
  validate_params
  process_message
  return true
end

#validate_paramsObject



19
20
21
# File 'lib/ruby_kafka_retry.rb', line 19

def validate_params
  RubyKafkaRetryValidator.new(@retry_topic, @dlq_topic, @topic_message).validate
end