Class: Deimos::KafkaTopicInfo
- Inherits: ActiveRecord::Base
  - Object
  - ActiveRecord::Base
  - Deimos::KafkaTopicInfo
- Defined in: lib/deimos/kafka_topic_info.rb
Overview
Record that keeps track of which topics are being worked on by DbProducers.
Class Method Summary
- .clear_lock(topic, lock_id) ⇒ Object
  Called once a producer is finished working on a topic, i.e. there are no more messages to fetch.
- .heartbeat(topic, lock_id) ⇒ Object
  Update the locked_at timestamp to indicate that the producer is still working on the topic's messages.
- .lock(topic, lock_id) ⇒ Boolean
  Lock a topic for the given ID.
- .register_error(topic, lock_id) ⇒ Object
  The producer calls this if it gets an error sending messages.
Class Method Details
.clear_lock(topic, lock_id) ⇒ Object
This is called once a producer is finished working on a topic, i.e. there are no more messages to fetch. It releases the lock and resets the error flag and retry count, so the producer can move on to the next topic.
    # File 'lib/deimos/kafka_topic_info.rb', line 53

    def clear_lock(topic, lock_id)
      self.where(topic: topic, locked_by: lock_id).
        update_all(locked_by: nil, locked_at: nil, error: false, retries: 0)
    end
.heartbeat(topic, lock_id) ⇒ Object
Update the locked_at timestamp to signal that the producer is still working on the topic's messages and intends to continue, so that the lock is not treated as stale.
    # File 'lib/deimos/kafka_topic_info.rb', line 80

    def heartbeat(topic, lock_id)
      self.where(topic: topic, locked_by: lock_id).
        update_all(locked_at: Time.zone.now)
    end
.lock(topic, lock_id) ⇒ Boolean
Lock a topic for the given ID. Returns whether the lock was successful.
    # File 'lib/deimos/kafka_topic_info.rb', line 13

    def lock(topic, lock_id)
      # Try to create it - it's fine if it already exists
      begin
        self.create(topic: topic)
      rescue ActiveRecord::RecordNotUnique # rubocop:disable Lint/HandleExceptions
        # continue on
      end

      # Lock the record
      qtopic = self.connection.quote(topic)
      qlock_id = self.connection.quote(lock_id)
      qtable = self.connection.quote_table_name('kafka_topic_info')
      qnow = self.connection.quote(Time.zone.now.to_s(:db))
      qfalse = self.connection.quoted_false
      qtime = self.connection.quote(1.minute.ago.to_s(:db))

      # If a record is marked as error and less than 1 minute old,
      # we don't want to pick it up even if not currently locked because
      # we worry we'll run into the same problem again.
      # Once it's more than 1 minute old, we figure it's OK to try again
      # so we can pick up any topic that's that old, even if it was
      # locked by someone, because it's the job of the producer to keep
      # updating the locked_at timestamp as they work on messages in that
      # topic. If the locked_at timestamp is that old, chances are that
      # the producer crashed.
      sql = <<~SQL
        UPDATE #{qtable}
        SET locked_by=#{qlock_id}, locked_at=#{qnow}, error=#{qfalse}
        WHERE topic=#{qtopic} AND
         ((locked_by IS NULL AND error=#{qfalse}) OR locked_at < #{qtime})
      SQL
      self.connection.update(sql)
      self.where(locked_by: lock_id, topic: topic).any?
    end
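The WHERE clause in the listing above can be read as a small eligibility rule. The following is a minimal in-memory sketch of that rule in plain Ruby, not Deimos code. TopicRow, STALE_AFTER, lockable? and try_lock are hypothetical names introduced only for illustration, and the sketch assumes the error column is never NULL.

```ruby
# Hypothetical in-memory stand-in for one kafka_topic_info row (not part of Deimos).
TopicRow = Struct.new(:topic, :locked_by, :locked_at, :error, keyword_init: true)

# Seconds; mirrors the 1.minute.ago cutoff used in the SQL.
STALE_AFTER = 60

# Mirrors the WHERE clause: the row is either unlocked and error-free,
# or its locked_at timestamp is older than the cutoff.
def lockable?(row, now)
  return true if row.locked_by.nil? && row.error == false

  !row.locked_at.nil? && row.locked_at < now - STALE_AFTER
end

# Mirrors the UPDATE followed by the final `where(...).any?` check:
# take the lock if the row is eligible, then report whether lock_id holds it.
def try_lock(row, lock_id, now)
  if lockable?(row, now)
    row.locked_by = lock_id
    row.locked_at = now
    row.error = false
  end
  row.locked_by == lock_id
end
```

Under this model, a row in the state left by register_error (locked_by nil, error true) stays unavailable until its locked_at is more than a minute old, and a lock whose holder stops heartbeating can be taken over after the same interval.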
.register_error(topic, lock_id) ⇒ Object
The producer calls this if it gets an error sending messages. This essentially locks down this topic for 1 minute (for all producers) and allows the caller to continue to the next topic.
    # File 'lib/deimos/kafka_topic_info.rb', line 63

    def register_error(topic, lock_id)
      record = self.where(topic: topic, locked_by: lock_id).last
      attr_hash = { locked_by: nil,
                    locked_at: Time.zone.now,
                    error: true,
                    retries: record.retries + 1 }
      if ActiveRecord::VERSION::MAJOR >= 4
        record.update!(attr_hash)
      else
        record.update_attributes!(attr_hash)
      end
    end
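Read together, the four class methods suggest a per-topic lifecycle: lock, heartbeat while sending, then either clear_lock or register_error. The sketch below illustrates that flow under stated assumptions. process_topic, its batches argument, and the block that sends a batch are hypothetical, and the actual DbProducer loop may differ.

```ruby
# Hypothetical driver, for illustration only. `info` is any object that responds
# to lock, heartbeat, clear_lock and register_error with the signatures
# documented above (for example Deimos::KafkaTopicInfo). `batches` is an
# assumed enumerable of message batches; the block sends one batch.
def process_topic(info, topic, lock_id, batches)
  # Another producer holds the topic, or it is in its error cooldown.
  return :skipped unless info.lock(topic, lock_id)

  begin
    batches.each do |batch|
      yield batch                    # send the messages; may raise
      info.heartbeat(topic, lock_id) # refresh locked_at while still working
    end
  rescue StandardError
    # Release the lock and mark the topic as errored, then let the caller move on.
    info.register_error(topic, lock_id)
    return :errored
  end

  # No more messages to fetch: unlock the topic.
  info.clear_lock(topic, lock_id)
  :done
end
```

The rescue wraps only the sending loop, so register_error is called only while the lock is actually held, which is the case where the lookup by topic and locked_by inside register_error can find a record.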