Class: Phobos::Producer::ClassMethods::PublicAPI

Inherits:
Object
  • Object
show all
Defined in:
lib/phobos/producer.rb

Constant Summary collapse

NAMESPACE =
:phobos_producer_store
ASYNC_PRODUCER_PARAMS =
[:max_queue_size, :delivery_threshold, :delivery_interval].freeze
INTERNAL_PRODUCER_PARAMS =
[:persistent_connections].freeze

Instance Method Summary collapse

Instance Method Details

#async_configs ⇒ Object



177
178
179
180
# File 'lib/phobos/producer.rb', line 177

# Producer settings used when building the async producer: every entry of
# +Phobos.config.producer_hash+ whose key is not listed in
# INTERNAL_PRODUCER_PARAMS.
#
# @return [Hash] the filtered settings, as returned by +reject+
def async_configs
  settings = Phobos.config.producer_hash
  settings.reject { |name, _value| INTERNAL_PRODUCER_PARAMS.include?(name) }
end

#async_producer ⇒ Object



150
151
152
# File 'lib/phobos/producer.rb', line 150

# Reads the async producer currently held in the producer store.
#
# @return [Object, nil] the entry stored under +:async_producer+
def async_producer
  store = producer_store
  store[:async_producer]
end

#async_producer_shutdown ⇒ Object



165
166
167
168
169
# File 'lib/phobos/producer.rb', line 165

# Stops the cached async producer, if there is one, and clears its slot in
# the producer store.
#
# +deliver_messages+ is called before +shutdown+, matching the order the
# producer is asked to flush and then stop. With no cached producer only the
# store slot is reset.
#
# @return [nil]
def async_producer_shutdown
  current = async_producer
  unless current.nil?
    current.deliver_messages
    current.shutdown
  end
  producer_store[:async_producer] = nil
end

#async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil) ⇒ Object



154
155
156
157
# File 'lib/phobos/producer.rb', line 154

# Publishes a single message through the async path by wrapping it in a
# one-element list and delegating to #async_publish_list.
#
# @param topic message topic
# @param payload message payload
# @param key optional message key (defaults to nil)
# @param partition_key optional partition key (defaults to nil)
# @param headers optional message headers (defaults to nil)
# @return [Object] whatever #async_publish_list returns
def async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
  message = {
    topic: topic,
    payload: payload,
    key: key,
    partition_key: partition_key,
    headers: headers
  }
  async_publish_list([message])
end

#async_publish_list(messages) ⇒ Object



159
160
161
162
163
# File 'lib/phobos/producer.rb', line 159

# Hands a list of messages to the async producer, creating the producer on
# first use.
#
# After the messages are produced, +deliver_messages+ is called explicitly
# only when +async_automatic_delivery?+ is false.
#
# @param messages list of messages passed on to +produce_messages+
# @return [Object, nil] the result of +deliver_messages+, or nil when
#   automatic delivery is in effect
def async_publish_list(messages)
  background_producer = async_producer || create_async_producer
  produce_messages(background_producer, messages)
  return if async_automatic_delivery?

  background_producer.deliver_messages
end

#configure_kafka_client(kafka_client) ⇒ Object

Configures the Kafka client used for publish operations performed by the host class.

Parameters:

  • kafka_client (Kafka::Client)


104
105
106
107
# File 'lib/phobos/producer.rb', line 104

# Sets the Kafka client used for publish operations performed by the host
# class.
#
# Producers held in the producer store were created from the previously
# configured client, so both the async producer and the cached sync producer
# are shut down and cleared before the new client is stored. Without the
# sync shutdown, a sync producer cached under +persistent_connections+ would
# keep publishing through the old client.
#
# @param kafka_client [Kafka::Client]
# @return [Kafka::Client] the client that was just stored (callers such as
#   #create_sync_producer rely on this return value)
def configure_kafka_client(kafka_client)
  async_producer_shutdown
  sync_producer_shutdown
  producer_store[:kafka_client] = kafka_client
end

#create_async_producer ⇒ Object



144
145
146
147
148
# File 'lib/phobos/producer.rb', line 144

# Builds an async producer from the configured Kafka client and caches it in
# the producer store under +:async_producer+.
#
# When no client has been configured yet, one is created with
# +Phobos.create_kafka_client+ and registered via #configure_kafka_client.
#
# @return [Object] the newly created async producer
def create_async_producer
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
  # ruby-kafka declares keyword parameters on Kafka::Client#async_producer.
  # Ruby 3 no longer converts a positional Hash into keywords, so the
  # settings must be double-splatted.
  producer_store[:async_producer] = client.async_producer(**async_configs)
end

#create_sync_producer ⇒ Object



113
114
115
116
117
118
119
120
# File 'lib/phobos/producer.rb', line 113

# Builds a sync producer from the configured Kafka client.
#
# When no client has been configured yet, one is created with
# +Phobos.create_kafka_client+ and registered via #configure_kafka_client.
# The producer is cached in the producer store under +:sync_producer+ only
# when the +:persistent_connections+ producer setting is truthy.
#
# @return [Object] the newly created sync producer
def create_sync_producer
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
  # ruby-kafka declares keyword parameters on Kafka::Client#producer. Ruby 3
  # no longer converts a positional Hash into keywords, so the settings must
  # be double-splatted.
  new_producer = client.producer(**regular_configs)
  if Phobos.config.producer_hash[:persistent_connections]
    producer_store[:sync_producer] = new_producer
  end
  new_producer
end

#kafka_client ⇒ Object



109
110
111
# File 'lib/phobos/producer.rb', line 109

# Reads the Kafka client currently held in the producer store (the slot
# written by #configure_kafka_client).
#
# @return [Kafka::Client, nil] the entry stored under +:kafka_client+
def kafka_client
  store = producer_store
  store[:kafka_client]
end

#publish(topic:, payload:, key: nil, partition_key: nil, headers: nil) ⇒ Object



131
132
133
134
# File 'lib/phobos/producer.rb', line 131

# Publishes a single message through the sync path by wrapping it in a
# one-element list and delegating to #publish_list.
#
# @param topic message topic
# @param payload message payload
# @param key optional message key (defaults to nil)
# @param partition_key optional partition key (defaults to nil)
# @param headers optional message headers (defaults to nil)
# @return [Object] whatever #publish_list returns
def publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
  message = {
    topic: topic,
    payload: payload,
    key: key,
    partition_key: partition_key,
    headers: headers
  }
  publish_list([message])
end

#publish_list(messages) ⇒ Object



136
137
138
139
140
141
142
# File 'lib/phobos/producer.rb', line 136

# Publishes a list of messages synchronously.
#
# Uses the cached sync producer when one exists, otherwise creates one. The
# producer is shut down afterwards, even when producing or delivering
# raises, unless the +:persistent_connections+ producer setting is truthy.
#
# @param messages list of messages passed on to +produce_messages+
# @return [Object] the result of the producer's +deliver_messages+
def publish_list(messages)
  kafka_producer = sync_producer || create_sync_producer
  produce_messages(kafka_producer, messages)
  kafka_producer.deliver_messages
ensure
  keep_open = Phobos.config.producer_hash[:persistent_connections]
  # kafka_producer is nil here when obtaining the producer itself failed.
  kafka_producer.shutdown unless keep_open || kafka_producer.nil?
end

#regular_configs ⇒ Object



171
172
173
174
175
# File 'lib/phobos/producer.rb', line 171

# Producer settings used when building the sync producer: every entry of
# +Phobos.config.producer_hash+ whose key appears in neither
# ASYNC_PRODUCER_PARAMS nor INTERNAL_PRODUCER_PARAMS.
#
# @return [Hash] the filtered settings, as returned by +reject+
def regular_configs
  excluded = ASYNC_PRODUCER_PARAMS + INTERNAL_PRODUCER_PARAMS
  Phobos.config.producer_hash.reject { |name, _value| excluded.include?(name) }
end

#sync_producer ⇒ Object



122
123
124
# File 'lib/phobos/producer.rb', line 122

# Reads the sync producer currently held in the producer store.
#
# @return [Object, nil] the entry stored under +:sync_producer+
def sync_producer
  store = producer_store
  store[:sync_producer]
end

#sync_producer_shutdown ⇒ Object



126
127
128
129
# File 'lib/phobos/producer.rb', line 126

# Shuts down the cached sync producer, if there is one, and clears its slot
# in the producer store.
#
# @return [nil]
def sync_producer_shutdown
  current = sync_producer
  current.shutdown unless current.nil?
  producer_store[:sync_producer] = nil
end