Class: Deimos::Utils::DbProducer

Inherits:
Object
Includes:
Phobos::Producer
Defined in:
lib/deimos/utils/db_producer.rb

Overview

Class which continually polls the kafka_messages table in the database and sends Kafka messages.
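
A minimal usage sketch, assuming Deimos and ActiveRecord are already configured (the wiring here is hypothetical; in practice Deimos starts this from its own executor):

producer = Deimos::Utils::DbProducer.new
producer.start    # blocks, polling kafka_messages in a loop
# ... later, from another thread:
producer.stop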

Constant Summary

BATCH_SIZE = 1000
DELETE_BATCH_SIZE = 10
MAX_DELETE_ATTEMPTS = 3

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(logger = Logger.new(STDOUT)) ⇒ DbProducer

Returns a new instance of DbProducer.

Parameters:

  • logger (Logger) (defaults to: Logger.new(STDOUT))


# File 'lib/deimos/utils/db_producer.rb', line 16

def initialize(logger=Logger.new(STDOUT))
  @id = SecureRandom.uuid
  @logger = logger
  @logger.push_tags("DbProducer #{@id}") if @logger.respond_to?(:push_tags)
end
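
push_tags is provided by ActiveSupport::TaggedLogging, so passing a tagged logger makes every line this producer logs carry its id. A hedged sketch, assuming activesupport is loaded:

require 'active_support/tagged_logging'

logger = ActiveSupport::TaggedLogging.new(Logger.new(STDOUT))
producer = Deimos::Utils::DbProducer.new(logger)
# Log lines now appear as: [DbProducer <uuid>] <message>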

Instance Attribute Details

#current_topic ⇒ Object

Returns the value of attribute current_topic.



# File 'lib/deimos/utils/db_producer.rb', line 9

def current_topic
  @current_topic
end

#id ⇒ Object

Returns the value of attribute id.



# File 'lib/deimos/utils/db_producer.rb', line 9

def id
  @id
end

Instance Method Details

#compact_messages(batch) ⇒ Array<Deimos::KafkaMessage>

Parameters:

  • batch (Array<Deimos::KafkaMessage>)

Returns:

  • (Array<Deimos::KafkaMessage>)


# File 'lib/deimos/utils/db_producer.rb', line 231

def compact_messages(batch)
  return batch if batch.first&.key.blank?

  topic = batch.first.topic
  return batch if config.compact_topics != :all &&
                  !config.compact_topics.include?(topic)

  batch.reverse.uniq(&:key).reverse!
end
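
An illustration of the final line's last-writer-wins compaction, using a plain Struct as a hypothetical stand-in for Deimos::KafkaMessage:

Msg = Struct.new(:key, :payload)
batch = [Msg.new('a', 1), Msg.new('b', 2), Msg.new('a', 3)]
batch.reverse.uniq(&:key).reverse
# => [#<struct key="b", payload=2>, #<struct key="a", payload=3>]
# The last message per key wins; relative order is preserved.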

#config ⇒ Deimos::DbProducerConfig

Returns:

  • (Deimos::DbProducerConfig)


# File 'lib/deimos/utils/db_producer.rb', line 23

def config
  Deimos.config.db_producer
end

#delete_messages(messages) ⇒ Object

Parameters:

  • messages (Array<Deimos::KafkaMessage>)


# File 'lib/deimos/utils/db_producer.rb', line 113

def delete_messages(messages)
  attempts = 1
  begin
    messages.in_groups_of(DELETE_BATCH_SIZE, false).each do |batch|
      Deimos::KafkaMessage.where(topic: batch.first.topic,
                                 id: batch.map(&:id)).
        delete_all
    end
  rescue StandardError => e
    if (e.message =~ /Lock wait/i || e.message =~ /Lost connection/i) &&
       attempts <= MAX_DELETE_ATTEMPTS
      attempts += 1
      ActiveRecord::Base.connection.verify!
      sleep(1)
      retry
    end
    raise
  end
end
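
in_groups_of comes from ActiveSupport; the false argument drops nil padding from the last group, so e.g. 25 ids become three DELETE statements:

(1..25).to_a.in_groups_of(10, false).map(&:size)  # 10 == DELETE_BATCH_SIZE
# => [10, 10, 5] -- one DELETE per group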

#log_messages(messages) ⇒ Object

Parameters:

  • messages (Array<Deimos::KafkaMessage>)


# File 'lib/deimos/utils/db_producer.rb', line 139

def log_messages(messages)
  return if config.log_topics != :all && !config.log_topics.include?(@current_topic)

  @logger.debug do
    decoded_messages = Deimos::KafkaMessage.decoded(messages)
    "DB producer: Topic #{@current_topic} Producing messages: #{decoded_messages}}"
  end
end
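
The guard clause reads: skip logging unless log_topics is :all or includes the current topic. The same predicate as a standalone, hypothetical helper:

def should_log?(log_topics, topic)
  log_topics == :all || log_topics.include?(topic)
end

should_log?(:all, 'orders')           # => true
should_log?(%w(orders), 'payments')   # => false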

#process_next_messages ⇒ Object

Complete one loop of processing all messages in the DB.



# File 'lib/deimos/utils/db_producer.rb', line 49

def process_next_messages
  topics = retrieve_topics
  @logger.info("Found topics: #{topics}")
  topics.each(&method(:process_topic))
  KafkaTopicInfo.ping_empty_topics(topics)
  sleep(0.5)
end

#process_topic(topic) ⇒ String

Returns the topic that was locked, or nil if none were.

Parameters:

  • topic (String)

Returns:

  • (String)

    the topic that was locked, or nil if none were.



# File 'lib/deimos/utils/db_producer.rb', line 64

def process_topic(topic)
  # If the topic is already locked, another producer is currently
  # working on it. Move on to the next one.
  unless KafkaTopicInfo.lock(topic, @id)
    @logger.debug("Could not lock topic #{topic} - continuing")
    return
  end
  @current_topic = topic

  loop { break unless process_topic_batch }

  KafkaTopicInfo.clear_lock(@current_topic, @id)
rescue StandardError => e
  @logger.error("Error processing messages for topic #{@current_topic}: #{e.class.name}: #{e.message} #{e.backtrace.join("\n")}")
  KafkaTopicInfo.register_error(@current_topic, @id)
end

#process_topic_batch ⇒ Object

Process a single batch in a topic.



# File 'lib/deimos/utils/db_producer.rb', line 82

def process_topic_batch
  messages = retrieve_messages
  return false if messages.empty?

  batch_size = messages.size
  compacted_messages = compact_messages(messages)
  log_messages(compacted_messages)
  Deimos.instrument('db_producer.produce', topic: @current_topic, messages: compacted_messages) do
    begin
      produce_messages(compacted_messages.map(&:phobos_message))
    rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge
      delete_messages(messages)
      @logger.error('Message batch too large, deleting...')
      @logger.error(Deimos::KafkaMessage.decoded(messages))
      raise
    end
  end
  delete_messages(messages)
  Deimos.config.metrics&.increment(
    'db_producer.process',
    tags: %W(topic:#{@current_topic}),
    by: messages.size
  )
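  # A partial batch means no more messages are waiting for this topic.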
  return false if batch_size < BATCH_SIZE

  KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive
  send_pending_metrics
  true
end

#produce_messages(batch) ⇒ Object

Produce messages in batches, shrinking the batch size to one-tenth of its previous value whenever the payload is too large. Messages that have already been sent are not retried.

Parameters:

  • batch (Array<Hash>)


# File 'lib/deimos/utils/db_producer.rb', line 196

def produce_messages(batch)
  batch_size = batch.size
  current_index = 0
  begin
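    # Slice from current_index so messages already sent are not retried.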
    batch[current_index..-1].in_groups_of(batch_size, false).each do |group|
      @logger.debug("Publishing #{group.size} messages to #{@current_topic}")
      producer.publish_list(group)
      Deimos.config.metrics&.increment(
        'publish',
        tags: %W(status:success topic:#{@current_topic}),
        by: group.size
      )
      current_index += group.size
      @logger.info("Sent #{group.size} messages to #{@current_topic}")
    end
  rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge,
         Kafka::RecordListTooLarge => e
    if batch_size == 1
      shutdown_producer
      raise
    end

    @logger.error("Got error #{e.class.name} when publishing #{batch.size} in groups of #{batch_size}, retrying...")
    batch_size = if batch_size < 10
                   1
                 else
                   (batch_size / 10).to_i
                 end
    shutdown_producer
    retry
  end
end
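
The rescue path divides the group size by 10 on every retry, bottoming out at single-message sends (at which point a failure is re-raised). The schedule, sketched standalone:

sizes = [1000]
sizes << (sizes.last < 10 ? 1 : sizes.last / 10) while sizes.last > 1
sizes  # => [1000, 100, 10, 1]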

#retrieve_messages ⇒ Array<Deimos::KafkaMessage>

Returns:

  • (Array<Deimos::KafkaMessage>)


# File 'lib/deimos/utils/db_producer.rb', line 134

def retrieve_messages
  KafkaMessage.where(topic: @current_topic).order(:id).limit(BATCH_SIZE)
end

#retrieve_topics ⇒ Array<String>

Returns:

  • (Array<String>)


# File 'lib/deimos/utils/db_producer.rb', line 58

def retrieve_topics
  KafkaMessage.select('distinct topic').map(&:topic).uniq
end

#send_pending_metrics ⇒ Object

Send metrics to Datadog.



# File 'lib/deimos/utils/db_producer.rb', line 149

def send_pending_metrics
  metrics = Deimos.config.metrics
  return unless metrics

  topics = KafkaTopicInfo.select(%w(topic last_processed_at))
  messages = Deimos::KafkaMessage.
    select('count(*) as num_messages, min(created_at) as earliest, topic').
    group(:topic).
    index_by(&:topic)
  topics.each do |record|
    message_record = messages[record.topic]
    # We want to record the last time we saw any activity, meaning either
    # the oldest message, or the last time we processed, whichever comes
    # last.
    if message_record
      record_earliest = message_record.earliest
      # SQLite gives a string here
      if record_earliest.is_a?(String)
        record_earliest = Time.zone.parse(record_earliest)
      end

      earliest = [record.last_processed_at, record_earliest].max
      time_diff = Time.zone.now - earliest
      metrics.gauge('pending_db_messages_max_wait', time_diff,
                    tags: ["topic:#{record.topic}"])
    else
      # no messages waiting
      metrics.gauge('pending_db_messages_max_wait', 0,
                    tags: ["topic:#{record.topic}"])
    end
    metrics.gauge('pending_db_messages_count', message_record&.num_messages || 0,
                  tags: ["topic:#{record.topic}"])
  end
end
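
The max-wait gauge reports the time since the most recent activity: the later of last_processed_at and the oldest pending message's created_at. With hypothetical timestamps:

require 'active_support/time'
Time.zone = 'UTC'

last_processed_at = Time.zone.now - 300   # last batch sent 5 minutes ago
earliest_pending  = Time.zone.now - 120   # oldest waiting message: 2 minutes
Time.zone.now - [last_processed_at, earliest_pending].max
# => ~120.0 seconds, reported as pending_db_messages_max_wait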

#shutdown_producer ⇒ Object

Shut down the sync producer if we have to. Phobos will automatically create a new one. We should call this when the producer may be in a bad state, e.g. when we need to clear the buffer.



# File 'lib/deimos/utils/db_producer.rb', line 187

def shutdown_producer
  if self.class.producer.respond_to?(:sync_producer_shutdown) # Phobos 1.8.3
    self.class.producer.sync_producer_shutdown
  end
end

#start ⇒ Object

Start the poll.



# File 'lib/deimos/utils/db_producer.rb', line 28

def start
  @logger.info('Starting...')
  @signal_to_stop = false
  ActiveRecord::Base.connection.reconnect!
  loop do
    if @signal_to_stop
      @logger.info('Shutting down')
      break
    end
    send_pending_metrics
    process_next_messages
  end
end
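
A hedged shutdown sketch: #stop only flips a flag, so #start returns at the next top-of-loop check. The signal wiring below is hypothetical (the thread avoids logging directly inside the trap handler):

producer = Deimos::Utils::DbProducer.new
Signal.trap('TERM') { Thread.new { producer.stop } }
producer.start   # returns after the current iteration finishes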

#stop ⇒ Object

Stop the poll.



# File 'lib/deimos/utils/db_producer.rb', line 43

def stop
  @logger.info('Received signal to stop')
  @signal_to_stop = true
end