Class: Deimos::KafkaTopicInfo

Inherits: ActiveRecord::Base

Defined in: lib/deimos/kafka_topic_info.rb

Overview

Record that keeps track of which topics are being worked on by DbProducers.

Class Method Summary

  • .clear_lock(topic, lock_id) ⇒ Object
  • .heartbeat(topic, lock_id) ⇒ Object
  • .lock(topic, lock_id) ⇒ Boolean
  • .register_error(topic, lock_id) ⇒ Object

Class Method Details

.clear_lock(topic, lock_id) ⇒ Object

This is called once a producer is finished working on a topic, i.e. there are no more messages to fetch. It unlocks the topic so the producer can move on to the next one.

Parameters:

  • topic (String)
  • lock_id (String)


# File 'lib/deimos/kafka_topic_info.rb', line 53

def clear_lock(topic, lock_id)
  self.where(topic: topic, locked_by: lock_id).
    update_all(locked_by: nil, locked_at: nil, error: false, retries: 0)
end
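A minimal in-memory sketch (assumed field semantics, not the gem's API) of what clear_lock resets: ownership, heartbeat, error flag, and retry count all return to a clean state, but only for the producer that actually holds the lock.

```ruby
# Hypothetical hash-based stand-in for a kafka_topic_info row.
def clear_lock(row, lock_id)
  # Only the lock holder may clear the lock; others see no change.
  return row unless row[:locked_by] == lock_id

  row.merge(locked_by: nil, locked_at: nil, error: false, retries: 0)
end

row = { locked_by: "producer-1", locked_at: Time.now, error: false, retries: 2 }
puts clear_lock(row, "someone-else")[:locked_by] # unchanged: "producer-1"
puts clear_lock(row, "producer-1")[:retries]     # 0
```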

.heartbeat(topic, lock_id) ⇒ Object

Updates the locked_at timestamp to indicate that the producer is still working on the topic's messages and should keep its lock.

Parameters:

  • topic (String)
  • lock_id (String)


# File 'lib/deimos/kafka_topic_info.rb', line 80

def heartbeat(topic, lock_id)
  self.where(topic: topic, locked_by: lock_id).
    update_all(locked_at: Time.zone.now)
end
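An in-memory sketch (illustrative, not the gem's API) of why heartbeat matters: lock treats any locked_at older than one minute as abandoned, so a working producer must keep refreshing the timestamp between batches.

```ruby
# Hypothetical hash-based row; the real record lives in the database.
row = { locked_by: "producer-1", locked_at: Time.now - 55, error: false }

# heartbeat refreshes locked_at; stale? mirrors lock's one-minute cutoff.
heartbeat = ->(r, now = Time.now) { r[:locked_at] = now }
stale     = ->(r, now = Time.now) { r[:locked_at] < now - 60 }

heartbeat.call(row)  # refresh mid-work, as the real producer would
puts stale.call(row) # false: the lock stays live
```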

.lock(topic, lock_id) ⇒ Boolean

Lock a topic for the given ID. Returns whether the lock was successful.

Parameters:

  • topic (String)
  • lock_id (String)

Returns:

  • (Boolean)


# File 'lib/deimos/kafka_topic_info.rb', line 13

def lock(topic, lock_id)
  # Try to create it - it's fine if it already exists
  begin
    self.create(topic: topic)
  rescue ActiveRecord::RecordNotUnique # rubocop:disable Lint/HandleExceptions
    # continue on
  end

  # Lock the record
  qtopic = self.connection.quote(topic)
  qlock_id = self.connection.quote(lock_id)
  qtable = self.connection.quote_table_name('kafka_topic_info')
  qnow = self.connection.quote(Time.zone.now.to_s(:db))
  qfalse = self.connection.quoted_false
  qtime = self.connection.quote(1.minute.ago.to_s(:db))

  # If a record is marked as error and less than 1 minute old,
  # we don't want to pick it up even if not currently locked because
  # we worry we'll run into the same problem again.
  # Once it's more than 1 minute old, we figure it's OK to try again
  # so we can pick up any topic that's that old, even if it was
  # locked by someone, because it's the job of the producer to keep
  # updating the locked_at timestamp as they work on messages in that
  # topic. If the locked_at timestamp is that old, chances are that
  # the producer crashed.
  sql = <<~SQL
    UPDATE #{qtable}
    SET locked_by=#{qlock_id}, locked_at=#{qnow}, error=#{qfalse}
    WHERE topic=#{qtopic} AND
     ((locked_by IS NULL AND error=#{qfalse}) OR locked_at < #{qtime})
  SQL
  self.connection.update(sql)
  self.where(locked_by: lock_id, topic: topic).any?
end
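The WHERE clause above encodes a two-part eligibility rule. A hypothetical in-memory sketch (not the gem's API) of that predicate: a topic may be claimed if it is unlocked and not in an error state, or if its heartbeat is more than a minute old, meaning the previous holder likely crashed.

```ruby
# Mirrors the SQL predicate:
#   (locked_by IS NULL AND error = false) OR locked_at < 1 minute ago
def lockable?(row, now: Time.now)
  stale = row[:locked_at] && row[:locked_at] < now - 60
  (row[:locked_by].nil? && !row[:error]) || stale
end

fresh   = { locked_by: nil,   locked_at: nil,            error: false }
errored = { locked_by: nil,   locked_at: Time.now,       error: true  }
crashed = { locked_by: "p-1", locked_at: Time.now - 120, error: false }

puts lockable?(fresh)   # true
puts lockable?(errored) # false: errored less than a minute ago
puts lockable?(crashed) # true: heartbeat is stale, lock is stealable
```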

.register_error(topic, lock_id) ⇒ Object

The producer calls this if it gets an error sending messages. This essentially locks down this topic for 1 minute (for all producers) and allows the caller to continue to the next topic.

Parameters:

  • topic (String)
  • lock_id (String)


# File 'lib/deimos/kafka_topic_info.rb', line 63

def register_error(topic, lock_id)
  record = self.where(topic: topic, locked_by: lock_id).last
  attr_hash = { locked_by: nil,
                locked_at: Time.zone.now,
                error: true,
                retries: record.retries + 1 }
  if ActiveRecord::VERSION::MAJOR >= 4
    record.update!(attr_hash)
  else
    record.update_attributes!(attr_hash)
  end
end
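A minimal in-memory sketch (assumed semantics, not the gem's API) of register_error's state transition: the lock is released, the error flag is set, the retry counter increments, and locked_at records when the failure happened so that lock can retry the topic a minute later.

```ruby
# Hypothetical hash-based stand-in for the ActiveRecord update above.
def register_error(row, now: Time.now)
  row.merge(locked_by: nil, locked_at: now, error: true,
            retries: row[:retries] + 1)
end

row = { locked_by: "producer-1", locked_at: Time.now, error: false, retries: 0 }
row = register_error(row)
puts row[:error]   # true
puts row[:retries] # 1
```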