Class: Phobos::Producer::ClassMethods::PublicAPI

Inherits:
Object
  • Object
show all
Defined in:
lib/phobos/producer.rb

Constant Summary collapse

# NOTE(review): presumably the key under which the producer store is kept;
# it is not referenced by any method shown here - confirm in lib/phobos/producer.rb.
NAMESPACE =
:phobos_producer_store
# Producer setting names that regular_configs removes from
# Phobos.config.producer_hash before building the synchronous producer.
ASYNC_PRODUCER_PARAMS =
[:max_queue_size, :delivery_threshold, :delivery_interval].freeze

Instance Method Summary collapse

Instance Method Details

#async_configs ⇒ Object



115
116
117
# File 'lib/phobos/producer.rb', line 115

# Settings handed to the Kafka client when the async producer is built.
#
# @return [Object] whatever Phobos.config.producer_hash returns, unfiltered
def async_configs
  phobos_config = Phobos.config
  phobos_config.producer_hash
end

#async_producer ⇒ Object



90
91
92
# File 'lib/phobos/producer.rb', line 90

# Looks up the async producer currently held in the producer store.
#
# @return [Object, nil] the stored async producer, or nil when none is stored
def async_producer
  store = producer_store
  store[:async_producer]
end

#async_producer_shutdown ⇒ Object



105
106
107
108
109
# File 'lib/phobos/producer.rb', line 105

# Flushes and shuts down the stored async producer (if any), then clears it
# from the producer store.
#
# The cleanup steps run under `ensure`: if `deliver_messages` raises, the
# producer is still shut down, and the store entry is cleared even if
# `shutdown` raises too. Without that, a failed flush would leave a
# half-dead producer in the store for later async publishes to reuse.
# Any error raised still propagates to the caller.
#
# @return [nil]
def async_producer_shutdown
  # Read the store once so every step acts on the same producer instance.
  producer = async_producer
  producer&.deliver_messages
  nil
ensure
  begin
    producer&.shutdown
  ensure
    producer_store[:async_producer] = nil
  end
end

#async_publish(topic, payload, key = nil, partition_key = nil) ⇒ Object



94
95
96
97
# File 'lib/phobos/producer.rb', line 94

# Publishes a single message through the async producer by wrapping it in a
# one-element list and delegating to async_publish_list.
#
# @param topic [Object] value stored under the message's :topic key
# @param payload [Object] value stored under the message's :payload key
# @param key [Object, nil] value stored under the message's :key key
# @param partition_key [Object, nil] value stored under :partition_key
# @return [Object] whatever async_publish_list returns
def async_publish(topic, payload, key = nil, partition_key = nil)
  message = {
    topic: topic,
    payload: payload,
    key: key,
    partition_key: partition_key
  }
  async_publish_list([message])
end

#async_publish_list(messages) ⇒ Object



99
100
101
102
103
# File 'lib/phobos/producer.rb', line 99

# Hands a list of messages to the async producer, creating the producer on
# first use.
#
# @param messages [Object] passed through unchanged to produce_messages
# @return [Object, nil] the result of `deliver_messages`, or nil when
#   async_automatic_delivery? is truthy and no explicit delivery is triggered
def async_publish_list(messages)
  active_producer = async_producer
  active_producer ||= create_async_producer
  produce_messages(active_producer, messages)
  # With automatic delivery enabled, no explicit delivery is triggered here.
  return if async_automatic_delivery?

  active_producer.deliver_messages
end

#configure_kafka_client(kafka_client) ⇒ Object

This method configures the Kafka client used for publish operations performed by the host class.

Parameters:

  • kafka_client (Kafka::Client)


61
62
63
64
# File 'lib/phobos/producer.rb', line 61

# Registers the Kafka client used by the publish operations.
#
# The current async producer is shut down before the new client is stored.
# NOTE(review): presumably so no async producer stays bound to the previous
# client - confirm.
#
# @param kafka_client [Kafka::Client] client to store
# @return [Kafka::Client] the client that was just stored
def configure_kafka_client(kafka_client)
  async_producer_shutdown
  store = producer_store
  store[:kafka_client] = kafka_client
  kafka_client
end

#create_async_producer ⇒ Object



84
85
86
87
88
# File 'lib/phobos/producer.rb', line 84

# Builds an async producer from the configured Kafka client and saves it in
# the producer store. When no client is configured yet, one is created via
# Phobos.create_kafka_client and registered with configure_kafka_client.
#
# @return [Object] the newly created async producer
def create_async_producer
  client = kafka_client
  client ||= configure_kafka_client(Phobos.create_kafka_client)
  new_producer = client.async_producer(async_configs)
  producer_store[:async_producer] = new_producer
end

#kafka_client ⇒ Object



66
67
68
# File 'lib/phobos/producer.rb', line 66

# Looks up the Kafka client currently held in the producer store.
#
# @return [Object, nil] the stored client, or nil when none has been stored
def kafka_client
  store = producer_store
  store[:kafka_client]
end

#publish(topic, payload, key = nil, partition_key = nil) ⇒ Object



70
71
72
73
# File 'lib/phobos/producer.rb', line 70

# Publishes a single message by wrapping it in a one-element list and
# delegating to publish_list.
#
# @param topic [Object] value stored under the message's :topic key
# @param payload [Object] value stored under the message's :payload key
# @param key [Object, nil] value stored under the message's :key key
# @param partition_key [Object, nil] value stored under :partition_key
# @return [Object] whatever publish_list returns
def publish(topic, payload, key = nil, partition_key = nil)
  message = {
    topic: topic,
    payload: payload,
    key: key,
    partition_key: partition_key
  }
  publish_list([message])
end

#publish_list(messages) ⇒ Object



75
76
77
78
79
80
81
82
# File 'lib/phobos/producer.rb', line 75

# Publishes a list of messages through a freshly built producer.
#
# A new producer is created from the Kafka client on every call (creating and
# registering the client first if none is configured) and is always shut down
# afterwards, even when producing or delivering raises.
#
# @param messages [Object] passed through unchanged to produce_messages
# @return [Object] the result of the producer's `deliver_messages`
def publish_list(messages)
  sync_producer = nil
  begin
    client = kafka_client
    client ||= configure_kafka_client(Phobos.create_kafka_client)
    sync_producer = client.producer(regular_configs)
    produce_messages(sync_producer, messages)
    sync_producer.deliver_messages
  ensure
    # Still nil if the client or producer could not be built.
    sync_producer&.shutdown
  end
end

#regular_configs ⇒ Object



111
112
113
# File 'lib/phobos/producer.rb', line 111

# Settings for the synchronous producer: Phobos.config.producer_hash with
# every key listed in ASYNC_PRODUCER_PARAMS removed.
#
# @return [Object] the filtered settings (the result of `reject`)
def regular_configs
  all_settings = Phobos.config.producer_hash
  all_settings.reject do |setting, _value|
    ASYNC_PRODUCER_PARAMS.include?(setting)
  end
end