Class: FileWatch::ReadMode::Processor
- Inherits:
-
Object
- Object
- FileWatch::ReadMode::Processor
- Includes:
- LogStash::Util::Loggable
- Defined in:
- lib/filewatch/read_mode/processor.rb
Overview
Must handle
:read_file
:read_zip_file
Instance Attribute Summary collapse
-
#deletable_filepaths ⇒ Object
readonly
Returns the value of attribute deletable_filepaths.
-
#watch ⇒ Object
readonly
Returns the value of attribute watch.
Instance Method Summary collapse
- #add_watch(watch) ⇒ Object
- #common_deleted_reaction(watched_file, action) ⇒ Object
- #common_error_reaction(path, error, action) ⇒ Object
-
#initialize(settings) ⇒ Processor
constructor
A new instance of Processor.
- #initialize_handlers(sincedb_collection, observer) ⇒ Object
- #process_active(watched_files) ⇒ Object
- #process_closed(watched_files) ⇒ Object
- #process_ignored(watched_files) ⇒ Object
- #process_watched(watched_files) ⇒ Object
- #read_file(watched_file) ⇒ Object
- #read_zip_file(watched_file) ⇒ Object
Constructor Details
#initialize(settings) ⇒ Processor
Returns a new instance of Processor.
17 18 19 20 |
# File 'lib/filewatch/read_mode/processor.rb', line 17 def initialize(settings) @settings = settings @deletable_filepaths = [] end |
Instance Attribute Details
#deletable_filepaths ⇒ Object (readonly)
Returns the value of attribute deletable_filepaths.
15 16 17 |
# File 'lib/filewatch/read_mode/processor.rb', line 15 def deletable_filepaths @deletable_filepaths end |
#watch ⇒ Object (readonly)
Returns the value of attribute watch.
15 16 17 |
# File 'lib/filewatch/read_mode/processor.rb', line 15 def watch @watch end |
Instance Method Details
#add_watch(watch) ⇒ Object
22 23 24 25 |
# File 'lib/filewatch/read_mode/processor.rb', line 22 def add_watch(watch) @watch = watch self end |
#common_deleted_reaction(watched_file, action) ⇒ Object
106 107 108 109 110 111 |
# File 'lib/filewatch/read_mode/processor.rb', line 106 def common_deleted_reaction(watched_file, action) # file has gone away or we can't read it anymore. watched_file.unwatch deletable_filepaths << watched_file.path logger.debug("#{action} - stat failed: #{watched_file.path}, removing from collection") end |
#common_error_reaction(path, error, action) ⇒ Object
113 114 115 |
# File 'lib/filewatch/read_mode/processor.rb', line 113 def common_error_reaction(path, error, action) logger.error("#{action} - other error #{path}: (#{error.}, #{error.backtrace.take(8).inspect})") end |
#initialize_handlers(sincedb_collection, observer) ⇒ Object
27 28 29 30 |
# File 'lib/filewatch/read_mode/processor.rb', line 27 def initialize_handlers(sincedb_collection, observer) @read_file = Handlers::ReadFile.new(sincedb_collection, observer, @settings) @read_zip_file = Handlers::ReadZipFile.new(sincedb_collection, observer, @settings) end |
#process_active(watched_files) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/filewatch/read_mode/processor.rb', line 81 def process_active(watched_files) logger.debug("Active processing") # Handles watched_files in the active state. watched_files.select {|wf| wf.active? }.each do |watched_file| path = watched_file.path begin watched_file.restat rescue Errno::ENOENT common_deleted_reaction(watched_file, "Active") next rescue => e common_error_reaction(path, e, "Active") next end break if watch.quit? if watched_file.compressed? read_zip_file(watched_file) else read_file(watched_file) end # handlers take care of closing and unwatching end end |
#process_closed(watched_files) ⇒ Object
40 41 42 |
# File 'lib/filewatch/read_mode/processor.rb', line 40 def process_closed(watched_files) # do not process watched_files in the closed state. end |
#process_ignored(watched_files) ⇒ Object
44 45 46 |
# File 'lib/filewatch/read_mode/processor.rb', line 44 def process_ignored(watched_files) # do not process watched_files in the ignored state. end |
#process_watched(watched_files) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/filewatch/read_mode/processor.rb', line 48 def process_watched(watched_files) logger.debug("Watched processing") # Handles watched_files in the watched state. # for a slice of them: # move to the active state # should never have been active before # how much of the max active window is available to_take = @settings.max_active - watched_files.count{|wf| wf.active?} if to_take > 0 watched_files.select {|wf| wf.watched?}.take(to_take).each do |watched_file| path = watched_file.path begin watched_file.restat watched_file.activate rescue Errno::ENOENT common_deleted_reaction(watched_file, "Watched") next rescue => e common_error_reaction(path, e, "Watched") next end break if watch.quit? end else now = Time.now.to_i if (now - watch.lastwarn_max_files) > MAX_FILES_WARN_INTERVAL waiting = watched_files.size - @settings.max_active logger.warn(@settings.max_warn_msg + ", files yet to open: #{waiting}") watch.lastwarn_max_files = now end end end |
#read_file(watched_file) ⇒ Object
32 33 34 |
# File 'lib/filewatch/read_mode/processor.rb', line 32 def read_file(watched_file) @read_file.handle(watched_file) end |
#read_zip_file(watched_file) ⇒ Object
36 37 38 |
# File 'lib/filewatch/read_mode/processor.rb', line 36 def read_zip_file(watched_file) @read_zip_file.handle(watched_file) end |