Class: PubSubModelSync::ServiceKafka
- Inherits:
-
ServiceBase
- Object
- Base
- ServiceBase
- PubSubModelSync::ServiceKafka
- Defined in:
- lib/pub_sub_model_sync/service_kafka.rb
Constant Summary collapse
- QTY_WORKERS =
10- LISTEN_SETTINGS =
{ automatically_mark_as_processed: false }.freeze
- PUBLISH_SETTINGS =
{}.freeze
- PRODUCER_SETTINGS =
{ delivery_threshold: 200, delivery_interval: 30 }.freeze
Constants inherited from ServiceBase
PubSubModelSync::ServiceBase::SERVICE_KEY
Instance Attribute Summary collapse
-
#consumer ⇒ Object
Returns the value of attribute consumer.
-
#service ⇒ Object
Returns the value of attribute service.
-
#topic_names ⇒ Object
: [‘topic 1’, ‘topic 2’].
Instance Method Summary collapse
-
#initialize ⇒ ServiceKafka
constructor
A new instance of ServiceKafka.
- #listen_messages ⇒ Object
- #publish(payload) ⇒ Object
- #stop ⇒ Object
Methods inherited from Base
Constructor Details
#initialize ⇒ ServiceKafka
Returns a new instance of ServiceKafka.
19 20 21 22 23 24 |
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 19 def initialize settings = config.kafka_connection settings[1][:client_id] ||= config.subscription_key @service = Kafka.new(*settings) @topic_names = ensure_topics(Array(config.topic_name || 'model_sync')) end |
Instance Attribute Details
#consumer ⇒ Object
Returns the value of attribute consumer.
17 18 19 |
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 17 def consumer @consumer end |
#service ⇒ Object
Returns the value of attribute service.
17 18 19 |
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 17 def service @service end |
#topic_names ⇒ Object
: [‘topic 1’, ‘topic 2’]
17 |
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 17 attr_accessor :service, :consumer, :topic_names |
Instance Method Details
#listen_messages ⇒ Object
26 27 28 29 30 31 32 33 34 |
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 26 def log('Listener starting...') start_consumer consumer.(LISTEN_SETTINGS, &method(:process_message)) rescue PubSubModelSync::Runner::ShutDown log('Listener stopped') rescue => e log("Error listening message: #{[e.message, e.backtrace]}", :error) end |
#publish(payload) ⇒ Object
36 37 38 39 40 41 |
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 36 def publish(payload) = Array(payload.headers[:topic_name] || config.default_topic_name) .each do |topic_name| producer.produce(encode_payload(payload), (payload, topic_name)) end end |
#stop ⇒ Object
43 44 45 46 |
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 43 def stop log('Listener stopping...') consumer.stop end |