Class: Fluent::Plugin::UtmpxInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_utmpx.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/fluent/plugin/in_utmpx.rb', line 39

# Prepares parser state and registers this instance as the owner of its pos_file.
#
# The :in_utmpx variable store is fetched before `super` runs, then the utmpx
# parser and the position counters are reset. The store maps a pos_file path to
# the plugin id that claimed it.
#
# @param conf the configuration object, forwarded unchanged to the parent via `super`
# @raise [Fluent::ConfigError] when the store already holds an entry for
#   @pos_file and `called_in_test?` is false
def configure(conf)
  @variable_store = Fluent::VariableStore.fetch_or_build(:in_utmpx)
  super

  @utmpx = Linux::Utmpx::UtmpxParser.new
  @buffer = ""
  @tail_position = @previous_position = 0

  # NOTE(review): @pos_file is presumably populated by `super` from conf — confirm.
  claimed = @variable_store.key?(@pos_file)
  if claimed && !called_in_test?
    owner_id = @variable_store[@pos_file]
    message = "Other 'in_utmpx' plugin already use same pos_file path: plugin_id = #{owner_id}, pos_file path = #{@pos_file}"
    raise Fluent::ConfigError, message
  end

  @variable_store[@pos_file] = plugin_id
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


96
97
98
# File 'lib/fluent/plugin/in_utmpx.rb', line 96

# Always answers false.
#
# NOTE(review): presumably consulted by Fluentd to decide whether this plugin
# may run under multiple workers — confirm against the parent Input class.
#
# @return [Boolean] always false
def multi_workers_ready?
  false
end

#refresh_watchers ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/in_utmpx.rb', line 67

# Reads the utmpx records appended to @path since the stored read position
# and emits them as one event stream tagged with @tag.
#
# Registered as the timer callback in #start, so it runs every @interval.
#
# Behaviour per tick:
# * nothing new            -> returns without emitting
# * file shrank            -> position is reset to 0 and a warning is logged;
#                             reading restarts from the head on the next tick
# * less than one record   -> returns without emitting; the partial record is
#                             picked up once it has been written completely
# * one or more records    -> records are parsed, emitted, and only then is the
#                             read position advanced, so a failing emit does not
#                             lose the records
def refresh_watchers
  # Stat once so the size and the inode describe the same snapshot of the file.
  stat = Fluent::FileWrapper.stat(@path)
  @tail_position = stat.size
  @pe = @pf[TailInput::TargetInfo.new(@path, stat.ino)]

  unread_bytes = @tail_position - @pe.read_pos
  return if unread_bytes == 0

  if unread_bytes < 0
    # may be truncated, read from head
    @pe.update_pos(0)
    log.warn("#{@path} may be truncated")
    return
  end

  record_size = @utmpx.num_bytes
  count = unread_bytes / record_size
  # Only part of a record is available yet; do not emit an empty stream.
  return if count == 0

  start_pos = @pe.read_pos
  es = MultiEventStream.new
  File.open(@path) do |io|
    io.seek(start_pos)
    count.times do
      time, record = parse_entry(@utmpx.read(io))
      es.add(time, record)
    end
  end

  # Emit first: if emit_stream raises, the position stays put and the same
  # records are read again on the next tick instead of being dropped.
  router.emit_stream(@tag, es)
  @pe.update_pos(start_pos + count * record_size)
end

#shutdown ⇒ Object



92
93
94
# File 'lib/fluent/plugin/in_utmpx.rb', line 92

# Releases the position file when the plugin shuts down.
#
# Closes @pf_file if one was opened, then calls `super`. Fluentd's plugin API
# expects lifecycle methods to call `super`, so that the parent class and the
# plugin helpers can run their own shutdown step.
def shutdown
  @pf_file.close if @pf_file
  super
end

#start ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/in_utmpx.rb', line 54

# Opens the position file, loads the stored positions for @path, and schedules
# #refresh_watchers to run every @interval.
#
# The directory holding @pos_file is created when it does not exist yet. The
# position file is opened read/write in binary mode (created if missing) with
# synchronous writes enabled.
def start
  super

  pos_dir = File.dirname(@pos_file)
  unless Dir.exist?(pos_dir)
    FileUtils.mkdir_p(pos_dir, mode: Fluent::DEFAULT_DIR_PERMISSION)
  end

  open_flags = File::RDWR | File::CREAT | File::BINARY
  @pf_file = File.open(@pos_file, open_flags, Fluent::DEFAULT_FILE_PERMISSION)
  @pf_file.sync = true

  inode = Fluent::FileWrapper.stat(@path).ino
  watched = TailInput::TargetInfo.new(@path, inode)
  known_targets = { watched.path => watched }
  @pf = TailInput::PositionFile.load(@pf_file, false, known_targets, logger: log)

  on_tick = method(:refresh_watchers)
  timer_execute(:execute_utmpx, @interval, &on_tick)
end