Class: Phobos::Producer::ClassMethods::PublicAPI

Inherits:
Object
  • Object
show all
Defined in:
lib/phobos/producer.rb

Constant Summary collapse

# NOTE(review): presumably the key under which the producer store is kept;
# it is not referenced by the methods shown here — confirm against the file.
NAMESPACE =
:phobos_producer_store
# Producer settings that #regular_configs removes before a sync producer is
# built; #async_configs leaves them in place for the async producer.
ASYNC_PRODUCER_PARAMS =
[:max_queue_size, :delivery_threshold, :delivery_interval].freeze
# Settings removed by both #regular_configs and #async_configs, so they are
# never forwarded to the Kafka client's producer factories.
INTERNAL_PRODUCER_PARAMS =
[:persistent_connections].freeze

Instance Method Summary collapse

Instance Method Details

#async_configs ⇒ Object



135
136
137
138
# File 'lib/phobos/producer.rb', line 135

# Settings forwarded to the async producer.
#
# Takes +Phobos.config.producer_hash+ and drops every key listed in
# INTERNAL_PRODUCER_PARAMS.
#
# @return [Hash] the producer settings without the internal-only keys
def async_configs
  settings = Phobos.config.producer_hash
  settings.reject { |name, _value| INTERNAL_PRODUCER_PARAMS.include?(name) }
end

#async_producer ⇒ Object



108
109
110
# File 'lib/phobos/producer.rb', line 108

# Reads the async producer currently held in the producer store.
#
# @return [Object, nil] whatever is stored under +:async_producer+
def async_producer
  store = producer_store
  store[:async_producer]
end

#async_producer_shutdown ⇒ Object



123
124
125
126
127
# File 'lib/phobos/producer.rb', line 123

# Flushes and stops the stored async producer, then clears its store slot.
#
# When a producer is stored it receives +deliver_messages+ followed by
# +shutdown+. The +:async_producer+ entry is reset to nil in every case.
#
# @return [nil]
def async_producer_shutdown
  producer = async_producer
  unless producer.nil?
    producer.deliver_messages
    producer.shutdown
  end
  producer_store[:async_producer] = nil
end

#async_publish(topic, payload, key = nil, partition_key = nil) ⇒ Object



112
113
114
115
# File 'lib/phobos/producer.rb', line 112

# Publishes a single message through the async producer.
#
# Wraps the arguments in a one-element message list and delegates to
# #async_publish_list.
#
# @param topic the value stored under +:topic+
# @param payload the value stored under +:payload+
# @param key the value stored under +:key+ (defaults to nil)
# @param partition_key the value stored under +:partition_key+ (defaults to nil)
# @return [Object] whatever #async_publish_list returns
def async_publish(topic, payload, key = nil, partition_key = nil)
  message = { topic: topic, payload: payload, key: key, partition_key: partition_key }
  async_publish_list([message])
end

#async_publish_list(messages) ⇒ Object



117
118
119
120
121
# File 'lib/phobos/producer.rb', line 117

# Hands a list of messages to the async producer.
#
# Reuses the stored async producer or creates one on demand, passes the
# messages to +produce_messages+, and triggers +deliver_messages+ itself
# unless +async_automatic_delivery?+ reports true.
#
# @param messages the message list forwarded to +produce_messages+
# @return [Object, nil] the result of +deliver_messages+, or nil when
#   delivery is left to the producer
def async_publish_list(messages)
  producer = async_producer
  producer ||= create_async_producer
  produce_messages(producer, messages)
  return if async_automatic_delivery?

  producer.deliver_messages
end

#configure_kafka_client(kafka_client) ⇒ Object

This method configures the Kafka client used with publish operations performed by the host class.

Parameters:

  • kafka_client (Kafka::Client)


62
63
64
65
# File 'lib/phobos/producer.rb', line 62

# Registers the Kafka client used by the publish operations.
#
# Any stored async producer is shut down first (via #async_producer_shutdown)
# before the client is saved under +:kafka_client+.
# NOTE(review): a cached +:sync_producer+ is not reset here — confirm that a
# persistent sync producer is meant to survive a client change.
#
# @param kafka_client [Kafka::Client]
# @return [Kafka::Client] the client that was passed in
def configure_kafka_client(kafka_client)
  async_producer_shutdown
  store = producer_store
  store[:kafka_client] = kafka_client
  kafka_client
end

#create_async_producer ⇒ Object



102
103
104
105
106
# File 'lib/phobos/producer.rb', line 102

# Builds an async producer and caches it in the producer store.
#
# Uses the Kafka client already registered, or creates one through
# +Phobos.create_kafka_client+ and registers it via #configure_kafka_client.
#
# @return [Object] the producer returned by +client.async_producer+, which is
#   also stored under +:async_producer+
def create_async_producer
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
  # Splat the settings so they arrive as keyword arguments. Ruby 3 no longer
  # converts a positional Hash into keywords, so passing the Hash itself
  # raises ArgumentError against a keyword-only signature such as
  # ruby-kafka's Kafka::Client#async_producer.
  producer_store[:async_producer] = client.async_producer(**async_configs)
end

#create_sync_producer ⇒ Object



71
72
73
74
75
76
77
78
# File 'lib/phobos/producer.rb', line 71

# Builds a sync producer from the configured Kafka client.
#
# Uses the Kafka client already registered, or creates one through
# +Phobos.create_kafka_client+ and registers it via #configure_kafka_client.
# The new producer is cached under +:sync_producer+ only when the
# +:persistent_connections+ producer setting is truthy.
#
# @return [Object] the producer returned by +client.producer+
def create_sync_producer
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
  # Splat the settings so they arrive as keyword arguments. Ruby 3 no longer
  # converts a positional Hash into keywords, so passing the Hash itself
  # raises ArgumentError against a keyword-only signature such as
  # ruby-kafka's Kafka::Client#producer.
  producer = client.producer(**regular_configs)
  producer_store[:sync_producer] = producer if Phobos.config.producer_hash[:persistent_connections]
  producer
end

#kafka_client ⇒ Object



67
68
69
# File 'lib/phobos/producer.rb', line 67

# Reads the Kafka client currently held in the producer store.
#
# @return [Object, nil] whatever is stored under +:kafka_client+
def kafka_client
  store = producer_store
  store[:kafka_client]
end

#publish(topic, payload, key = nil, partition_key = nil) ⇒ Object



89
90
91
92
# File 'lib/phobos/producer.rb', line 89

# Publishes a single message through the sync producer.
#
# Wraps the arguments in a one-element message list and delegates to
# #publish_list.
#
# @param topic the value stored under +:topic+
# @param payload the value stored under +:payload+
# @param key the value stored under +:key+ (defaults to nil)
# @param partition_key the value stored under +:partition_key+ (defaults to nil)
# @return [Object] whatever #publish_list returns
def publish(topic, payload, key = nil, partition_key = nil)
  message = { topic: topic, payload: payload, key: key, partition_key: partition_key }
  publish_list([message])
end

#publish_list(messages) ⇒ Object



94
95
96
97
98
99
100
# File 'lib/phobos/producer.rb', line 94

# Delivers a list of messages through the sync producer.
#
# Reuses the stored sync producer or creates one on demand, passes the
# messages to +produce_messages+ and calls +deliver_messages+. Unless the
# +:persistent_connections+ producer setting is truthy, the producer is shut
# down afterwards, including when an error is raised along the way.
#
# @param messages the message list forwarded to +produce_messages+
# @return [Object] the result of +deliver_messages+
def publish_list(messages)
  producer = sync_producer
  producer ||= create_sync_producer
  produce_messages(producer, messages)
  producer.deliver_messages
ensure
  keep_open = Phobos.config.producer_hash[:persistent_connections]
  producer.shutdown unless keep_open || producer.nil?
end

#regular_configs ⇒ Object



129
130
131
132
133
# File 'lib/phobos/producer.rb', line 129

# Settings forwarded to the sync producer.
#
# Takes +Phobos.config.producer_hash+ and drops every key listed in either
# ASYNC_PRODUCER_PARAMS or INTERNAL_PRODUCER_PARAMS.
#
# @return [Hash] the producer settings without async-only and internal keys
def regular_configs
  excluded = ASYNC_PRODUCER_PARAMS + INTERNAL_PRODUCER_PARAMS
  Phobos.config.producer_hash.reject { |name, _value| excluded.include?(name) }
end

#sync_producer ⇒ Object



80
81
82
# File 'lib/phobos/producer.rb', line 80

# Reads the sync producer currently held in the producer store.
#
# @return [Object, nil] whatever is stored under +:sync_producer+
def sync_producer
  store = producer_store
  store[:sync_producer]
end

#sync_producer_shutdown ⇒ Object



84
85
86
87
# File 'lib/phobos/producer.rb', line 84

# Stops the stored sync producer, if any, and clears its store slot.
#
# The +:sync_producer+ entry is reset to nil whether or not a producer was
# stored.
#
# @return [nil]
def sync_producer_shutdown
  producer = sync_producer
  producer.shutdown unless producer.nil?
  producer_store[:sync_producer] = nil
end