Class: Deimos::Utils::DbProducer
- Inherits:
-
Object
- Object
- Deimos::Utils::DbProducer
- Includes:
- Phobos::Producer
- Defined in:
- lib/deimos/utils/db_producer.rb
Overview
Class which continually polls the database and sends Kafka messages.
Constant Summary collapse
- BATCH_SIZE =
1000
Instance Attribute Summary collapse
-
#current_topic ⇒ Object
Returns the value of attribute current_topic.
-
#id ⇒ Object
Returns the value of attribute id.
Instance Method Summary collapse
-
#initialize(logger = Logger.new(STDOUT)) ⇒ DbProducer
constructor
A new instance of DbProducer.
-
#process_next_messages ⇒ Object
Complete one loop of processing all messages in the DB.
-
#process_topic(topic) ⇒ String
The topic that was locked, or nil if none were.
- #produce_messages(batch) ⇒ Object
- #retrieve_messages ⇒ Array<Deimos::KafkaMessage>
- #retrieve_topics ⇒ Array<String>
-
#send_pending_metrics ⇒ Object
Send metrics to Datadog.
-
#shutdown_producer ⇒ Object
Shut down the sync producer if we have to.
-
#start ⇒ Object
Start the poll.
-
#stop ⇒ Object
Stop the poll.
Constructor Details
#initialize(logger = Logger.new(STDOUT)) ⇒ DbProducer
Returns a new instance of DbProducer.
13 14 15 16 17 |
# File 'lib/deimos/utils/db_producer.rb', line 13 def initialize(logger=Logger.new(STDOUT)) @id = SecureRandom.uuid @logger = logger @logger.push_tags("DbProducer #{@id}") if @logger.respond_to?(:push_tags) end |
Instance Attribute Details
#current_topic ⇒ Object
Returns the value of attribute current_topic.
8 9 10 |
# File 'lib/deimos/utils/db_producer.rb', line 8 def current_topic @current_topic end |
#id ⇒ Object
Returns the value of attribute id.
8 9 10 |
# File 'lib/deimos/utils/db_producer.rb', line 8 def id @id end |
Instance Method Details
#process_next_messages ⇒ Object
Complete one loop of processing all messages in the DB.
41 42 43 44 45 46 |
# File 'lib/deimos/utils/db_producer.rb', line 41 def process_next_messages topics = retrieve_topics @logger.info("Found topics: #{topics}") topics.each(&method(:process_topic)) sleep(0.5) end |
#process_topic(topic) ⇒ String
Returns the topic that was locked, or nil if none were.
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/deimos/utils/db_producer.rb', line 55 def process_topic(topic) # If the topic is already locked, another producer is currently # working on it. Move on to the next one. unless KafkaTopicInfo.lock(topic, @id) @logger.debug("Could not lock topic #{topic} - continuing") return end @current_topic = topic messages = retrieve_messages while messages.any? @logger.debug do decoder = messages.first.decoder "DB producer: Topic #{topic} Producing messages: #{messages.map { |m| m.decoded_message(decoder) }}" end Deimos.instrument('db_producer.produce', topic: topic, messages: messages) do begin produce_messages(messages.map(&:phobos_message)) rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge messages.each(&:delete) raise end end messages.first.class.where(id: messages.map(&:id)).delete_all break if messages.size < BATCH_SIZE KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive send_pending_metrics messages = retrieve_messages end KafkaTopicInfo.clear_lock(@current_topic, @id) rescue StandardError => e @logger.error("Error processing messages for topic #{@current_topic}: #{e.class.name}: #{e.message} #{e.backtrace.join("\n")}") KafkaTopicInfo.register_error(@current_topic, @id) end |
#produce_messages(batch) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/deimos/utils/db_producer.rb', line 113 def produce_messages(batch) batch_size = batch.size begin batch.in_groups_of(batch_size, false).each do |group| @logger.debug("Publishing #{group.size} messages to #{@current_topic}") producer.publish_list(group) Deimos.config.metrics&.increment( 'publish', tags: %W(status:success topic:#{@current_topic}), by: group.size ) @logger.info("Sent #{group.size} messages to #{@current_topic}") end rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge => e if batch_size == 1 shutdown_producer raise end @logger.error("Got error #{e.class.name} when publishing #{batch.size} in groups of #{batch_size}, retrying...") if batch_size < 10 batch_size = 1 else batch_size /= 10 end shutdown_producer retry end end |
#retrieve_messages ⇒ Array<Deimos::KafkaMessage>
92 93 94 |
# File 'lib/deimos/utils/db_producer.rb', line 92 def retrieve_messages KafkaMessage.where(topic: @current_topic).order(:id).limit(BATCH_SIZE) end |
#retrieve_topics ⇒ Array<String>
49 50 51 |
# File 'lib/deimos/utils/db_producer.rb', line 49 def retrieve_topics KafkaMessage.select('distinct topic').map(&:topic).uniq end |
#send_pending_metrics ⇒ Object
Send metrics to Datadog.
97 98 99 100 101 |
# File 'lib/deimos/utils/db_producer.rb', line 97 def send_pending_metrics message = KafkaMessage.first time_diff = message ? Time.zone.now - KafkaMessage.first.created_at : 0 Deimos.config.metrics&.gauge('pending_db_messages_max_wait', time_diff) end |
#shutdown_producer ⇒ Object
Shut down the sync producer if we have to. Phobos will automatically create a new one. We should call this if the producer can be in a bad state and e.g. we need to clear the buffer.
106 107 108 109 110 |
# File 'lib/deimos/utils/db_producer.rb', line 106 def shutdown_producer if self.class.producer.respond_to?(:sync_producer_shutdown) # Phobos 1.8.3 self.class.producer.sync_producer_shutdown end end |
#start ⇒ Object
Start the poll.
20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/deimos/utils/db_producer.rb', line 20 def start @logger.info('Starting...') @signal_to_stop = false ActiveRecord::Base.connection.reconnect! loop do if @signal_to_stop @logger.info('Shutting down') break end send_pending_metrics process_next_messages end end |
#stop ⇒ Object
Stop the poll.
35 36 37 38 |
# File 'lib/deimos/utils/db_producer.rb', line 35 def stop @logger.info('Received signal to stop') @signal_to_stop = true end |