Module: Redborder::KafkaNotifier
Defined in:
- lib/kafka_notifier.rb
- lib/kafka_notifier/base.rb
- lib/kafka_notifier/model.rb
- lib/kafka_notifier/message_queue.rb
Defined Under Namespace
Modules: Base, Model
Classes: MessageQueue
Class Method Summary
collapse
Class Method Details
.create_producer_thread ⇒ Object
52
53
54
55
56
57
58
59
|
# File 'lib/kafka_notifier.rb', line 52
# Spawns the background thread that forwards queued messages to Kafka.
#
# The thread loops forever: each pass takes whatever `message_queue.pop_all`
# returns and, when that batch is non-empty, hands it to
# `@@producer.send_messages`.
#
# NOTE(review): nothing here rescues errors, so an exception raised by
# `pop_all` or `send_messages` ends the loop and the thread — confirm that is
# the intended failure mode.
#
# @return [Thread] the newly started producer thread
def self.create_producer_thread
  Thread.new do
    loop do
      # Drain first, then send. These must be two separate statements;
      # fused onto one line, `messages` is read before it is assigned.
      messages = message_queue.pop_all
      @@producer.send_messages(messages) unless messages.empty?
    end
  end
end
|
.send(topic, key, message) ⇒ Object
46
47
48
49
50
|
# File 'lib/kafka_notifier.rb', line 46
# Queues a message for asynchronous delivery to Kafka.
#
# Calls `start` first when no message queue exists yet. The payload is
# normalized before being wrapped in a Poseidon::MessageToSend:
# a Hash is serialized with JSON.generate, nil stays nil, and any other
# value is sent as its `to_s` representation (previously every non-Hash
# payload was silently replaced by nil).
#
# NOTE: this method is named `send`, so on this receiver it takes the place
# of Ruby's built-in Object#send; use `__send__` for dynamic dispatch.
#
# @param topic [String] Kafka topic to publish to
# @param key [Object] message key, passed through to Poseidon unchanged
# @param message [Hash, String, nil, #to_s] payload to publish
# @return [Object] the result of `message_queue <<`
def self.send(topic, key, message)
  start if message_queue.nil?

  payload =
    case message
    when Hash then JSON.generate(message)
    when nil then nil
    else message.to_s
    end

  message_queue << Poseidon::MessageToSend.new(topic, payload, key)
end
|
.start ⇒ Object
36
37
38
39
40
41
42
43
44
|
# File 'lib/kafka_notifier.rb', line 36
# Boots the notifier: builds the Poseidon producer from `config`, creates the
# message queue, and launches the background producer thread.
#
# @return [Object] whatever `create_producer_thread` returns
def self.start
  brokers = config[:brokers]
  client_id = config[:client_id]
  producer_options = { type: :sync, required_acks: 1, partitioner: partitioner }

  self.producer = Poseidon::Producer.new(brokers, client_id, **producer_options)
  self.message_queue = MessageQueue.new(500, 1)
  create_producer_thread
end
|