Class: Phobos::Producer::ClassMethods::PublicAPI

Inherits:
Object
  • Object
show all
Defined in:
lib/phobos/producer.rb

Constant Summary collapse

NAMESPACE =
:phobos_producer_store
ASYNC_PRODUCER_PARAMS =
%i(max_queue_size delivery_threshold delivery_interval).freeze

Instance Method Summary collapse

Instance Method Details

#async_configs ⇒ Object



111
112
113
# File 'lib/phobos/producer.rb', line 111

# Settings used when building the async producer: the complete
# `Phobos.config.producer_hash`, with nothing filtered out.
#
# @return [Object] whatever `Phobos.config.producer_hash` returns
def async_configs
  settings = Phobos.config
  settings.producer_hash
end

#async_producer ⇒ Object



87
88
89
# File 'lib/phobos/producer.rb', line 87

# Reads the async producer currently held in the producer store.
#
# @return [Object, nil] the value stored under :async_producer
def async_producer
  store = producer_store
  store[:async_producer]
end

#async_producer_shutdown ⇒ Object



101
102
103
104
105
# File 'lib/phobos/producer.rb', line 101

# Flushes and shuts down the stored async producer, if there is one, then
# clears the :async_producer slot in the producer store.
#
# The slot is reset to nil even when no producer was present.
#
# @return [nil]
def async_producer_shutdown
  current = async_producer
  current&.deliver_messages
  current&.shutdown
  producer_store[:async_producer] = nil
end

#async_publish(topic, payload, key = nil) ⇒ Object



91
92
93
# File 'lib/phobos/producer.rb', line 91

# Publishes a single message through the async path by wrapping it in a
# one-element list and delegating to #async_publish_list.
#
# @param topic [Object] stored under the :topic key of the message hash
# @param payload [Object] stored under the :payload key
# @param key [Object, nil] stored under the :key key; defaults to nil
# @return [Object] whatever #async_publish_list returns
def async_publish(topic, payload, key = nil)
  message = { topic: topic, payload: payload, key: key }
  async_publish_list([message])
end

#async_publish_list(messages) ⇒ Object



95
96
97
98
99
# File 'lib/phobos/producer.rb', line 95

# Hands a list of messages to the async producer, creating the producer first
# when none is stored yet.
#
# After the messages are produced, delivery is triggered explicitly unless
# `async_automatic_delivery?` reports that delivery happens on its own.
#
# @param messages [Object] forwarded unchanged to `produce_messages`
# @return [Object, nil] the result of `deliver_messages`, or nil when
#   automatic delivery is enabled
def async_publish_list(messages)
  active = async_producer || create_async_producer
  produce_messages(active, messages)
  return if async_automatic_delivery?

  active.deliver_messages
end

#configure_kafka_client(kafka_client) ⇒ Object

Configures the Kafka client used for publish operations performed by the host class.

Parameters:

  • kafka_client (Kafka::Client)


59
60
61
62
# File 'lib/phobos/producer.rb', line 59

# Registers the Kafka client to use for subsequent publish operations.
#
# Any existing async producer is shut down first via #async_producer_shutdown.
#
# @param kafka_client [Kafka::Client] client to store under :kafka_client
# @return [Kafka::Client] the client that was just stored
def configure_kafka_client(kafka_client)
  async_producer_shutdown
  producer_store[:kafka_client] = kafka_client
  # Returned explicitly: callers write `kafka_client || configure_kafka_client(...)`.
  kafka_client
end

#create_async_producer ⇒ Object



81
82
83
84
85
# File 'lib/phobos/producer.rb', line 81

# Builds an async producer from the current Kafka client and caches it in the
# producer store under :async_producer.
#
# When no client has been configured yet, one is obtained from
# `Phobos.create_kafka_client` and registered through #configure_kafka_client.
#
# @return [Object] the newly created async producer
def create_async_producer
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
  # Splat the settings so they are passed as keyword arguments. On Ruby 3+ a
  # bare Hash is a positional argument, which a keyword-only signature such as
  # ruby-kafka's Kafka::Client#async_producer rejects with ArgumentError.
  producer_store[:async_producer] = client.async_producer(**async_configs)
end

#kafka_client ⇒ Object



64
65
66
# File 'lib/phobos/producer.rb', line 64

# Reads the Kafka client currently held in the producer store.
#
# @return [Object, nil] the value stored under :kafka_client
def kafka_client
  store = producer_store
  store[:kafka_client]
end

#publish(topic, payload, key = nil) ⇒ Object



68
69
70
# File 'lib/phobos/producer.rb', line 68

# Publishes a single message by wrapping it in a one-element list and
# delegating to #publish_list.
#
# @param topic [Object] stored under the :topic key of the message hash
# @param payload [Object] stored under the :payload key
# @param key [Object, nil] stored under the :key key; defaults to nil
# @return [Object] whatever #publish_list returns
def publish(topic, payload, key = nil)
  message = { topic: topic, payload: payload, key: key }
  publish_list([message])
end

#publish_list(messages) ⇒ Object



72
73
74
75
76
77
78
79
# File 'lib/phobos/producer.rb', line 72

# Publishes a list of messages with a short-lived producer.
#
# A producer is built from the Kafka client using #regular_configs. When no
# client is configured yet, one is created via `Phobos.create_kafka_client`.
# The messages are handed to `produce_messages` and then delivered. The
# producer is shut down in `ensure`, whether or not delivery raised.
#
# @param messages [Object] forwarded unchanged to `produce_messages`; #publish
#   passes an Array of hashes with :topic, :payload and :key
# @return [Object] the result of the producer's `deliver_messages`
def publish_list(messages)
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
  # Splat the settings so they are passed as keyword arguments. On Ruby 3+ a
  # bare Hash is a positional argument, which a keyword-only signature such as
  # ruby-kafka's Kafka::Client#producer rejects with ArgumentError.
  producer = client.producer(**regular_configs)
  produce_messages(producer, messages)
  producer.deliver_messages
ensure
  # `producer` is still nil if building it (or obtaining the client) failed.
  producer&.shutdown
end

#regular_configs ⇒ Object



107
108
109
# File 'lib/phobos/producer.rb', line 107

# Settings for the regular producer: `Phobos.config.producer_hash` minus every
# key listed in ASYNC_PRODUCER_PARAMS.
#
# @return [Object] the result of `reject` on the producer hash
def regular_configs
  Phobos.config.producer_hash.reject do |setting, _value|
    ASYNC_PRODUCER_PARAMS.include?(setting)
  end
end