Class: Fluent::Plugin::UnixClientInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_unix_client.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



37
38
39
40
41
42
43
44
45
46
# File 'lib/fluent/plugin/in_unix_client.rb', line 37

def configure(conf)
  super
  @parser = parser_create
  @socket_handler = SocketHandler.new(
    @path,
    delimiter: @delimiter,
    format_json: @format_json,
    log: log,
  )
end

#emit_one_parsed(time, record) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/in_unix_client.rb', line 77

def emit_one_parsed(time, record)
  case record
  when Array
    es = Fluent::MultiEventStream.new
    record.each do |e|
      es.add(time, e)
    end
    router.emit_stream(@tag, es)
  else
    router.emit(@tag, time, record)
  end
end

#keep_receivingObject



53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/in_unix_client.rb', line 53

def keep_receiving
  while thread_current_running?
    begin
      receive_and_emit
    rescue => e
      log.error "in_unix_client: error occurred. #{e}"
      sleep 3
    end
  end
ensure
  @socket_handler.try_close
end

#receive_and_emitObject



66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/in_unix_client.rb', line 66

def receive_and_emit
  raw_records = @socket_handler.try_receive
  return if raw_records.nil? || raw_records.empty?

  raw_records.each do |raw_record|
    @parser.parse(raw_record) do |time, record|
      emit_one_parsed(time, record)
    end
  end
end

#startObject



48
49
50
51
# File 'lib/fluent/plugin/in_unix_client.rb', line 48

def start
  super
  thread_create(:in_unix_client, &method(:keep_receiving))
end