Class: Fluent::Plugin::JSONStreamInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::JSONStreamInput
- Defined in:
- lib/fluent/plugin/in_jsonstream.rb
Instance Method Summary collapse
Instance Method Details
#configure(conf) ⇒ Object
23 24 25 |
# File 'lib/fluent/plugin/in_jsonstream.rb', line 23 def configure(conf) super end |
#multi_workers_ready? ⇒ Boolean
27 28 29 |
# File 'lib/fluent/plugin/in_jsonstream.rb', line 27 def multi_workers_ready? true end |
#start ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fluent/plugin/in_jsonstream.rb', line 31 def start super server_create_connection(:in_jsonstream_server, @port, bind: @bind) do |conn| parser = Yajl::Parser.new(:symbolize_keys => false) parser.on_parse_complete = lambda { |record| tag = extract_tag_from_record(record) tag ||= @tag time ||= extract_time_from_record(record) || Fluent::EventTime.now # Use the recorded event time if available time = record.delete('timestamp').to_i if record.key?('timestamp') router.emit(tag, time, record) } conn.data do |data| parser << data end end end |