Class: Phobos::Producer::ClassMethods::PublicAPI

Inherits:
Object
  • Object
show all
Defined in:
lib/phobos/producer.rb

Constant Summary collapse

NAMESPACE =
:phobos_producer_store

Instance Method Summary collapse

Instance Method Details

#async_producer ⇒ Object



85
86
87
# File 'lib/phobos/producer.rb', line 85

# Looks up the async producer currently held in the producer store.
#
# @return [Object, nil] the value stored under +:async_producer+; presumably
#   nil when nothing has been cached (depends on the store being a plain
#   Hash, which is not visible here)
def async_producer
  store = producer_store
  store[:async_producer]
end

#async_producer_shutdown ⇒ Object



98
99
100
101
102
# File 'lib/phobos/producer.rb', line 98

# Stops the cached async producer, if there is one, and clears its slot in
# the producer store.
#
# When a producer is present it is first asked to deliver its messages and
# is then shut down. The store entry is reset to nil in every case.
#
# @return [nil]
def async_producer_shutdown
  producer = async_producer
  unless producer.nil?
    producer.deliver_messages
    producer.shutdown
  end
  producer_store[:async_producer] = nil
end

#async_publish(topic, payload, key = nil) ⇒ Object



89
90
91
# File 'lib/phobos/producer.rb', line 89

# Publishes a single message asynchronously by delegating to
# async_publish_list with a one-element list.
#
# @param topic [Object] value placed under the +:topic+ key
# @param payload [Object] value placed under the +:payload+ key
# @param key [Object, nil] value placed under the +:key+ key (defaults to nil)
# @return [Object] whatever async_publish_list returns
def async_publish(topic, payload, key = nil)
  message = { topic: topic, payload: payload, key: key }
  async_publish_list([message])
end

#async_publish_list(messages) ⇒ Object



93
94
95
96
# File 'lib/phobos/producer.rb', line 93

# Hands a batch of messages to the async producer, creating that producer
# via create_async_producer when none is cached yet.
#
# @param messages [Array] messages forwarded untouched to produce_messages
# @return [Object] whatever produce_messages returns
def async_publish_list(messages)
  produce_messages(async_producer || create_async_producer, messages)
end

#configure_kafka_client(kafka_client) ⇒ Object

Configures the Kafka client used for publish operations performed by the host class.

Parameters:

  • kafka_client (Kafka::Client)


58
59
60
61
# File 'lib/phobos/producer.rb', line 58

# Registers the Kafka client used by the publish operations of the host
# class. async_producer_shutdown runs first, before the new client is
# written to the producer store.
#
# @param kafka_client [Kafka::Client] client to store
# @return [Kafka::Client] the client that was passed in; create_async_producer
#   and publish_list use this return value as their fallback client
def configure_kafka_client(kafka_client)
  async_producer_shutdown
  producer_store[:kafka_client] = kafka_client
  kafka_client
end

#create_async_producer ⇒ Object



79
80
81
82
83
# File 'lib/phobos/producer.rb', line 79

# Builds an async producer from the configured Kafka client and caches it in
# the producer store under +:async_producer+.
#
# When no client has been configured yet, one is obtained from
# Phobos.create_kafka_client and registered through configure_kafka_client.
#
# @return [Object] the producer returned by the client's async_producer call
def create_async_producer
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
  # Forward the options as keyword arguments: Kafka::Client#async_producer
  # accepts keywords only, and Ruby 3 no longer turns a trailing positional
  # Hash into keywords. Assumes producer_hash is a symbol-keyed Hash.
  producer = client.async_producer(**Phobos.config.producer_hash)
  producer_store[:async_producer] = producer
end

#kafka_client ⇒ Object



63
64
65
# File 'lib/phobos/producer.rb', line 63

# Looks up the Kafka client currently held in the producer store.
#
# @return [Object, nil] the value stored under +:kafka_client+; presumably
#   nil when no client has been configured (depends on the store being a
#   plain Hash, which is not visible here)
def kafka_client
  store = producer_store
  store[:kafka_client]
end

#publish(topic, payload, key = nil) ⇒ Object



67
68
69
# File 'lib/phobos/producer.rb', line 67

# Publishes a single message by delegating to publish_list with a
# one-element list.
#
# @param topic [Object] value placed under the +:topic+ key
# @param payload [Object] value placed under the +:payload+ key
# @param key [Object, nil] value placed under the +:key+ key (defaults to nil)
# @return [Object] whatever publish_list returns
def publish(topic, payload, key = nil)
  message = { topic: topic, payload: payload, key: key }
  publish_list([message])
end

#publish_list(messages) ⇒ Object



71
72
73
74
75
76
77
# File 'lib/phobos/producer.rb', line 71

# Publishes a batch of messages through a producer created for this call
# only; the producer is shut down afterwards, even when publishing raises.
#
# When no client has been configured yet, one is obtained from
# Phobos.create_kafka_client and registered through configure_kafka_client.
#
# @param messages [Array] messages forwarded untouched to produce_messages
# @return [Object] whatever produce_messages returns
def publish_list(messages)
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
  # Forward the options as keyword arguments: Kafka::Client#producer accepts
  # keywords only, and Ruby 3 no longer turns a trailing positional Hash into
  # keywords. Assumes producer_hash is a symbol-keyed Hash.
  producer = client.producer(**Phobos.config.producer_hash)
  produce_messages(producer, messages)
ensure
  # producer is still nil if an earlier line raised, hence the safe navigation.
  producer&.shutdown
end