Class: Fluent::Plugin::JSONStreamInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_jsonstream.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



23
24
25
# File 'lib/fluent/plugin/in_jsonstream.rb', line 23

# Configures the plugin from the supplied configuration object.
#
# Adds no behaviour of its own: +conf+ is handed straight to the parent
# class's +configure+.
#
# @param conf the configuration object passed in by the caller
# @return whatever the parent implementation returns
def configure(conf)
  super(conf)
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


27
28
29
# File 'lib/fluent/plugin/in_jsonstream.rb', line 27

# Unconditionally answers +true+.
#
# NOTE(review): going by the name, this is presumably the hook the host
# framework queries to decide whether the plugin may run under several
# workers at once — confirm against the Fluentd plugin API documentation.
#
# @return [Boolean] always +true+
def multi_workers_ready?
  true
end

#start ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/in_jsonstream.rb', line 31

# Starts the plugin: calls the parent +start+, then registers a connection
# handler through +server_create_connection+ using +@port+ and +@bind+.
#
# A Yajl parser is created for every connection. Data received on the
# connection is fed to that parser, and each value the parser reports as
# complete is emitted to the router as one event:
#
# * tag  - +extract_tag_from_record+, falling back to +@tag+
# * time - the record's 'timestamp' field (always removed from the record,
#          converted with +to_i+) when it holds a usable value; otherwise
#          +extract_time_from_record+, falling back to the current time
#
# Parsed values that are not Hashes are skipped with a warning instead of
# raising inside the parser callback.
def start
  super

  server_create_connection(:in_jsonstream_server, @port, bind: @bind) do |conn|
    # The parser is created inside the per-connection block, so every
    # connection has its own parser instance.
    parser = Yajl::Parser.new(symbolize_keys: false)
    parser.on_parse_complete = lambda do |record|
      # A JSON stream may carry arrays or scalars; only objects can be
      # inspected for tag/time keys and emitted as records.
      unless record.is_a?(Hash)
        log.warn "in_jsonstream: skipping JSON value that is not an object (#{record.class})"
        next
      end

      tag = extract_tag_from_record(record) || @tag
      time = extract_time_from_record(record) || Fluent::EventTime.now

      # Use the recorded event time if available. The key is stripped from
      # the record either way, but a null value (nil.to_i would be 0, i.e.
      # the Unix epoch) or a value without #to_i keeps the fallback time.
      # NOTE: #to_i truncates fractional seconds and turns non-numeric
      # strings into 0.
      if record.key?('timestamp')
        recorded = record.delete('timestamp')
        time = recorded.to_i if !recorded.nil? && recorded.respond_to?(:to_i)
      end

      router.emit(tag, time, record)
    end

    conn.data { |data| parser << data }
  end
end