Class: LogStash::Inputs::File
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::File
- Defined in:
- lib/logstash/inputs/file.rb
Instance Attribute Summary collapse
- #watcher ⇒ Object readonly
Class Method Summary collapse
Instance Method Summary collapse
-
#completely_stopped? ⇒ Boolean
def register.
- #handle_deletable_path(path) ⇒ Object
-
#listener_for(path) ⇒ Object
The WatchedFile calls back here as ‘observer.listener_for(@path)`.
- #log_line_received(path, line) ⇒ Object
-
#post_process_this(event) ⇒ Object
def run.
- #queue ⇒ Object
- #register ⇒ Object
- #run(queue) ⇒ Object
- #start_processing ⇒ Object
- #stop ⇒ Object
Instance Attribute Details
#watcher ⇒ Object (readonly)
252 253 254 |
# File 'lib/logstash/inputs/file.rb', line 252 def watcher @watcher end |
Class Method Details
.old_validate_value ⇒ Object
239 |
# File 'lib/logstash/inputs/file.rb', line 239 alias_method :old_validate_value, :validate_value |
.validate_value(value, validator) ⇒ Object
241 242 243 244 245 246 247 248 |
# File 'lib/logstash/inputs/file.rb', line 241 def validate_value(value, validator) if validator.is_a?(Array) && validator.size == 2 && validator.first.respond_to?(:call) callable, units = *validator # returns a ValidatedStruct having a `to_a` method suitable to return to the config mixin caller return callable.call(value, units).to_a end old_validate_value(value, validator) end |
Instance Method Details
#completely_stopped? ⇒ Boolean
def register
330 331 332 333 |
# File 'lib/logstash/inputs/file.rb', line 330 def completely_stopped? # to synchronise after(:each) blocks in tests that remove the sincedb file before atomic_write completes @completely_stopped.true? end |
#handle_deletable_path(path) ⇒ Object
377 378 379 380 381 382 |
# File 'lib/logstash/inputs/file.rb', line 377 def handle_deletable_path(path) return if tail_mode? return if @completed_file_handlers.empty? @logger.debug? && @logger.debug(__method__.to_s, :path => path) @completed_file_handlers.each { |handler| handler.handle(path) } end |
#listener_for(path) ⇒ Object
The WatchedFile calls back here as ‘observer.listener_for(@path)`
337 338 339 |
# File 'lib/logstash/inputs/file.rb', line 337 def listener_for(path) FileListener.new(path, self) end |
#log_line_received(path, line) ⇒ Object
384 385 386 |
# File 'lib/logstash/inputs/file.rb', line 384 def log_line_received(path, line) @logger.debug? && @logger.debug("Received line", :path => path, :text => line) end |
#post_process_this(event) ⇒ Object
def run
370 371 372 373 374 375 |
# File 'lib/logstash/inputs/file.rb', line 370 def post_process_this(event) event.set("[@metadata][host]", @host) event.set("host", @host) unless event.include?("host") decorate(event) @queue.get << event end |
#queue ⇒ Object
396 397 398 |
# File 'lib/logstash/inputs/file.rb', line 396 def queue @queue.get end |
#register ⇒ Object
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 |
# File 'lib/logstash/inputs/file.rb', line 254 def register require "addressable/uri" require "digest/md5" @logger.trace("Registering file input", :path => @path) @host = Socket.gethostname.force_encoding(Encoding::UTF_8) # This check is Logstash 5 specific. If the class does not exist, and it # won't in older versions of Logstash, then we need to set it to nil. settings = defined?(LogStash::SETTINGS) ? LogStash::SETTINGS : nil @filewatch_config = { :exclude => @exclude, :stat_interval => @stat_interval, :discover_interval => @discover_interval, :sincedb_write_interval => @sincedb_write_interval, :delimiter => @delimiter, :ignore_older => @ignore_older, :close_older => @close_older, :max_open_files => @max_open_files, :sincedb_clean_after => @sincedb_clean_after, :file_chunk_count => @file_chunk_count, :file_chunk_size => @file_chunk_size, :file_sort_by => @file_sort_by, :file_sort_direction => @file_sort_direction, :exit_after_read => @exit_after_read, :check_archive_validity => @check_archive_validity, } @path.each do |path| if Pathname.new(path).relative? raise ArgumentError.new("File paths must be absolute, relative path specified: #{path}") end end if @sincedb_path.nil? base_sincedb_path = build_sincedb_base_from_settings(settings) || build_sincedb_base_from_env @sincedb_path = build_random_sincedb_filename(base_sincedb_path) @logger.info('No sincedb_path set, generating one based on the "path" setting', :sincedb_path => @sincedb_path.to_s, :path => @path) else @sincedb_path = Pathname.new(@sincedb_path) if @sincedb_path.directory? raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"") end end @filewatch_config[:sincedb_path] = @sincedb_path @filewatch_config[:start_new_files_at] = @start_position.to_sym if @file_completed_action.include?('log') if @file_completed_log_path.nil? raise ArgumentError.new('The "file_completed_log_path" setting must be provided when the "file_completed_action" is set to "log" or "log_and_delete"') else @file_completed_log_path = Pathname.new(@file_completed_log_path) unless @file_completed_log_path.exist? begin FileUtils.touch(@file_completed_log_path) rescue raise ArgumentError.new("The \"file_completed_log_path\" file can't be created: #{@file_completed_log_path}") end end end end if tail_mode? if @exit_after_read raise ArgumentError.new('The "exit_after_read" setting only works when the "mode" is set to "read"') end @watcher_class = FileWatch::ObservingTail else @watcher_class = FileWatch::ObservingRead end @codec = LogStash::Codecs::IdentityMapCodec.new(@codec) @completely_stopped = Concurrent::AtomicBoolean.new @queue = Concurrent::AtomicReference.new end |
#run(queue) ⇒ Object
361 362 363 364 365 366 367 368 |
# File 'lib/logstash/inputs/file.rb', line 361 def run(queue) start_processing @queue.set queue @watcher.subscribe(self) # halts here until quit is called # last action of the subscribe call is to write the sincedb exit_flush @completely_stopped.make_true end |
#start_processing ⇒ Object
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 |
# File 'lib/logstash/inputs/file.rb', line 341 def start_processing # if the pipeline restarts this input, # make sure previous files are closed stop @watcher = @watcher_class.new(@filewatch_config) @completed_file_handlers = [] if read_mode? if @file_completed_action.include?('log') @completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path) end if @file_completed_action.include?('delete') @completed_file_handlers << DeleteCompletedFileHandler.new(@watcher.watch) end end @path.each { |path| @watcher.watch_this(path) } end |
#stop ⇒ Object
388 389 390 391 392 393 |
# File 'lib/logstash/inputs/file.rb', line 388 def stop unless @watcher.nil? @codec.close @watcher.quit end end |