Class: Deimos::Utils::LagReporter
- Inherits:
-
Object
- Object
- Deimos::Utils::LagReporter
- Extended by:
- Mutex_m
- Defined in:
- lib/deimos/utils/lag_reporter.rb
Overview
Class that manages reporting lag.
Defined Under Namespace
Classes: ConsumerGroup, Topic
Class Method Summary collapse
- .heartbeat(payload) ⇒ void
-
.message_processed(payload) ⇒ void
offset_lag = event.payload.fetch(:offset_lag) group_id = event.payload.fetch(:group_id) topic = event.payload.fetch(:topic) partition = event.payload.fetch(:partition).
- .offset_seek(payload) ⇒ void
-
.reset ⇒ void
Reset all group information.
Class Method Details
.heartbeat(payload) ⇒ void
This method returns an undefined value.
143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/deimos/utils/lag_reporter.rb', line 143 def heartbeat(payload) group = payload[:group_id] synchronize do @groups[group.to_s] ||= ConsumerGroup.new(group) consumer_group = @groups[group.to_s] payload[:topic_partitions].each do |topic, partitions| partitions.each do |partition| consumer_group.report_lag(topic, partition) end end end end |
.message_processed(payload) ⇒ void
This method returns an undefined value.
offset_lag = event.payload.fetch(:offset_lag) group_id = event.payload.fetch(:group_id) topic = event.payload.fetch(:topic) partition = event.payload.fetch(:partition)
115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/deimos/utils/lag_reporter.rb', line 115 def (payload) offset = payload[:offset] || payload[:last_offset] topic = payload[:topic] group = payload[:group_id] partition = payload[:partition] synchronize do @groups[group.to_s] ||= ConsumerGroup.new(group) @groups[group.to_s].assign_current_offset(topic, partition, offset) end end |
.offset_seek(payload) ⇒ void
This method returns an undefined value.
129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/deimos/utils/lag_reporter.rb', line 129 def offset_seek(payload) offset = payload[:offset] topic = payload[:topic] group = payload[:group_id] partition = payload[:partition] synchronize do @groups[group.to_s] ||= ConsumerGroup.new(group) @groups[group.to_s].assign_current_offset(topic, partition, offset) end end |
.reset ⇒ void
This method returns an undefined value.
Reset all group information.
105 106 107 |
# File 'lib/deimos/utils/lag_reporter.rb', line 105 def reset @groups = {} end |