Class: Deimos::Utils::LagReporter
- Inherits:
-
Object
- Object
- Deimos::Utils::LagReporter
- Extended by:
- Mutex_m
- Defined in:
- lib/deimos/utils/lag_reporter.rb
Overview
Class that manages reporting lag.
Defined Under Namespace
Classes: ConsumerGroup, Topic
Class Method Summary collapse
- .heartbeat(payload) ⇒ Object
- .message_processed(payload) ⇒ Object
- .offset_seek(payload) ⇒ Object
-
.reset ⇒ Object
Reset all group information.
Class Method Details
.heartbeat(payload) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/deimos/utils/lag_reporter.rb', line 139 def heartbeat(payload) group = payload[:group_id] synchronize do @groups[group.to_s] ||= ConsumerGroup.new(group) consumer_group = @groups[group.to_s] payload[:topic_partitions].each do |topic, partitions| partitions.each do |partition| consumer_group.report_lag(topic, partition) end end end end |
.message_processed(payload) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/deimos/utils/lag_reporter.rb', line 113 def (payload) lag = payload[:offset_lag] topic = payload[:topic] group = payload[:group_id] partition = payload[:partition] synchronize do @groups[group.to_s] ||= ConsumerGroup.new(group) @groups[group.to_s].assign_lag(topic, partition, lag) end end |
.offset_seek(payload) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/deimos/utils/lag_reporter.rb', line 126 def offset_seek(payload) offset = payload[:offset] topic = payload[:topic] group = payload[:group_id] partition = payload[:partition] synchronize do @groups[group.to_s] ||= ConsumerGroup.new(group) @groups[group.to_s].compute_lag(topic, partition, offset) end end |
.reset ⇒ Object
Reset all group information.
108 109 110 |
# File 'lib/deimos/utils/lag_reporter.rb', line 108 def reset @groups = {} end |