Module: Redborder::KafkaNotifier

Defined in:
lib/kafka_notifier.rb,
lib/kafka_notifier/base.rb,
lib/kafka_notifier/model.rb,
lib/kafka_notifier/message_queue.rb

Defined Under Namespace

Modules: Base, Model Classes: MessageQueue

Class Method Summary collapse

Class Method Details

.create_producer_threadObject



52
53
54
55
56
57
58
59
# File 'lib/kafka_notifier.rb', line 52

def self.create_producer_thread
  Thread.new do
    loop do
      messages = message_queue.pop_all # Get the max number of msgs or wait
      @@producer.send_messages messages unless messages.empty?
    end
  end
end

.send(topic, key, message) ⇒ Object



46
47
48
49
50
# File 'lib/kafka_notifier.rb', line 46

def self.send(topic, key, message)
  start if message_queue.nil?
  strmsg = JSON.generate(message) if message.is_a? Hash
  message_queue << Poseidon::MessageToSend.new(topic, strmsg, key)
end

.startObject



36
37
38
39
40
41
42
43
44
# File 'lib/kafka_notifier.rb', line 36

def self.start
  self.producer = Poseidon::Producer.new(
    config[:brokers], config[:client_id], type: :sync,
    required_acks: 1, partitioner: partitioner
  )

  self.message_queue = MessageQueue.new(500, 1)
  create_producer_thread
end