Class: Fluent::Plugin::UtmpxInput
- Inherits:
  - Input
- Inheritance hierarchy:
  - Object
  - Input
  - Fluent::Plugin::UtmpxInput
- Defined in:
- lib/fluent/plugin/in_utmpx.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #refresh_watchers ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/fluent/plugin/in_utmpx.rb', line 39
# Prepares the plugin's runtime state and registers its pos_file.
#
# Fetches (or builds) the :in_utmpx variable store, delegates to the parent
# #configure, creates the utmpx parser, resets the buffer and position
# counters, and finally records this plugin's id under @pos_file.
#
# @param conf the configuration object handed to the parent #configure
# @raise [Fluent::ConfigError] when the variable store already holds an entry
#   for @pos_file and the plugin is not being called in a test
def configure(conf)
  @variable_store = Fluent::VariableStore.fetch_or_build(:in_utmpx)
  super

  @utmpx = Linux::Utmpx::UtmpxParser.new
  @buffer = ""
  @tail_position = 0
  @previous_position = 0

  # Two plugin instances writing the same pos_file would clobber each other,
  # so the clash is rejected here (the check is skipped in tests).
  if @variable_store.key?(@pos_file)
    unless called_in_test?
      plugin_id_using_this_path = @variable_store[@pos_file]
      raise Fluent::ConfigError, "Other 'in_utmpx' plugin already use same pos_file path: plugin_id = #{plugin_id_using_this_path}, pos_file path = #{@pos_file}"
    end
  end

  @variable_store[@pos_file] = plugin_id
end
#multi_workers_ready? ⇒ Boolean
96 97 98 |
# File 'lib/fluent/plugin/in_utmpx.rb', line 96
# Multi-worker readiness predicate.
#
# @return [Boolean] always false
def multi_workers_ready?
  false
end
#refresh_watchers ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/fluent/plugin/in_utmpx.rb', line 67
# Timer callback: reads the utmpx records appended to @path since the offset
# stored in its position entry and emits them as a single event stream.
#
# Behaviour:
# * Nothing is done when the file size equals the stored read position.
# * When the file is smaller than the stored position it is assumed to have
#   been truncated: the position is reset to 0, a warning is logged, and the
#   records are picked up on the next invocation.
# * When fewer bytes than one full record are pending, nothing is emitted and
#   the position is left untouched until the record is complete.
# * The stored position is advanced only after the stream has been emitted,
#   so a failing emit does not silently skip records.
#
# The return value is not meaningful.
def refresh_watchers
  # Stat once so that size and inode describe the same file.
  file_stat = Fluent::FileWrapper.stat(@path)
  @tail_position = file_stat.size
  @pe = @pf[TailInput::TargetInfo.new(@path, file_stat.ino)]

  start_pos = @pe.read_pos
  unread_bytes = @tail_position - start_pos
  return if unread_bytes == 0

  if unread_bytes < 0
    # may be truncated, read from head
    @pe.update_pos(0)
    log.warn("#{@path} may be truncated")
    return
  end

  record_size = @utmpx.num_bytes
  count = unread_bytes / record_size
  # Only part of a record has been written so far; wait for the rest.
  return if count == 0

  es = MultiEventStream.new
  File.open(@path) do |io|
    io.seek(start_pos)
    count.times do
      time, record = parse_entry(@utmpx.read(io))
      es.add(time, record)
    end
  end

  # Emit first, then persist the new offset: if emitting raises, the same
  # records are read again on the next run instead of being lost.
  router.emit_stream(@tag, es)
  @pe.update_pos(start_pos + count * record_size)
end
#shutdown ⇒ Object
92 93 94 |
# File 'lib/fluent/plugin/in_utmpx.rb', line 92
# Closes the position file (when one was opened) and then hands control to
# the parent #shutdown so the rest of the shutdown chain still runs.
#
# Fluentd lifecycle methods are expected to call super; without it the
# parent class and any mixed-in helpers never see the shutdown.
def shutdown
  @pf_file.close if @pf_file

  super
end
#start ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/in_utmpx.rb', line 54
# Starts the plugin: makes sure the pos_file directory exists, opens the
# position file in synchronous read/write mode, loads the position data for
# @path, and schedules #refresh_watchers to run every @interval.
def start
  super

  directory = File.dirname(@pos_file)
  unless Dir.exist?(directory)
    FileUtils.mkdir_p(directory, mode: Fluent::DEFAULT_DIR_PERMISSION)
  end

  open_flags = File::RDWR | File::CREAT | File::BINARY
  @pf_file = File.open(@pos_file, open_flags, Fluent::DEFAULT_FILE_PERMISSION)
  @pf_file.sync = true

  # The position entry is looked up by the watched path together with its
  # current inode.
  inode = Fluent::FileWrapper.stat(@path).ino
  target_info = TailInput::TargetInfo.new(@path, inode)
  targets = { target_info.path => target_info }
  @pf = TailInput::PositionFile.load(@pf_file, false, targets, logger: log)

  timer_execute(:execute_utmpx, @interval, &method(:refresh_watchers))
end