Class: Fluent::Plugin::UtmpxInput
- Inherits:
- Input
- Object
- Input
- Fluent::Plugin::UtmpxInput
- Defined in:
- lib/fluent/plugin/in_utmpx.rb
Instance Method Summary
Instance Method Details
#configure(conf) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/fluent/plugin/in_utmpx.rb', line 43
# Initializes parser/position state and claims this plugin's pos_file.
#
# The variable store fetched under :in_utmpx maps a pos_file path to the
# plugin_id that registered it, so a second instance configured with the same
# pos_file is rejected (unless called_in_test? is true).
#
# @param conf the configuration object, forwarded to the parent via +super+
# @raise [Fluent::ConfigError] if the store already holds this pos_file path
def configure(conf)
  @variable_store = Fluent::VariableStore.fetch_or_build(:in_utmpx)
  super

  @utmpx = Linux::Utmpx::UtmpxParser.new
  @buffer = ""
  @tail_position = @previous_position = 0

  if @variable_store.key?(@pos_file)
    unless called_in_test?
      owner_id = @variable_store[@pos_file]
      raise Fluent::ConfigError, "Other 'in_utmpx' plugin already use same pos_file path: plugin_id = #{owner_id}, pos_file path = #{@pos_file}"
    end
  end

  # Register (or, in tests, re-register) this instance as the path's owner.
  @variable_store[@pos_file] = plugin_id
end
#refresh_watchers ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/fluent/plugin/in_utmpx.rb', line 75
# Timer callback: reads every complete record appended to @path since the
# stored read position, emits them as one event stream under @tag and advances
# the stored position.
#
# Does nothing when the file has not grown or when fewer bytes than one full
# record are pending. When the file is shorter than the stored position it is
# treated as truncated: the position is reset to 0, a warning is logged and
# reading restarts from the head on the next invocation.
def refresh_watchers
  # A single stat supplies both size and inode, so both values describe the
  # same file even if the path is replaced while this method runs.
  stat = Fluent::FileWrapper.stat(@path)
  @tail_position = stat.size
  @pe =
    if Gem::Version.new(Fluent::VERSION) < Gem::Version.new("1.12.0")
      @pf[@path]
    else
      @pf[TargetInfo.new(@path, stat.ino)]
    end

  unread_bytes = @tail_position - @pe.read_pos
  return if unread_bytes.zero?

  if unread_bytes.negative?
    # may be truncated, read from head
    @pe.update_pos(0)
    log.warn("#{@path} may be truncated")
    return
  end

  record_size = @utmpx.num_bytes
  count = unread_bytes / record_size
  # Only part of a record has been written so far; wait for the remainder
  # instead of opening the file and emitting an empty event stream.
  return if count.zero?

  es = MultiEventStream.new
  File.open(@path) do |io|
    io.seek(@pe.read_pos)
    count.times do
      time, record = parse_entry(@utmpx.read(io))
      es.add(time, record)
    end
    @pe.update_pos(@pe.read_pos + count * record_size)
    router.emit_stream(@tag, es)
  end
end
#shutdown ⇒ Object
104 105 106 |
# File 'lib/fluent/plugin/in_utmpx.rb', line 104
# Closes the position file handle (if one was opened) and then hands over to
# the parent implementation so the rest of the shutdown chain still runs.
def shutdown
  @pf_file&.close

  # Lifecycle methods must chain to super; without it the parent's shutdown
  # handling is skipped.
  super
end
#start ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/fluent/plugin/in_utmpx.rb', line 58
# Opens (creating it and its directory if needed) the position file, loads the
# stored positions from it and schedules #refresh_watchers to run every
# @interval via the timer helper.
def start
  super

  pos_file_dir = File.dirname(@pos_file)
  FileUtils.mkdir_p(pos_file_dir, mode: Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(pos_file_dir)
  @pf_file = File.open(@pos_file, File::RDWR | File::CREAT | File::BINARY, Fluent::DEFAULT_FILE_PERMISSION)
  # Unbuffered writes, so every position update reaches the file immediately.
  @pf_file.sync = true

  @pf =
    if Gem::Version.new(Fluent::VERSION) < Gem::Version.new("1.12.0")
      TailInput::PositionFile.load(@pf_file, logger: log)
    else
      # TargetInfo is built only in this branch so that the legacy branch never
      # references it, matching how #refresh_watchers guards the same constant.
      target_info = TargetInfo.new(@path, Fluent::FileWrapper.stat(@path).ino)
      # NOTE(review): the positional `false` is presumably follow_inodes — confirm
      # against the PositionFile.load signature of the targeted Fluentd version.
      TailInput::PositionFile.load(@pf_file, false, { target_info.path => target_info }, logger: log)
    end

  timer_execute(:execute_utmpx, @interval, &method(:refresh_watchers))
end