Class: Phobos::Producer::ClassMethods::PublicAPI

Inherits:
Object
  • Object
show all
Defined in:
lib/phobos/producer.rb

Constant Summary collapse

NAMESPACE =

Returns:

  • (Symbol)
:phobos_producer_store
ASYNC_PRODUCER_PARAMS =

Returns:

  • (Array<Symbol>)
[:max_queue_size, :delivery_threshold, :delivery_interval].freeze
INTERNAL_PRODUCER_PARAMS =

Returns:

  • (Array<Symbol>)
[:persistent_connections].freeze

Instance Method Summary collapse

Instance Method Details

#async_configsHash

Returns:

  • (Hash)


185
186
187
188
# File 'lib/phobos/producer.rb', line 185

def async_configs
  Phobos.config.producer_hash
        .reject { |k, _| INTERNAL_PRODUCER_PARAMS.include?(k) }
end

#async_producerKafka::AsyncProducer

Returns:

  • (Kafka::AsyncProducer)


148
149
150
# File 'lib/phobos/producer.rb', line 148

def async_producer
  producer_store[:async_producer]
end

#async_producer_shutdownvoid

This method returns an undefined value.



171
172
173
174
175
# File 'lib/phobos/producer.rb', line 171

def async_producer_shutdown
  async_producer&.deliver_messages
  async_producer&.shutdown
  producer_store[:async_producer] = nil
end

#async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil) ⇒ void

This method returns an undefined value.

Parameters:

  • topic (String)
  • payload (String)
  • partition_key (Integer) (defaults to: nil)
  • headers (Hash) (defaults to: nil)


157
158
159
160
# File 'lib/phobos/producer.rb', line 157

def async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
  async_publish_list([{ topic: topic, payload: payload, key: key,
                        partition_key: partition_key, headers: headers }])
end

#async_publish_list(messages) ⇒ void

This method returns an undefined value.

Parameters:

  • messages (Array<Hash>)


164
165
166
167
168
# File 'lib/phobos/producer.rb', line 164

def async_publish_list(messages)
  producer = async_producer || create_async_producer
  produce_messages(producer, messages)
  producer.deliver_messages unless async_automatic_delivery?
end

#configure_kafka_client(kafka_client) ⇒ void

This method returns an undefined value.

This method configures the kafka client used with publish operations performed by the host class

Parameters:

  • kafka_client (Kafka::Client)


89
90
91
92
# File 'lib/phobos/producer.rb', line 89

def configure_kafka_client(kafka_client)
  async_producer_shutdown
  producer_store[:kafka_client] = kafka_client
end

#create_async_producerKafka::AsyncProducer

Returns:

  • (Kafka::AsyncProducer)


141
142
143
144
145
# File 'lib/phobos/producer.rb', line 141

def create_async_producer
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client(:producer))
  async_producer = client.async_producer(**async_configs)
  producer_store[:async_producer] = async_producer
end

#create_sync_producerKafka::Producer

Returns:

  • (Kafka::Producer)


100
101
102
103
104
105
106
107
# File 'lib/phobos/producer.rb', line 100

def create_sync_producer
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client(:producer))
  sync_producer = client.producer(**regular_configs)
  if Phobos.config.producer_hash[:persistent_connections]
    producer_store[:sync_producer] = sync_producer
  end
  sync_producer
end

#kafka_clientKafka::Client

Returns:

  • (Kafka::Client)


95
96
97
# File 'lib/phobos/producer.rb', line 95

def kafka_client
  producer_store[:kafka_client]
end

#publish(topic:, payload:, key: nil, partition_key: nil, headers: nil) ⇒ void

This method returns an undefined value.

Parameters:

  • topic (String)
  • payload (String)
  • partition_key (Integer) (defaults to: nil)
  • headers (Hash) (defaults to: nil)


125
126
127
128
# File 'lib/phobos/producer.rb', line 125

def publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
  publish_list([{ topic: topic, payload: payload, key: key,
                  partition_key: partition_key, headers: headers }])
end

#publish_list(messages) ⇒ void

This method returns an undefined value.

Parameters:

  • messages (Array<Hash>)


132
133
134
135
136
137
138
# File 'lib/phobos/producer.rb', line 132

def publish_list(messages)
  producer = sync_producer || create_sync_producer
  produce_messages(producer, messages)
  producer.deliver_messages
ensure
  producer&.shutdown unless Phobos.config.producer_hash[:persistent_connections]
end

#regular_configsHash

Returns:

  • (Hash)


178
179
180
181
182
# File 'lib/phobos/producer.rb', line 178

def regular_configs
  Phobos.config.producer_hash
        .reject { |k, _| ASYNC_PRODUCER_PARAMS.include?(k) }
        .reject { |k, _| INTERNAL_PRODUCER_PARAMS.include?(k) }
end

#sync_producerKafka::Producer

Returns:

  • (Kafka::Producer)


110
111
112
# File 'lib/phobos/producer.rb', line 110

def sync_producer
  producer_store[:sync_producer]
end

#sync_producer_shutdownvoid

This method returns an undefined value.



115
116
117
118
# File 'lib/phobos/producer.rb', line 115

def sync_producer_shutdown
  sync_producer&.shutdown
  producer_store[:sync_producer] = nil
end