Class: Fluent::KafkaGroupInput::TopicWatcher

Inherits:
Coolio::TimerWatcher
  • Object
show all
Defined in:
lib/fluent/plugin/in_kafka_group.rb

Instance Method Summary collapse

Constructor Details

#initialize(topic, broker_list, zookeeper_list, consumer_group, interval, format, message_key, add_prefix, add_suffix, router, options) ⇒ TopicWatcher

Returns a new instance of TopicWatcher.



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/fluent/plugin/in_kafka_group.rb', line 108

def initialize(topic, broker_list, zookeeper_list, consumer_group,
               interval, format, message_key, add_prefix, add_suffix,
               router, options)
  @topic = topic
  @callback = method(:consume)
  @format = format
  @message_key = message_key
  @add_prefix = add_prefix
  @add_suffix = add_suffix
  @router = router

  @consumer = Poseidon::ConsumerGroup.new(
    consumer_group,
    broker_list,
    zookeeper_list,
    topic,
    options
  )

  super(interval, true)
end

Instance Method Details

#consumeObject



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/fluent/plugin/in_kafka_group.rb', line 138

def consume
  es = MultiEventStream.new
  tag = @topic
  tag = @add_prefix + "." + tag if @add_prefix
  tag = tag + "." + @add_suffix if @add_suffix

  @consumer.fetch do |partition, bulk|
    bulk.each do |msg|
      begin
        msg_record = parse_line(msg.value)
        es.add(Engine.now, msg_record)
      rescue
        $log.warn msg_record.to_s, :error=>$!.to_s
        $log.debug_backtrace
      end
    end
  end

  unless es.empty?
    @router.emit_stream(tag, es)
  end
end

#on_timerObject



130
131
132
133
134
135
136
# File 'lib/fluent/plugin/in_kafka_group.rb', line 130

def on_timer
  @callback.call
rescue
    # TODO log?
    $log.error $!.to_s
    $log.error_backtrace
end

#parse_line(record) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/fluent/plugin/in_kafka_group.rb', line 161

def parse_line(record)
  case @format
  when 'json'
    Yajl::Parser.parse(record)
  when 'ltsv'
    LTSV.parse(record)
  when 'msgpack'
    MessagePack.unpack(record)
  when 'text'
    {@message_key => record}
  end
end