Class: WaterDrop::ProducerProxy
- Inherits: Object
  - Object
  - WaterDrop::ProducerProxy
- Defined in: lib/water_drop/producer_proxy.rb
Overview
Proxy object for the producer (sender) objects that live inside the pool. We use it to provide an additional timeout-monitoring layer. There used to be an issue with how Poseidon (the previous engine for this library) handled sockets that were old and unused, which is why we simply reinitialize the connection if the connection layer has not been used for too long. We keep this logic to avoid problems, just in case. If those problems do not occur with Ruby-Kafka, we will drop it.
Constant Summary
- LIFE_TIME = 5 * 60
  How long the object should be considered alive if nothing is being sent through it. After that time, we recreate the connection.
- MAX_SEND_RETRIES = 1
  If sending fails, how many times we should retry with a new connection.
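The idle-timeout behaviour described by LIFE_TIME can be illustrated with a minimal sketch. The private helpers of the real class are not shown on this page, so everything below (the class name IdleAwareProxy, the dead? and reload! helpers, and the injected clock and builder lambdas) is a hypothetical stand-in, not the library's implementation. It only demonstrates the idea of rebuilding a connection that has sat unused for longer than LIFE_TIME.

```ruby
# Hypothetical sketch of idle-timeout reconnection; not WaterDrop's actual code.
class IdleAwareProxy
  LIFE_TIME = 5 * 60 # same value as the constant documented above

  attr_reader :builds

  # clock and builder are injected (assumption) so the sketch runs without Kafka.
  def initialize(clock: -> { Time.now }, builder: -> { Object.new })
    @clock = clock
    @builder = builder
    @builds = 0
    touch # treat creation as the first usage, so @last_usage is never nil
  end

  # Returns the underlying connection, rebuilding it first if it sat idle too long.
  def producer
    reload! if dead?
    touch
    @producer ||= build
  end

  private

  # Records the current time as the moment of last usage.
  def touch
    @last_usage = @clock.call
  end

  # True when nothing has used the connection for more than LIFE_TIME.
  def dead?
    @clock.call - @last_usage > LIFE_TIME
  end

  # Drops the current connection so the next access builds a fresh one.
  def reload!
    @producer = nil
  end

  def build
    @builds += 1
    @builder.call
  end
end
```

With a controllable clock, two calls made 60 units apart return the same object, while a call made after more than LIFE_TIME of inactivity returns a freshly built one.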
Instance Method Summary
-
#initialize ⇒ WaterDrop::ProducerProxy
constructor
Proxy object to Kafka::Producer.
-
#send_message(message) ⇒ Object
Sends message to Kafka.
Constructor Details
#initialize ⇒ WaterDrop::ProducerProxy
To avoid handling a nil @last_usage, we assume the proxy is first used at the moment it is created.
Returns proxy object to Kafka::Producer.
# File 'lib/water_drop/producer_proxy.rb', line 20
def initialize
  touch
  @attempts = 0
end
Instance Method Details
#send_message(message) ⇒ Object
If something goes wrong, the proxy assumes that the producer is corrupted and tries to create a new one.
Sends message to Kafka.
# File 'lib/water_drop/producer_proxy.rb', line 31
def send_message(message)
  touch
  producer.produce(message.message, { topic: message.topic }.merge(message.options))
  producer.deliver_messages
rescue StandardError => e
  reload!
  retry if (@attempts += 1) <= MAX_SEND_RETRIES
  raise(e)
ensure
  @attempts = 0
end
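The rescue/retry/ensure shape of send_message can be exercised in isolation. The following is a minimal sketch, assuming a hypothetical RetryingSender class whose delivery step is a plain block and whose reload! only counts calls; neither is part of WaterDrop. It shows that a failing send triggers a reload and at most MAX_SEND_RETRIES extra attempts, and that the ensure clause runs once when the method exits rather than on each retry.

```ruby
# Hypothetical stand-in; only the rescue/retry/ensure flow mirrors send_message.
class RetryingSender
  MAX_SEND_RETRIES = 1

  attr_reader :reloads

  # The block performs (or fails) the actual delivery; it replaces Kafka here.
  def initialize(&deliver)
    @deliver = deliver
    @attempts = 0
    @reloads = 0
  end

  def send_message(message)
    @deliver.call(message)
  rescue StandardError => e
    reload!                                       # assume the connection is broken
    retry if (@attempts += 1) <= MAX_SEND_RETRIES # rerun the method body
    raise(e)                                      # out of retries: surface the error
  ensure
    @attempts = 0                                 # runs once, when the method exits
  end

  private

  # The real proxy would rebuild its producer; the sketch only counts the calls.
  def reload!
    @reloads += 1
  end
end

# Usage: the first call fails, the retry succeeds after one reload.
calls = 0
flaky = RetryingSender.new do |message|
  calls += 1
  raise IOError, 'stale socket' if calls == 1
  message
end
flaky.send_message('hello')
```

With MAX_SEND_RETRIES set to 1, a delivery that always fails is attempted twice in total, and reload! also runs twice, because the reload happens before the retry limit is checked.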