Class: Kafka::Statsd::ConsumerSubscriber

Inherits:
StatsdSubscriber
  • Object
show all
Defined in:
lib/kafka/statsd.rb

Instance Method Summary collapse

Instance Method Details

#join_group(event) ⇒ Object



117
118
119
120
121
122
123
124
125
126
# File 'lib/kafka/statsd.rb', line 117

# Reports a group-join event: always records its duration, and bumps an
# error counter when the payload carries an :exception key.
#
# @param event [#payload, #duration] payload must provide :client_id and
#   :group_id (a missing key raises from `fetch`).
def join_group(event)
  payload = event.payload
  # Both identifiers are required; they are fetched before anything is emitted.
  stat = "consumer.#{payload.fetch(:client_id)}.#{payload.fetch(:group_id)}.join_group"

  timing(stat, event.duration)

  increment("#{stat}.errors") if payload.key?(:exception)
end

#leave_group(event) ⇒ Object



139
140
141
142
143
144
145
146
147
148
# File 'lib/kafka/statsd.rb', line 139

# Reports a group-leave event: always records its duration, and bumps an
# error counter when the payload carries an :exception key.
#
# @param event [#payload, #duration] payload must provide :client_id and
#   :group_id (a missing key raises from `fetch`).
def leave_group(event)
  payload = event.payload
  # Both identifiers are required; they are fetched before anything is emitted.
  stat = "consumer.#{payload.fetch(:client_id)}.#{payload.fetch(:group_id)}.leave_group"

  timing(stat, event.duration)

  increment("#{stat}.errors") if payload.key?(:exception)
end

#process_batch(event) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/kafka/statsd.rb', line 99

# Reports a processed batch for one topic partition.
#
# When the payload has an :exception key only the error counter is bumped;
# otherwise the duration and the batch's message count are recorded. The
# offset lag gauge is emitted in both cases.
#
# @param event [#payload, #duration] payload must provide :offset_lag,
#   :message_count, :client_id, :group_id, :topic and :partition (a missing
#   key raises from `fetch`).
def process_batch(event)
  payload = event.payload
  offset_lag = payload.fetch(:offset_lag)
  batch_size = payload.fetch(:message_count)

  # Every required key is read up front, so nothing is emitted when one is missing.
  scope = "consumer.#{payload.fetch(:client_id)}.#{payload.fetch(:group_id)}" \
          ".#{payload.fetch(:topic)}.#{payload.fetch(:partition)}"

  if payload.key?(:exception)
    increment("#{scope}.process_batch.errors")
  else
    timing("#{scope}.process_batch.latency", event.duration)
    count("#{scope}.messages", batch_size)
  end

  gauge("#{scope}.lag", offset_lag)
end

#process_message(event) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/kafka/statsd.rb', line 82

# Reports a processed message for one topic partition.
#
# When the payload has an :exception key only the error counter is bumped;
# otherwise the duration is recorded and the message counter is bumped. The
# offset lag gauge is emitted in both cases.
#
# @param event [#payload, #duration] payload must provide :offset_lag,
#   :client_id, :group_id, :topic and :partition (a missing key raises from
#   `fetch`).
def process_message(event)
  payload = event.payload
  offset_lag = payload.fetch(:offset_lag)

  # Every required key is read up front, so nothing is emitted when one is missing.
  scope = "consumer.#{payload.fetch(:client_id)}.#{payload.fetch(:group_id)}" \
          ".#{payload.fetch(:topic)}.#{payload.fetch(:partition)}"

  if payload.key?(:exception)
    increment("#{scope}.process_message.errors")
  else
    timing("#{scope}.process_message.latency", event.duration)
    increment("#{scope}.messages")
  end

  gauge("#{scope}.lag", offset_lag)
end

#sync_group(event) ⇒ Object



128
129
130
131
132
133
134
135
136
137
# File 'lib/kafka/statsd.rb', line 128

# Reports a group-sync event: always records its duration, and bumps an
# error counter when the payload carries an :exception key.
#
# @param event [#payload, #duration] payload must provide :client_id and
#   :group_id (a missing key raises from `fetch`).
def sync_group(event)
  payload = event.payload
  # Both identifiers are required; they are fetched before anything is emitted.
  stat = "consumer.#{payload.fetch(:client_id)}.#{payload.fetch(:group_id)}.sync_group"

  timing(stat, event.duration)

  increment("#{stat}.errors") if payload.key?(:exception)
end