Class: Deimos::Utils::LagReporter

Inherits:
Object
  • Object
show all
Extended by:
Mutex_m
Defined in:
lib/deimos/utils/lag_reporter.rb

Overview

Class that manages reporting lag.

Defined Under Namespace

Classes: ConsumerGroup, Topic

Class Method Summary collapse

Class Method Details

.heartbeat(payload) ⇒ Object

Parameters:

  • payload (Hash)


132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/deimos/utils/lag_reporter.rb', line 132

# Handle a consumer heartbeat by reporting lag for every topic/partition
# listed in the payload.
#
# Reads `payload[:group_id]` and `payload[:topic_partitions]`. The group's
# ConsumerGroup entry is created on first sight and stored in `@groups`
# under the stringified group id.
#
# @param payload [Hash]
def heartbeat(payload)
  group_id = payload[:group_id]
  key = group_id.to_s
  synchronize do
    # Look up (or lazily create) the tracker once and hold on to it.
    tracker = (@groups[key] ||= ConsumerGroup.new(group_id))
    payload[:topic_partitions].each do |topic, partitions|
      partitions.each { |partition| tracker.report_lag(topic, partition) }
    end
  end
end

.message_processed(payload) ⇒ Object

offset_lag = event.payload.fetch(:offset_lag)
group_id = event.payload.fetch(:group_id)
topic = event.payload.fetch(:topic)
partition = event.payload.fetch(:partition)

Parameters:

  • payload (Hash)


106
107
108
109
110
111
112
113
114
115
116
# File 'lib/deimos/utils/lag_reporter.rb', line 106

# Record the offset of a processed message for its group/topic/partition.
#
# Reads `:group_id`, `:topic` and `:partition` from the payload. The offset
# is `payload[:offset]`, falling back to `payload[:last_offset]` when the
# former is nil/false. The group's ConsumerGroup entry is created on first
# sight and stored in `@groups` under the stringified group id.
#
# @param payload [Hash]
def message_processed(payload)
  group_id, topic, partition = payload.values_at(:group_id, :topic, :partition)
  current_offset = payload[:offset] || payload[:last_offset]

  synchronize do
    consumer_group = (@groups[group_id.to_s] ||= ConsumerGroup.new(group_id))
    consumer_group.assign_current_offset(topic, partition, current_offset)
  end
end

.offset_seek(payload) ⇒ Object

Parameters:

  • payload (Hash)


119
120
121
122
123
124
125
126
127
128
129
# File 'lib/deimos/utils/lag_reporter.rb', line 119

# Record the offset a consumer has seeked to for its group/topic/partition.
#
# Reads `:group_id`, `:topic`, `:partition` and `:offset` from the payload.
# The group's ConsumerGroup entry is created on first sight and stored in
# `@groups` under the stringified group id.
#
# @param payload [Hash]
def offset_seek(payload)
  group_id = payload[:group_id]

  synchronize do
    consumer_group = (@groups[group_id.to_s] ||= ConsumerGroup.new(group_id))
    consumer_group.assign_current_offset(
      payload[:topic],
      payload[:partition],
      payload[:offset]
    )
  end
end

.reset ⇒ Object

Reset all group information.



97
98
99
# File 'lib/deimos/utils/lag_reporter.rb', line 97

# Reset all group information.
#
# The replacement of `@groups` happens under `synchronize`, so the table is
# not swapped out while another thread is updating it under the same lock.
#
# NOTE(review): assumes `synchronize` is a non-reentrant lock (Mutex_m) and
# that no caller invokes `reset` while already holding it — confirm.
#
# @return [Hash] the new, empty group table
def reset
  synchronize { @groups = {} }
end