Class: Deimos::Utils::LagReporter

Inherits:
Object
  • Object
show all
Extended by:
Mutex_m
Defined in:
lib/deimos/utils/lag_reporter.rb

Overview

Class that manages reporting lag.

Defined Under Namespace

Classes: ConsumerGroup, Topic

Class Method Summary collapse

Class Method Details

.heartbeat(payload) ⇒ Object

Parameters:

  • payload (Hash)


139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/deimos/utils/lag_reporter.rb', line 139

# Handles a heartbeat payload: looks up (or lazily creates) the
# ConsumerGroup stored under the stringified :group_id and calls
# +report_lag+ on it once per topic/partition pair listed in
# payload[:topic_partitions]. All work happens inside +synchronize+.
#
# @param payload [Hash] read for :group_id and :topic_partitions; the
#   latter is iterated as topic => list-of-partitions pairs
# @return [Object] the payload[:topic_partitions] collection (the value
#   of the outer +each+)
def heartbeat(payload)
  group_id = payload[:group_id]
  synchronize do
    key = group_id.to_s
    # Single lookup-or-create; the group object is reused across calls.
    tracked_group = (@groups[key] ||= ConsumerGroup.new(group_id))
    payload[:topic_partitions].each do |topic_name, partition_ids|
      partition_ids.each { |partition_id| tracked_group.report_lag(topic_name, partition_id) }
    end
  end
end

.message_processed(payload) ⇒ Object

Parameters:

  • payload (Hash)


113
114
115
116
117
118
119
120
121
122
123
# File 'lib/deimos/utils/lag_reporter.rb', line 113

# Handles a processed-message payload: looks up (or lazily creates) the
# ConsumerGroup stored under the stringified :group_id and passes the
# payload's topic, partition and offset lag to its +assign_lag+. The
# lookup and the call both happen inside +synchronize+.
#
# @param payload [Hash] read for :group_id, :topic, :partition and
#   :offset_lag
# @return [Object] whatever +assign_lag+ returns
def message_processed(payload)
  group_id = payload[:group_id]
  topic_name = payload[:topic]
  partition_id = payload[:partition]
  offset_lag = payload[:offset_lag]

  synchronize do
    key = group_id.to_s
    tracked_group = (@groups[key] ||= ConsumerGroup.new(group_id))
    tracked_group.assign_lag(topic_name, partition_id, offset_lag)
  end
end

.offset_seek(payload) ⇒ Object

Parameters:

  • payload (Hash)


126
127
128
129
130
131
132
133
134
135
136
# File 'lib/deimos/utils/lag_reporter.rb', line 126

# Handles an offset-seek payload: looks up (or lazily creates) the
# ConsumerGroup stored under the stringified :group_id and passes the
# payload's topic, partition and offset to its +compute_lag+. The
# lookup and the call both happen inside +synchronize+.
#
# @param payload [Hash] read for :group_id, :topic, :partition and
#   :offset
# @return [Object] whatever +compute_lag+ returns
def offset_seek(payload)
  group_id = payload[:group_id]
  topic_name = payload[:topic]
  partition_id = payload[:partition]
  sought_offset = payload[:offset]

  synchronize do
    key = group_id.to_s
    tracked_group = (@groups[key] ||= ConsumerGroup.new(group_id))
    tracked_group.compute_lag(topic_name, partition_id, sought_offset)
  end
end

.reset ⇒ Object

Reset all group information.



108
109
110
# File 'lib/deimos/utils/lag_reporter.rb', line 108

# Discards all tracked group information by replacing the group
# registry with a fresh, empty Hash.
#
# @return [Hash] the new empty registry
def reset
  @groups = Hash.new
end