Class: Fluent::KafkaGroupInput
Defined Under Namespace
Classes: ForShutdown
Constant Summary
collapse
- BufferError =
if defined?(Fluent::Plugin::Buffer::BufferOverflowError)
Fluent::Plugin::Buffer::BufferOverflowError
else
Fluent::BufferQueueLimitError
end
Instance Method Summary
collapse
included
included, #pickup_ssl_endpoint, #read_ssl_file
Constructor Details
Returns a new instance of KafkaGroupInput.
65
66
67
68
69
70
|
# File 'lib/fluent/plugin/in_kafka_group.rb', line 65
# Returns a new instance of KafkaGroupInput.
#
# The kafka gem is required here (lazily, at construction) rather than at
# file-load time. @time_parser stays nil unless #configure later builds a
# parser (it does so only when use_record_time and time_format are both set).
def initialize
  super
  require 'kafka'
  @time_parser = nil
end
|
Instance Method Details
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
# File 'lib/fluent/plugin/in_kafka_group.rb', line 86
# Validate and translate the plugin configuration.
#
# conf - the Fluent configuration element.
#
# Builds the option hashes handed to the Kafka client (@consumer_opts) and
# to each_batch (@fetch_opts), compiles the message parser, and prepares an
# optional time parser for record-supplied timestamps.
def configure(conf)
  super

  # Use the plugin logger (as the rest of this class does) instead of the
  # global $log so messages carry this plugin's context.
  log.info "Will watch for topics #{@topics} at brokers " \
           "#{@brokers} and '#{@consumer_group}' group"

  @topics = _config_to_array(@topics)

  # Backward compatibility: max_wait_ms (milliseconds) is superseded by
  # max_wait_time (seconds).
  if conf['max_wait_ms']
    log.warn "'max_wait_ms' parameter is deprecated. Use second unit 'max_wait_time' instead"
    @max_wait_time = conf['max_wait_ms'].to_i / 1000
  end

  @parser_proc = setup_parser

  # Consumer-group options: only include keys the user explicitly set so
  # the kafka gem's own defaults apply otherwise.
  @consumer_opts = {:group_id => @consumer_group}
  @consumer_opts[:session_timeout] = @session_timeout if @session_timeout
  @consumer_opts[:offset_commit_interval] = @offset_commit_interval if @offset_commit_interval
  @consumer_opts[:offset_commit_threshold] = @offset_commit_threshold if @offset_commit_threshold

  # Options forwarded to Consumer#each_batch in #run.
  @fetch_opts = {}
  @fetch_opts[:max_wait_time] = @max_wait_time if @max_wait_time
  @fetch_opts[:min_bytes] = @min_bytes if @min_bytes

  # Build a time parser only when record timestamps must be reformatted.
  # Use && rather than `and` for boolean logic (avoids precedence surprises).
  # Fluent::TimeParser is preferred when available; the TextParser variant
  # is the fallback for older fluentd APIs.
  if @use_record_time && @time_format
    if defined?(Fluent::TimeParser)
      @time_parser = Fluent::TimeParser.new(@time_format)
    else
      @time_parser = Fluent::TextParser::TimeParser.new(@time_format)
    end
  end
end
|
#emit_events(tag, es) ⇒ Object
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
|
# File 'lib/fluent/plugin/in_kafka_group.rb', line 235
# Forward an event stream to the router, retrying while the downstream
# buffer is full.
#
# tag - Fluent tag string for the stream.
# es  - event stream of parsed records.
#
# Raises ForShutdown when the consumer has been torn down mid-retry, and
# RuntimeError once retry_emit_limit attempts are exhausted. With no
# retry_emit_limit configured it retries indefinitely.
def emit_events(tag, es)
  attempts = 0
  begin
    router.emit_stream(tag, es)
  rescue BufferError
    # Shutdown clears @consumer; stop retrying and unwind instead.
    raise ForShutdown if @consumer.nil?

    if @retry_emit_limit
      raise RuntimeError, "Exceeds retry_emit_limit" if attempts >= @retry_emit_limit
      attempts += 1
    end
    # Back off briefly, then re-attempt the emit.
    sleep 1
    retry
  end
end
|
#multi_workers_ready? ⇒ Boolean
80
81
82
|
# File 'lib/fluent/plugin/in_kafka_group.rb', line 80
# This input can run under fluentd's multi-worker process model.
def multi_workers_ready?
  true
end
|
#reconnect_consumer ⇒ Object
174
175
176
177
178
179
180
181
182
183
184
185
186
|
# File 'lib/fluent/plugin/in_kafka_group.rb', line 174
# Tear down the current consumer, wait retry_wait_seconds, and build a
# fresh one. @consumer is nil-ed first so other threads see the consumer
# as gone while the restart is in flight. Any failure is logged and
# swallowed (the caller's loop decides what happens next).
def reconnect_consumer
  log.warn "Stopping Consumer"
  stopping = @consumer
  @consumer = nil
  stopping.stop
  log.warn "Could not connect to broker. Next retry will be in #{@retry_wait_seconds} seconds"
  sleep @retry_wait_seconds
  @consumer = setup_consumer
  log.warn "Re-starting consumer #{Time.now.to_s}"
rescue => e
  log.error "unexpected error during re-starting consumer object access", :error => e.to_s
  log.error_backtrace
end
|
#run ⇒ Object
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
|
# File 'lib/fluent/plugin/in_kafka_group.rb', line 188
# Worker-thread main loop: pull message batches from Kafka, convert each
# message to a Fluent record, and emit one event stream per batch.
# Loops until shutdown (or reconnect failure) leaves @consumer nil.
def run
  while @consumer
    begin
      @consumer.each_batch(@fetch_opts) { |batch|
        es = Fluent::MultiEventStream.new
        # Tag is the topic name, optionally wrapped with add_prefix/add_suffix.
        tag = batch.topic
        tag = @add_prefix + "." + tag if @add_prefix
        tag = tag + "." + @add_suffix if @add_suffix
        batch.messages.each { |msg|
          begin
            record = @parser_proc.call(msg)
            # Event time: taken from the record's 'time' field when
            # use_record_time is set (parsed via @time_parser if a
            # time_format was configured), otherwise the engine clock.
            if @use_record_time
              if @time_format
                record_time = @time_parser.parse(record['time'])
              else
                record_time = record['time']
              end
            else
              record_time = Fluent::Engine.now
            end
            # Optionally copy the Kafka message key into the record.
            if @kafka_message_key
              record[@kafka_message_key] = msg.key
            end
            es.add(record_time, record)
          rescue => e
            # One malformed message must not abort the batch: log and skip.
            log.warn "parser error in #{batch.topic}/#{batch.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
            log.debug_backtrace
          end
        }
        unless es.empty?
          emit_events(tag, es)
        end
      }
    rescue ForShutdown
      # Raised by emit_events during shutdown; @consumer is nil by then,
      # so falling through ends the while loop.
    rescue => e
      log.error "unexpected error during consuming events from kafka. Re-fetch events.", :error => e.to_s
      log.error_backtrace
      reconnect_consumer
    end
  end
rescue => e
  # Catch-all so the worker thread never dies silently.
  log.error "unexpected error during consumer object access", :error => e.to_s
  log.error_backtrace
end
|
#setup_consumer ⇒ Object
166
167
168
169
170
171
172
|
# File 'lib/fluent/plugin/in_kafka_group.rb', line 166
# Build a consumer-group member from the configured Kafka client and
# subscribe it to every configured topic.
#
# Returns the (not yet running) consumer object.
def setup_consumer
  group_consumer = @kafka.consumer(@consumer_opts)
  @topics.each do |topic|
    group_consumer.subscribe(topic,
                             start_from_beginning: @start_from_beginning,
                             max_bytes_per_partition: @max_bytes)
  end
  group_consumer
end
|
#setup_parser ⇒ Object
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# File 'lib/fluent/plugin/in_kafka_group.rb', line 119
# Pick a per-message parser for the configured format.
#
# Returns a callable taking one Kafka message and returning a record Hash.
# NOTE(review): an unrecognized @format yields nil here; presumably the
# format value is validated during configuration — confirm upstream.
def setup_parser
  case @format
  when 'json'
    begin
      # Prefer Oj for speed; fall back to Yajl when Oj is not installed.
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      ->(msg) { Oj.load(msg.value) }
    rescue LoadError
      require 'yajl'
      ->(msg) { Yajl::Parser.parse(msg.value) }
    end
  when 'ltsv'
    require 'ltsv'
    ->(msg) { LTSV.parse(msg.value, {:symbolize_keys => false}).first }
  when 'msgpack'
    require 'msgpack'
    ->(msg) { MessagePack.unpack(msg.value) }
  when 'text'
    # Wrap the raw payload under the configured message key.
    ->(msg) { {@message_key => msg.value} }
  end
end
|
#shutdown ⇒ Object
153
154
155
156
157
158
159
160
161
162
163
164
|
# File 'lib/fluent/plugin/in_kafka_group.rb', line 153
# Stop consuming, wait for the worker thread to finish, and close the
# Kafka connection.
#
# @consumer is nil-ed first: #run's loop condition and #emit_events both
# treat a nil @consumer as "shutting down".
def shutdown
  consumer = @consumer
  @consumer = nil
  # @consumer may already be nil while reconnect_consumer is mid-restart
  # (it clears the ivar, then sleeps before rebuilding); guard so shutdown
  # does not raise NoMethodError on nil.
  consumer.stop if consumer
  @thread.join
  @kafka.close
  super
end
|
#start ⇒ Object
141
142
143
144
145
146
147
148
149
150
151
|
# File 'lib/fluent/plugin/in_kafka_group.rb', line 141
# Connect to Kafka, subscribe the consumer group, and launch the worker
# thread running #run. SSL material is read eagerly via read_ssl_file.
def start
  super

  client_opts = {
    seed_brokers: @brokers,
    client_id: @client_id,
    ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
    ssl_client_cert: read_ssl_file(@ssl_client_cert),
    ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
    sasl_gssapi_principal: @principal,
    sasl_gssapi_keytab: @keytab
  }
  @kafka = Kafka.new(**client_opts)
  @consumer = setup_consumer
  @thread = Thread.new { run }
end
|