Class: Deimos::Utils::LagReporter

Inherits:
Object
  • Object
show all
Extended by:
Mutex_m
Defined in:
lib/deimos/utils/lag_reporter.rb

Overview

Class that manages reporting lag.

Defined Under Namespace

Classes: ConsumerGroup, Topic

Class Method Summary collapse

Class Method Details

.heartbeat(payload) ⇒ void

This method returns an undefined value.

Parameters:

  • payload (Hash)


143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/deimos/utils/lag_reporter.rb', line 143

def heartbeat(payload)
  group = payload[:group_id]
  synchronize do
    @groups[group.to_s] ||= ConsumerGroup.new(group)
    consumer_group = @groups[group.to_s]
    payload[:topic_partitions].each do |topic, partitions|
      partitions.each do |partition|
        consumer_group.report_lag(topic, partition)
      end
    end
  end
end

.message_processed(payload) ⇒ void

This method returns an undefined value.

offset_lag = event.payload.fetch(:offset_lag) group_id = event.payload.fetch(:group_id) topic = event.payload.fetch(:topic) partition = event.payload.fetch(:partition)

Parameters:

  • payload (Hash)


115
116
117
118
119
120
121
122
123
124
125
# File 'lib/deimos/utils/lag_reporter.rb', line 115

def message_processed(payload)
  offset = payload[:offset] || payload[:last_offset]
  topic = payload[:topic]
  group = payload[:group_id]
  partition = payload[:partition]

  synchronize do
    @groups[group.to_s] ||= ConsumerGroup.new(group)
    @groups[group.to_s].assign_current_offset(topic, partition, offset)
  end
end

.offset_seek(payload) ⇒ void

This method returns an undefined value.

Parameters:

  • payload (Hash)


129
130
131
132
133
134
135
136
137
138
139
# File 'lib/deimos/utils/lag_reporter.rb', line 129

def offset_seek(payload)
  offset = payload[:offset]
  topic = payload[:topic]
  group = payload[:group_id]
  partition = payload[:partition]

  synchronize do
    @groups[group.to_s] ||= ConsumerGroup.new(group)
    @groups[group.to_s].assign_current_offset(topic, partition, offset)
  end
end

.resetvoid

This method returns an undefined value.

Reset all group information.



105
106
107
# File 'lib/deimos/utils/lag_reporter.rb', line 105

def reset
  @groups = {}
end