Class: Fluent::KafkaInput::TopicWatcher

Inherits:
Coolio::TimerWatcher
  • Object
show all
Defined in:
lib/fluent/plugin/in_kafka.rb

Instance Method Summary collapse

Constructor Details

#initialize(topic_entry, host, port, client_id, interval, format, message_key, add_offset_in_record, add_prefix, add_suffix, offset_manager, router, options = {}) ⇒ TopicWatcher

Returns a new instance of TopicWatcher.



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/fluent/plugin/in_kafka.rb', line 131

# Sets up a watcher for one topic entry and opens its first consumer.
#
# topic_entry          - object responding to #topic, #partition and #offset.
# host, port           - handed to the partition consumer unchanged.
# client_id            - handed to the partition consumer unchanged.
# interval             - forwarded to the TimerWatcher superclass constructor.
# format               - name of the payload format used by parse_line and
#                        decorate_offset.
# message_key          - record key used for payloads in the 'text' format.
# add_offset_in_record - when truthy, consume decorates each record with the
#                        topic, partition and offset.
# add_prefix           - optional tag prefix used by consume.
# add_suffix           - optional tag suffix used by consume.
# offset_manager       - optional object responding to #next_offset and
#                        #save_offset; may be nil.
# router               - object that receives emit_stream(tag, stream).
# options              - extra options handed to the partition consumer.
def initialize(topic_entry, host, port, client_id, interval, format, message_key, add_offset_in_record, add_prefix, add_suffix, offset_manager, router, options={})
  # Where to consume from.
  @topic_entry, @host, @port, @client_id = topic_entry, host, port, client_id
  @options = options

  # How to turn a payload into a record and a tag.
  @format, @message_key = format, message_key
  @add_offset_in_record = add_offset_in_record
  @add_prefix, @add_suffix = add_prefix, add_suffix

  # Collaborators.
  @offset_manager, @router = offset_manager, router
  @callback = method(:consume)

  # An entry offset of -1 defers to the offset manager when one was supplied;
  # any other value (or a missing manager) uses the entry's offset as-is.
  configured_offset = topic_entry.offset
  @next_offset =
    if configured_offset == -1 && offset_manager
      offset_manager.next_offset
    else
      configured_offset
    end
  @consumer = create_consumer(@next_offset)

  # NOTE(review): the second argument is presumably TimerWatcher's "repeating"
  # flag - confirm against the Cool.io API.
  super(interval, true)
end

Instance Method Details

#add_offset_in_hash(hash, topic, partition, offset) ⇒ Object



237
238
239
240
241
# File 'lib/fluent/plugin/in_kafka.rb', line 237

# Writes the Kafka coordinates of a message into +hash+ in place, under the
# keys 'kafka_topic', 'kafka_partition' and 'kafka_offset' (in that order).
#
# Returns +offset+, which is the value of the final assignment.
def add_offset_in_hash(hash, topic, partition, offset)
  [
    ['kafka_topic', topic],
    ['kafka_partition', partition],
    ['kafka_offset', offset]
  ].each do |key, value|
    hash[key] = value
  end
  offset
end

#consume ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/fluent/plugin/in_kafka.rb', line 163

# Fetches one batch of messages from the partition consumer, converts each
# message into a record and emits the successfully converted records as a
# single event stream.
#
# The tag is the topic name, with @add_prefix prepended and @add_suffix
# appended (joined by ".") when they are set. When an offset manager is
# configured, the consumer's next offset is saved after a non-empty stream
# has been emitted.
#
# A message that fails to parse or decorate is logged at warn level and
# skipped; the rest of the batch is still processed.
def consume
  es = MultiEventStream.new
  tag = @topic_entry.topic
  tag = @add_prefix + "." + tag if @add_prefix
  tag = tag + "." + @add_suffix if @add_suffix

  # Re-open the consumer whenever its position differs from the offset this
  # watcher last recorded.
  # NOTE(review): offsets are only recorded below when at least one record
  # was emitted, so a batch in which every message fails to parse appears to
  # be fetched again on the next tick - confirm whether that is intended.
  if @offset_manager && @consumer.next_offset != @next_offset
    @consumer = create_consumer(@next_offset)
  end

  @consumer.fetch.each { |msg|
    begin
      msg_record = parse_line(msg.value)
      msg_record = decorate_offset(msg_record, msg.offset) if @add_offset_in_record
      es.add(Engine.now, msg_record)
    rescue => e
      # msg_record is still nil when parse_line itself raised; fall back to
      # the raw payload so the warning identifies the offending message
      # instead of logging an empty string.
      $log.warn((msg_record || msg.value).to_s, :error => e.to_s)
      $log.debug_backtrace
    end
  }

  unless es.empty?
    @router.emit_stream(tag, es)

    if @offset_manager
      next_offset = @consumer.next_offset
      @offset_manager.save_offset(next_offset)
      @next_offset = next_offset
    end
  end
end

#create_consumer(offset) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/fluent/plugin/in_kafka.rb', line 195

# Closes the current consumer, if there is one, and builds a new
# Poseidon::PartitionConsumer for this watcher's topic and partition that
# starts at +offset+.
#
# Returns the new consumer; assigning it to @consumer is left to the caller.
def create_consumer(offset)
  stale = @consumer
  stale.close if stale

  topic = @topic_entry.topic
  partition = @topic_entry.partition
  # Positional order: client_id, host, port, topic, partition, offset, options.
  Poseidon::PartitionConsumer.new(@client_id, @host, @port, topic, partition, offset, @options)
end

#decorate_offset(record, offset) ⇒ Object



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/fluent/plugin/in_kafka.rb', line 221

# Adds this watcher's topic and partition, plus the given +offset+, to a
# parsed record via add_offset_in_hash.
#
# For the 'ltsv' format +record+ is iterated and every element is decorated;
# for 'json', 'msgpack' and 'text' the record itself is decorated. Any other
# format leaves the record untouched.
#
# Returns +record+.
def decorate_offset(record, offset)
  stamp = lambda do |target|
    add_offset_in_hash(target, @topic_entry.topic, @topic_entry.partition, offset)
  end

  case @format
  when 'ltsv'
    record.each { |line| stamp.call(line) }
  when 'json', 'msgpack', 'text'
    stamp.call(record)
  end

  record
end

#on_timer ⇒ Object



155
156
157
158
159
160
161
# File 'lib/fluent/plugin/in_kafka.rb', line 155

# Timer hook: invokes the stored callback (set to #consume in the
# constructor). Any StandardError it raises is logged at error level together
# with its backtrace and is not re-raised.
def on_timer
  begin
    @callback.call
  rescue => err
    $log.error err.to_s
    $log.error_backtrace
  end
end

#parse_line(record) ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/fluent/plugin/in_kafka.rb', line 208

# Converts one raw message payload into a record according to @format:
#
#   'text'    - wraps the payload in a one-entry hash keyed by @message_key
#   'json'    - Yajl::Parser.parse
#   'msgpack' - MessagePack.unpack
#   'ltsv'    - LTSV.parse
#
# Returns nil for any other format.
def parse_line(record)
  case @format
  when 'text'    then { @message_key => record }
  when 'json'    then Yajl::Parser.parse(record)
  when 'msgpack' then MessagePack.unpack(record)
  when 'ltsv'    then LTSV.parse(record)
  end
end