Class: Phobos::Producer::ClassMethods::PublicAPI

Inherits:
Object
  • Object
show all
Defined in:
lib/phobos/producer.rb

Constant Summary collapse

# NOTE(review): not referenced by any method shown on this page; presumably the
# key under which the producer store is kept — confirm against producer_store.
NAMESPACE =
:phobos_producer_store
# Producer settings that regular_configs removes before the settings are
# passed to client.producer (they are kept by async_configs).
ASYNC_PRODUCER_PARAMS =
[:max_queue_size, :delivery_threshold, :delivery_interval].freeze
# Settings removed by both regular_configs and async_configs before the
# remaining settings are splatted into a producer constructor.
INTERNAL_PRODUCER_PARAMS =
[:persistent_connections].freeze

Instance Method Summary collapse

Instance Method Details

#async_configs ⇒ Object



143
144
145
146
# File 'lib/phobos/producer.rb', line 143

# Settings used when building the async producer: the configured producer
# hash with every key listed in INTERNAL_PRODUCER_PARAMS removed.
#
# @return the filtered producer settings
def async_configs
  settings = Phobos.config.producer_hash
  settings.reject { |name, _value| INTERNAL_PRODUCER_PARAMS.include?(name) }
end

#async_producer ⇒ Object



116
117
118
# File 'lib/phobos/producer.rb', line 116

# Looks up the async producer kept in the producer store under
# :async_producer (written by create_async_producer, reset to nil by
# async_producer_shutdown).
#
# @return the stored async producer, if any
def async_producer
  store = producer_store
  store[:async_producer]
end

#async_producer_shutdown ⇒ Object



131
132
133
134
135
# File 'lib/phobos/producer.rb', line 131

# Flushes and shuts down the stored async producer (if there is one) and
# clears its slot in the producer store.
#
# Cleanup is guaranteed: when deliver_messages raises, the producer is still
# shut down, and the :async_producer slot is always reset to nil so a broken
# producer is never reused. The first error raised still propagates to the
# caller.
#
# @return [nil]
def async_producer_shutdown
  # Look the producer up once so every step acts on the same instance.
  producer = async_producer
  begin
    producer&.deliver_messages
  ensure
    begin
      producer&.shutdown
    ensure
      producer_store[:async_producer] = nil
    end
  end
  nil
end

#async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil) ⇒ Object



120
121
122
123
# File 'lib/phobos/producer.rb', line 120

# Publishes a single message through the async path by wrapping the given
# attributes in a one-element list and delegating to async_publish_list.
#
# @param topic the message topic
# @param payload the message payload
# @param key optional message key
# @param partition_key optional partition key
# @param headers optional message headers
# @return whatever async_publish_list returns
def async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
  message = {
    topic: topic,
    payload: payload,
    key: key,
    partition_key: partition_key,
    headers: headers
  }
  async_publish_list([message])
end

#async_publish_list(messages) ⇒ Object



125
126
127
128
129
# File 'lib/phobos/producer.rb', line 125

# Hands a list of messages to the async producer, creating the producer first
# when none is stored. Delivery is triggered explicitly here unless
# async_automatic_delivery? reports that it happens automatically.
#
# @param messages the messages passed on to produce_messages
# @return the result of deliver_messages, or nil when delivery is automatic
def async_publish_list(messages)
  sender = async_producer
  sender ||= create_async_producer
  produce_messages(sender, messages)
  return if async_automatic_delivery?

  sender.deliver_messages
end

#configure_kafka_client(kafka_client) ⇒ Object

This method configures the kafka client used with publish operations performed by the host class.

Parameters:

  • kafka_client (Kafka::Client)


70
71
72
73
# File 'lib/phobos/producer.rb', line 70

# This method configures the kafka client used with publish operations
# performed by the host class. Any stored async producer is shut down first,
# then the client is recorded in the producer store under :kafka_client.
#
# NOTE(review): a stored sync producer is not touched here — confirm whether
# it should also be reset when the client changes.
#
# @param kafka_client [Kafka::Client]
# @return the client that was passed in
def configure_kafka_client(kafka_client)
  async_producer_shutdown
  store = producer_store
  store[:kafka_client] = kafka_client
end

#create_async_producer ⇒ Object



110
111
112
113
114
# File 'lib/phobos/producer.rb', line 110

# Builds an async producer from the configured kafka client (creating and
# registering a client via Phobos.create_kafka_client(:producer) when none is
# configured yet) and stores it under :async_producer.
#
# @return the newly created async producer
def create_async_producer
  client = kafka_client
  client ||= configure_kafka_client(Phobos.create_kafka_client(:producer))
  created = client.async_producer(**async_configs)
  producer_store[:async_producer] = created
end

#create_sync_producer ⇒ Object



79
80
81
82
83
84
85
86
# File 'lib/phobos/producer.rb', line 79

# Builds a sync producer from the configured kafka client (creating and
# registering a client via Phobos.create_kafka_client(:producer) when none is
# configured yet). The producer is kept in the producer store only when the
# :persistent_connections producer setting is truthy.
#
# @return the newly created sync producer
def create_sync_producer
  client = kafka_client
  client ||= configure_kafka_client(Phobos.create_kafka_client(:producer))
  client.producer(**regular_configs).tap do |created|
    producer_store[:sync_producer] = created if Phobos.config.producer_hash[:persistent_connections]
  end
end

#kafka_client ⇒ Object



75
76
77
# File 'lib/phobos/producer.rb', line 75

# Looks up the kafka client kept in the producer store under :kafka_client
# (written by configure_kafka_client).
#
# @return the stored kafka client, if any
def kafka_client
  store = producer_store
  store[:kafka_client]
end

#publish(topic:, payload:, key: nil, partition_key: nil, headers: nil) ⇒ Object



97
98
99
100
# File 'lib/phobos/producer.rb', line 97

# Publishes a single message through the sync path by wrapping the given
# attributes in a one-element list and delegating to publish_list.
#
# @param topic the message topic
# @param payload the message payload
# @param key optional message key
# @param partition_key optional partition key
# @param headers optional message headers
# @return whatever publish_list returns
def publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
  message = {
    topic: topic,
    payload: payload,
    key: key,
    partition_key: partition_key,
    headers: headers
  }
  publish_list([message])
end

#publish_list(messages) ⇒ Object



102
103
104
105
106
107
108
# File 'lib/phobos/producer.rb', line 102

# Delivers a list of messages synchronously, reusing the stored sync producer
# or creating one on demand. Unless the :persistent_connections producer
# setting is truthy, the producer is shut down afterwards — also when
# producing or delivering raised.
#
# @param messages the messages passed on to produce_messages
# @return the result of deliver_messages
def publish_list(messages)
  sender = sync_producer
  sender ||= create_sync_producer
  produce_messages(sender, messages)
  sender.deliver_messages
ensure
  keep_open = Phobos.config.producer_hash[:persistent_connections]
  # sender is nil when creating the producer itself failed.
  sender&.shutdown unless keep_open
end

#regular_configs ⇒ Object



137
138
139
140
141
# File 'lib/phobos/producer.rb', line 137

# Settings used when building the sync producer: the configured producer hash
# with every key listed in ASYNC_PRODUCER_PARAMS or INTERNAL_PRODUCER_PARAMS
# removed.
#
# @return the filtered producer settings
def regular_configs
  settings = Phobos.config.producer_hash
  excluded = ASYNC_PRODUCER_PARAMS + INTERNAL_PRODUCER_PARAMS
  settings.reject { |name, _value| excluded.include?(name) }
end

#sync_producer ⇒ Object



88
89
90
# File 'lib/phobos/producer.rb', line 88

# Looks up the sync producer kept in the producer store under :sync_producer
# (written by create_sync_producer when connections are persistent, reset to
# nil by sync_producer_shutdown).
#
# @return the stored sync producer, if any
def sync_producer
  store = producer_store
  store[:sync_producer]
end

#sync_producer_shutdown ⇒ Object



92
93
94
95
# File 'lib/phobos/producer.rb', line 92

# Shuts down the stored sync producer (if there is one) and clears its slot in
# the producer store.
#
# The :sync_producer slot is reset to nil even when shutdown raises, so a
# producer that failed to shut down is never handed out again by
# sync_producer. The error itself still propagates to the caller.
#
# @return [nil]
def sync_producer_shutdown
  sync_producer&.shutdown
  nil
ensure
  producer_store[:sync_producer] = nil
end