Class: Deimos::Utils::LagReporter::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/utils/lag_reporter.rb

Overview

Topic which has a hash of partition => last known current offsets

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic_name, group) ⇒ Topic

Returns a new instance of Topic.

Parameters:



52
53
54
55
56
# File 'lib/deimos/utils/lag_reporter.rb', line 52

def initialize(topic_name, group)
  self.topic_name = topic_name
  self.consumer_group = group
  self.partition_current_offsets = {}
end

Instance Attribute Details

#consumer_groupConsumerGroup

Returns:



48
49
50
# File 'lib/deimos/utils/lag_reporter.rb', line 48

def consumer_group
  @consumer_group
end

#partition_current_offsetsHash<Integer, Integer>

Returns:

  • (Hash<Integer, Integer>)


46
47
48
# File 'lib/deimos/utils/lag_reporter.rb', line 46

def partition_current_offsets
  @partition_current_offsets
end

#topic_nameString

Returns:

  • (String)


44
45
46
# File 'lib/deimos/utils/lag_reporter.rb', line 44

def topic_name
  @topic_name
end

Instance Method Details

#assign_current_offset(partition, offset) ⇒ Object

Parameters:

  • partition (Integer)


59
60
61
# File 'lib/deimos/utils/lag_reporter.rb', line 59

def assign_current_offset(partition, offset)
  self.partition_current_offsets[partition.to_i] = offset
end

#compute_lag(partition, offset) ⇒ Object

Parameters:

  • partition (Integer)


64
65
66
67
68
69
70
71
72
73
74
# File 'lib/deimos/utils/lag_reporter.rb', line 64

def compute_lag(partition, offset)
  begin
    client = Phobos.create_kafka_client
    last_offset = client.last_offset_for(self.topic_name, partition)
    lag = last_offset - offset
  rescue StandardError # don't do anything, just wait
    Deimos.config.logger.
      debug("Error computing lag for #{self.topic_name}, will retry")
  end
  lag || 0
end

#report_lag(partition) ⇒ Object

Parameters:

  • partition (Integer)


77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/deimos/utils/lag_reporter.rb', line 77

def report_lag(partition)
  current_offset = self.partition_current_offsets[partition.to_i]
  return unless current_offset

  lag = compute_lag(partition, current_offset)
  group = self.consumer_group.id
  Deimos.config.logger.
    debug("Sending lag: #{group}/#{partition}: #{lag}")
  Deimos.config.metrics&.gauge('consumer_lag', lag, tags: %W(
                                 consumer_group:#{group}
                                 partition:#{partition}
                                 topic:#{self.topic_name}
                               ))
end