Class: PubSubModelSync::ServiceKafka

Inherits:
ServiceBase show all
Defined in:
lib/pub_sub_model_sync/service_kafka.rb

Constant Summary collapse

QTY_WORKERS =
10
LISTEN_SETTINGS =
{ automatically_mark_as_processed: false }.freeze
PUBLISH_SETTINGS =
{}.freeze
PRODUCER_SETTINGS =
{ delivery_threshold: 200, delivery_interval: 30 }.freeze

Constants inherited from ServiceBase

PubSubModelSync::ServiceBase::SERVICE_KEY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

config, debug?, log

Constructor Details

#initializeServiceKafka

Returns a new instance of ServiceKafka.



19
20
21
22
23
24
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 19

def initialize
  settings = config.kafka_connection
  settings[1][:client_id] ||= config.subscription_key
  @service = Kafka.new(*settings)
  @topic_names = ensure_topics(Array(config.topic_name || 'model_sync'))
end

Instance Attribute Details

#consumerObject

Returns the value of attribute consumer.



17
18
19
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 17

def consumer
  @consumer
end

#serviceObject

Returns the value of attribute service.



17
18
19
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 17

def service
  @service
end

#topic_namesObject

: [‘topic 1’, ‘topic 2’]



17
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 17

attr_accessor :service, :consumer, :topic_names

Instance Method Details

#listen_messagesObject



26
27
28
29
30
31
32
33
34
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 26

def listen_messages
  log('Listener starting...')
  start_consumer
  consumer.each_message(LISTEN_SETTINGS, &method(:process_message))
rescue PubSubModelSync::Runner::ShutDown
  log('Listener stopped')
rescue => e
  log("Error listening message: #{[e.message, e.backtrace]}", :error)
end

#publish(payload) ⇒ Object



36
37
38
39
40
41
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 36

def publish(payload)
  message_topics = Array(payload.headers[:topic_name] || config.default_topic_name)
  message_topics.each do |topic_name|
    producer.produce(encode_payload(payload), message_settings(payload, topic_name))
  end
end

#stopObject



43
44
45
46
# File 'lib/pub_sub_model_sync/service_kafka.rb', line 43

def stop
  log('Listener stopping...')
  consumer.stop
end