Class: Deimos::Utils::DbProducer

Inherits:
Object
  • Object
show all
Includes:
Phobos::Producer
Defined in:
lib/deimos/utils/db_producer.rb

Overview

Class which continually polls the database and sends Kafka messages.

Constant Summary collapse

BATCH_SIZE =
1000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger = Logger.new(STDOUT)) ⇒ DbProducer

Returns a new instance of DbProducer.

Parameters:

  • logger (Logger) (defaults to: Logger.new(STDOUT))


13
14
15
16
17
# File 'lib/deimos/utils/db_producer.rb', line 13

# Create a producer with a unique identity and an optional logger.
# @param logger [Logger] sink for diagnostic output; tagged loggers
#   (responding to #push_tags) are tagged with this producer's ID.
def initialize(logger=Logger.new(STDOUT))
  @id = SecureRandom.uuid
  @logger = logger
  if @logger.respond_to?(:push_tags)
    @logger.push_tags("DbProducer #{@id}")
  end
end

Instance Attribute Details

#current_topic ⇒ Object

Returns the value of attribute current_topic.



8
9
10
# File 'lib/deimos/utils/db_producer.rb', line 8

# @return [Object] the topic this producer is currently working on.
def current_topic; @current_topic; end

#id ⇒ Object

Returns the value of attribute id.



8
9
10
# File 'lib/deimos/utils/db_producer.rb', line 8

# @return [Object] this producer's unique identifier.
def id; @id; end

Instance Method Details

#process_next_messages ⇒ Object

Complete one loop of processing all messages in the DB.



41
42
43
44
45
46
# File 'lib/deimos/utils/db_producer.rb', line 41

# Run one complete polling pass: find every topic with queued messages,
# process each one, then pause briefly before the caller loops again.
def process_next_messages
  topics = retrieve_topics
  @logger.info("Found topics: #{topics}")
  topics.each { |topic| process_topic(topic) }
  sleep(0.5)
end

#process_topic(topic) ⇒ String

Returns the topic that was locked, or nil if none were.

Parameters:

  • topic (String)

Returns:

  • (String)

    the topic that was locked, or nil if none were.



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/deimos/utils/db_producer.rb', line 55

# Attempt to lock a topic and drain its queued messages to Kafka in
# BATCH_SIZE chunks, releasing the lock once the queue is empty.
# @param topic [String]
# @return [String, nil] the topic that was locked, or nil if none were.
def process_topic(topic)
  # If the topic is already locked, another producer is currently
  # working on it. Move on to the next one.
  unless KafkaTopicInfo.lock(topic, @id)
    @logger.debug("Could not lock topic #{topic} - continuing")
    return
  end
  @current_topic = topic
  messages = retrieve_messages

  while messages.any?
    # Block form so the (potentially expensive) decode only runs when
    # debug logging is enabled.
    @logger.debug do
      decoder = messages.first.decoder
      "DB producer: Topic #{topic} Producing messages: #{messages.map { |m| m.decoded_message(decoder) }}"
    end
    Deimos.instrument('db_producer.produce', topic: topic, messages: messages) do
      begin
        produce_messages(messages.map(&:phobos_message))
      rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge
        # These errors mean the messages can never be sent as-is; delete
        # them so the topic is not permanently wedged, then surface the
        # error to the outer handler.
        messages.each(&:delete)
        raise
      end
    end
    # The batch made it to Kafka - remove it from the queue table.
    messages.first.class.where(id: messages.map(&:id)).delete_all
    # A short batch means the queue is drained; stop looping.
    break if messages.size < BATCH_SIZE

    KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive
    send_pending_metrics
    messages = retrieve_messages
  end
  KafkaTopicInfo.clear_lock(@current_topic, @id)
rescue StandardError => e
  # NOTE(review): the lock is not cleared here - presumably
  # register_error marks/handles the lock so other producers can
  # recover the topic; confirm against KafkaTopicInfo.
  @logger.error("Error processing messages for topic #{@current_topic}: #{e.class.name}: #{e.message} #{e.backtrace.join("\n")}")
  KafkaTopicInfo.register_error(@current_topic, @id)
end

#produce_messages(batch) ⇒ Object

Parameters:

  • batch (Array<Hash>)


113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/deimos/utils/db_producer.rb', line 113

# Publish a batch of Phobos message hashes to Kafka, progressively
# shrinking the publish group size when the broker rejects the payload
# as too large, until it succeeds or fails at group size 1.
# @param batch [Array<Hash>]
def produce_messages(batch)
  batch_size = batch.size
  begin
    # `false` disables nil-padding of the final (possibly short) group.
    batch.in_groups_of(batch_size, false).each do |group|
      @logger.debug("Publishing #{group.size} messages to #{@current_topic}")
      producer.publish_list(group)
      Deimos.config.metrics&.increment(
        'publish',
        tags: %W(status:success topic:#{@current_topic}),
        by: group.size
      )
      @logger.info("Sent #{group.size} messages to #{@current_topic}")
    end
  rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge,
         Kafka::RecordListTooLarge => e
    # Already at the smallest possible group: this message can never be
    # published - reset the producer and let the caller deal with it.
    if batch_size == 1
      shutdown_producer
      raise
    end

    @logger.error("Got error #{e.class.name} when publishing #{batch.size} in groups of #{batch_size}, retrying...")
    # Drop to 1 for small batches, otherwise shrink by a factor of 10.
    if batch_size < 10
      batch_size = 1
    else
      batch_size /= 10
    end
    # NOTE(review): `retry` restarts the whole batch, so groups that
    # were already published in this pass are sent again - consumers
    # must tolerate duplicates. Confirm this is the intended at-least-
    # once behavior.
    shutdown_producer
    retry
  end
end

#retrieve_messages ⇒ Array&lt;Deimos::KafkaMessage&gt;

Returns:

  • (Array&lt;Deimos::KafkaMessage&gt;)

92
93
94
# File 'lib/deimos/utils/db_producer.rb', line 92

# Fetch the next batch of queued messages for the topic currently
# being processed, oldest first.
# @return [Array<Deimos::KafkaMessage>] up to BATCH_SIZE records.
def retrieve_messages
  KafkaMessage.
    where(topic: @current_topic).
    order(:id).
    limit(BATCH_SIZE)
end

#retrieve_topics ⇒ Array&lt;String&gt;

Returns:

  • (Array<String>)


49
50
51
# File 'lib/deimos/utils/db_producer.rb', line 49

# List every topic that currently has queued messages.
# @return [Array<String>]
def retrieve_topics
  # DISTINCT at the database level makes a Ruby-side .uniq redundant,
  # and pluck returns the column values directly instead of
  # instantiating a full model object per row.
  KafkaMessage.distinct.pluck(:topic)
end

#send_pending_metrics ⇒ Object

Send metrics to Datadog.



97
98
99
100
101
# File 'lib/deimos/utils/db_producer.rb', line 97

# Send metrics to Datadog: gauge the age (in seconds) of the oldest
# queued message, or 0 when the queue is empty.
def send_pending_metrics
  first_message = KafkaMessage.first
  # Reuse the record we already fetched rather than querying again:
  # the original second `KafkaMessage.first` could race with a
  # concurrent delete and return nil, raising NoMethodError.
  time_diff = first_message ? Time.zone.now - first_message.created_at : 0
  Deimos.config.metrics&.gauge('pending_db_messages_max_wait', time_diff)
end

#shutdown_producer ⇒ Object

Shut down the sync producer if we have to. Phobos will automatically create a new one. We should call this if the producer can be in a bad state and e.g. we need to clear the buffer.



106
107
108
109
110
# File 'lib/deimos/utils/db_producer.rb', line 106

# Shut down the sync producer if we have to. Phobos will automatically
# create a new one. We should call this if the producer can be in a bad
# state and e.g. we need to clear the buffer.
def shutdown_producer
  phobos_producer = self.class.producer
  # Only available from Phobos 1.8.3 onwards.
  return unless phobos_producer.respond_to?(:sync_producer_shutdown)

  phobos_producer.sync_producer_shutdown
end

#start ⇒ Object

Start the poll.



20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/deimos/utils/db_producer.rb', line 20

# Start the poll: loop forever, reporting metrics and draining the
# message table, until #stop flips the shutdown flag.
def start
  @logger.info('Starting...')
  @signal_to_stop = false
  # Reconnect in case this runs in a forked process with a stale handle.
  ActiveRecord::Base.connection.reconnect!
  loop do
    break if @signal_to_stop

    send_pending_metrics
    process_next_messages
  end
  @logger.info('Shutting down')
end

#stop ⇒ Object

Stop the poll.



35
36
37
38
# File 'lib/deimos/utils/db_producer.rb', line 35

# Stop the poll: flag the loop in #start to exit on its next iteration.
def stop
  @logger.info('Received signal to stop')
  @signal_to_stop = true
end