Class: FileWatch::TailMode::Processor
- Inherits:
-
Object
- Object
- FileWatch::TailMode::Processor
- Includes:
- LogStash::Util::Loggable
- Defined in:
- lib/filewatch/tail_mode/processor.rb
Overview
Must handle
:create_initial - file is discovered and we have no record of it in the sincedb
:create - file is discovered and we have seen it before in the sincedb
:grow - file has more content
:shrink - file has less content
:delete - file can't be read
:timeout - file is closable
:unignore - file was ignored, but have now received new content
Instance Attribute Summary collapse
-
#deletable_filepaths ⇒ Object
readonly
Returns the value of attribute deletable_filepaths.
-
#watch ⇒ Object
readonly
Returns the value of attribute watch.
Instance Method Summary collapse
- #add_watch(watch) ⇒ Object
- #common_deleted_reaction(watched_file, action) ⇒ Object
- #common_error_reaction(path, error, action) ⇒ Object
- #create(watched_file) ⇒ Object
- #create_initial(watched_file) ⇒ Object
- #delete(watched_file) ⇒ Object
- #grow(watched_file) ⇒ Object
-
#initialize(settings) ⇒ Processor
constructor
A new instance of Processor.
- #initialize_handlers(sincedb_collection, observer) ⇒ Object
- #process_active(watched_files) ⇒ Object
- #process_closed(watched_files) ⇒ Object
- #process_ignored(watched_files) ⇒ Object
- #process_watched(watched_files) ⇒ Object
- #shrink(watched_file) ⇒ Object
- #timeout(watched_file) ⇒ Object
- #unignore(watched_file) ⇒ Object
Constructor Details
#initialize(settings) ⇒ Processor
Returns a new instance of Processor.
26 27 28 29 |
# File 'lib/filewatch/tail_mode/processor.rb', line 26 def initialize(settings) @settings = settings @deletable_filepaths = [] end |
Instance Attribute Details
#deletable_filepaths ⇒ Object (readonly)
Returns the value of attribute deletable_filepaths.
24 25 26 |
# File 'lib/filewatch/tail_mode/processor.rb', line 24 def deletable_filepaths @deletable_filepaths end |
#watch ⇒ Object (readonly)
Returns the value of attribute watch.
24 25 26 |
# File 'lib/filewatch/tail_mode/processor.rb', line 24 def watch @watch end |
Instance Method Details
#add_watch(watch) ⇒ Object
31 32 33 34 |
# File 'lib/filewatch/tail_mode/processor.rb', line 31 def add_watch(watch) @watch = watch self end |
#common_deleted_reaction(watched_file, action) ⇒ Object
197 198 199 200 201 202 203 |
# File 'lib/filewatch/tail_mode/processor.rb', line 197 def common_deleted_reaction(watched_file, action) # file has gone away or we can't read it anymore. watched_file.unwatch delete(watched_file) deletable_filepaths << watched_file.path logger.debug("#{action} - stat failed: #{watched_file.path}, removing from collection") end |
#common_error_reaction(path, error, action) ⇒ Object
205 206 207 |
# File 'lib/filewatch/tail_mode/processor.rb', line 205 def common_error_reaction(path, error, action) logger.error("#{action} - other error #{path}: (#{error.}, #{error.backtrace.take(8).inspect})") end |
#create(watched_file) ⇒ Object
46 47 48 |
# File 'lib/filewatch/tail_mode/processor.rb', line 46 def create(watched_file) @create.handle(watched_file) end |
#create_initial(watched_file) ⇒ Object
50 51 52 |
# File 'lib/filewatch/tail_mode/processor.rb', line 50 def create_initial(watched_file) @create_initial.handle(watched_file) end |
#delete(watched_file) ⇒ Object
62 63 64 |
# File 'lib/filewatch/tail_mode/processor.rb', line 62 def delete(watched_file) @delete.handle(watched_file) end |
#grow(watched_file) ⇒ Object
54 55 56 |
# File 'lib/filewatch/tail_mode/processor.rb', line 54 def grow(watched_file) @grow.handle(watched_file) end |
#initialize_handlers(sincedb_collection, observer) ⇒ Object
36 37 38 39 40 41 42 43 44 |
# File 'lib/filewatch/tail_mode/processor.rb', line 36 def initialize_handlers(sincedb_collection, observer) @create_initial = Handlers::CreateInitial.new(sincedb_collection, observer, @settings) @create = Handlers::Create.new(sincedb_collection, observer, @settings) @grow = Handlers::Grow.new(sincedb_collection, observer, @settings) @shrink = Handlers::Shrink.new(sincedb_collection, observer, @settings) @delete = Handlers::Delete.new(sincedb_collection, observer, @settings) @timeout = Handlers::Timeout.new(sincedb_collection, observer, @settings) @unignore = Handlers::Unignore.new(sincedb_collection, observer, @settings) end |
#process_active(watched_files) ⇒ Object
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/filewatch/tail_mode/processor.rb', line 161 def process_active(watched_files) logger.debug("Active processing") # Handles watched_files in the active state. # it has been read once - unless they were empty at the time watched_files.select {|wf| wf.active? }.each do |watched_file| path = watched_file.path begin watched_file.restat rescue Errno::ENOENT # file has gone away or we can't read it anymore. common_deleted_reaction(watched_file, "Active") next rescue => e common_error_reaction(path, e, "Active") next end break if watch.quit? if watched_file.grown? logger.debug("Active - file grew: #{path}: new size is #{watched_file.last_stat_size}, old size #{watched_file.bytes_read}") grow(watched_file) elsif watched_file.shrunk? # we don't update the size here, its updated when we actually read logger.debug("Active - file shrunk #{path}: new size is #{watched_file.last_stat_size}, old size #{watched_file.bytes_read}") shrink(watched_file) else # same size, do nothing end # can any active files be closed to make way for waiting files? if watched_file.file_closable? logger.debug("Watch each: active: file expired: #{path}") timeout(watched_file) watched_file.close end end end |
#process_closed(watched_files) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/filewatch/tail_mode/processor.rb', line 74 def process_closed(watched_files) logger.debug("Closed processing") # Handles watched_files in the closed state. # if its size changed it is put into the watched state watched_files.select {|wf| wf.closed? }.each do |watched_file| path = watched_file.path begin watched_file.restat if watched_file.size_changed? # if the closed file changed, move it to the watched state # not to active state because we want to respect the active files window. watched_file.watch end rescue Errno::ENOENT # file has gone away or we can't read it anymore. common_deleted_reaction(watched_file, "Closed") rescue => e common_error_reaction(path, e, "Closed") end break if watch.quit? end end |
#process_ignored(watched_files) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/filewatch/tail_mode/processor.rb', line 97 def process_ignored(watched_files) logger.debug("Ignored processing") # Handles watched_files in the ignored state. # if its size changed: # put it in the watched state # invoke unignore watched_files.select {|wf| wf.ignored? }.each do |watched_file| path = watched_file.path begin watched_file.restat if watched_file.size_changed? watched_file.watch unignore(watched_file) end rescue Errno::ENOENT # file has gone away or we can't read it anymore. common_deleted_reaction(watched_file, "Ignored") rescue => e common_error_reaction(path, e, "Ignored") end break if watch.quit? end end |
#process_watched(watched_files) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/filewatch/tail_mode/processor.rb', line 121 def process_watched(watched_files) logger.debug("Watched processing") # Handles watched_files in the watched state. # for a slice of them: # move to the active state # and we allow the block to open the file and create a sincedb collection record if needed # some have never been active and some have # those that were active before but are watched now were closed under constraint # how much of the max active window is available to_take = @settings.max_active - watched_files.count{|wf| wf.active?} if to_take > 0 watched_files.select {|wf| wf.watched?}.take(to_take).each do |watched_file| path = watched_file.path begin watched_file.restat watched_file.activate if watched_file.initial? create_initial(watched_file) else create(watched_file) end rescue Errno::ENOENT # file has gone away or we can't read it anymore. common_deleted_reaction(watched_file, "Watched") rescue => e common_error_reaction(path, e, "Watched") end break if watch.quit? end else now = Time.now.to_i if (now - watch.lastwarn_max_files) > MAX_FILES_WARN_INTERVAL waiting = watched_files.size - @settings.max_active logger.warn(@settings.max_warn_msg + ", files yet to open: #{waiting}") watch.lastwarn_max_files = now end end end |
#shrink(watched_file) ⇒ Object
58 59 60 |
# File 'lib/filewatch/tail_mode/processor.rb', line 58 def shrink(watched_file) @shrink.handle(watched_file) end |
#timeout(watched_file) ⇒ Object
66 67 68 |
# File 'lib/filewatch/tail_mode/processor.rb', line 66 def timeout(watched_file) @timeout.handle(watched_file) end |
#unignore(watched_file) ⇒ Object
70 71 72 |
# File 'lib/filewatch/tail_mode/processor.rb', line 70 def unignore(watched_file) @unignore.handle(watched_file) end |