Class: Fluent::Plugin::UnixClientInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::UnixClientInput
- Defined in:
- lib/fluent/plugin/in_unix_client.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit_one_parsed(time, record) ⇒ Object
- #keep_receiving ⇒ Object
- #receive_and_emit ⇒ Object
- #start ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
37 38 39 40 41 42 43 44 45 46 |
# File 'lib/fluent/plugin/in_unix_client.rb', line 37 def configure(conf) super @parser = parser_create @socket_handler = SocketHandler.new( @path, delimiter: @delimiter, format_json: @format_json, log: log, ) end |
#emit_one_parsed(time, record) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/fluent/plugin/in_unix_client.rb', line 77 def emit_one_parsed(time, record) case record when Array es = Fluent::MultiEventStream.new record.each do |e| es.add(time, e) end router.emit_stream(@tag, es) else router.emit(@tag, time, record) end end |
#keep_receiving ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/fluent/plugin/in_unix_client.rb', line 53 def keep_receiving while thread_current_running? begin receive_and_emit rescue => e log.error "in_unix_client: error occurred. #{e}" sleep 3 end end ensure @socket_handler.try_close end |
#receive_and_emit ⇒ Object
66 67 68 69 70 71 72 73 74 75 |
# File 'lib/fluent/plugin/in_unix_client.rb', line 66 def receive_and_emit raw_records = @socket_handler.try_receive return if raw_records.nil? || raw_records.empty? raw_records.each do |raw_record| @parser.parse(raw_record) do |time, record| emit_one_parsed(time, record) end end end |
#start ⇒ Object
48 49 50 51 |
# File 'lib/fluent/plugin/in_unix_client.rb', line 48 def start super thread_create(:in_unix_client, &method(:keep_receiving)) end |