Class: WaterDrop::Message
- Inherits:
-
Object
- Object
- WaterDrop::Message
- Defined in:
- lib/water_drop/message.rb
Overview
Message class which encapsulate single Kafka message logic and its delivery
Instance Attribute Summary collapse
-
#message ⇒ Object
readonly
Returns the value of attribute message.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Instance Method Summary collapse
-
#initialize(topic, message, options = {}) ⇒ WaterDrop::Message
constructor
WaterDrop message instance.
-
#send! ⇒ Object
Sents a current message to Kafka.
Constructor Details
#initialize(topic, message, options = {}) ⇒ WaterDrop::Message
Returns WaterDrop message instance.
13 14 15 16 17 |
# File 'lib/water_drop/message.rb', line 13 def initialize(topic, , = {}) @topic = topic.to_s @message = @options = end |
Instance Attribute Details
#message ⇒ Object (readonly)
Returns the value of attribute message.
4 5 6 |
# File 'lib/water_drop/message.rb', line 4 def @message end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
4 5 6 |
# File 'lib/water_drop/message.rb', line 4 def @options end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
4 5 6 |
# File 'lib/water_drop/message.rb', line 4 def topic @topic end |
Instance Method Details
#send! ⇒ Object
Note:
Won’t send any messages if send_messages config flag is set to false
Sents a current message to Kafka
23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/water_drop/message.rb', line 23 def send! return true unless ::WaterDrop.config. Pool.with { |producer| producer.(self) } ::WaterDrop.logger.info("Message #{} was sent to topic '#{topic}'") rescue StandardError => e # Even if we dont reraise this exception, it should log that it happened ::WaterDrop.logger.error(e) # Reraise if we want to raise on failure # Ignore if we dont want to know that something went wrong return unless ::WaterDrop.config.raise_on_failure raise(e) end |