Class: WaterDrop::ProducerProxy

Inherits:
Object
  • Object
show all
Defined in:
lib/water_drop/producer_proxy.rb

Overview

Proxy object for a producer (sender) objects that are inside pool We use it to provide additional timeout monitoring layer There used to be an issue with Poseidon (previous engine for this lib) usage of sockets that are old and not used - that’s why we just reinitialize connection if the connection layer is not being used for too long We keep this logic to avoid problems just in case. If those problems won’t occure with Ruby-Kafka, we will drop it

Constant Summary collapse

LIFE_TIME =

How long should be object considered alive if nothing is being send using it. After that time, we will recreate the connection

5 * 60
MAX_SEND_RETRIES =

If sending fails - how many times we should try with a new connection

1

Instance Method Summary collapse

Constructor Details

#initializeWaterDrop::ProducerProxy

Note:

To ignore @last_usage nil case - we just assume that it is being first used when we create it

Returns proxy object to Kafka::Producer.



20
21
22
23
# File 'lib/water_drop/producer_proxy.rb', line 20

def initialize
  touch
  @attempts = 0
end

Instance Method Details

#send_message(message) ⇒ Object

Note:

If something goes wrong it will assume that producer is corrupted and will try to create a new one

Sends message to Kafka

Examples:

Send 1 message

ProducerProxy.new.send_message(WaterDrop::Message.new(topic, message))

Parameters:



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/water_drop/producer_proxy.rb', line 31

def send_message(message)
  touch
  producer.produce(message.message, {
    topic: message.topic
  }.merge(message.options))
  producer.deliver_messages
rescue StandardError => e
  reload!

  retry if (@attempts += 1) <= MAX_SEND_RETRIES

  raise(e)
ensure
  @attempts = 0
end