Method: FileMonitoring::FileMonitoring#monitor_files

Defined in:
lib/file_monitoring/file_monitoring.rb

#monitor_files ⇒ Object

The main method. Loops over all configured paths and monitors each of them at its own scan period.

Algorithm:

There is a loop that performs at every iteration:

1. Pull the entry with the earliest check time from the queue
2. Recursively check the path taken from the entry for changes
  a. Notify subscribed processes on changes
3. Push the entry back to the queue with the time of its next check

This method is controlled by the monitoring_paths configuration parameter, which provides the path and file monitoring configuration data.



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/file_monitoring/file_monitoring.rb', line 112

# Main entry point of file monitoring. Never returns normally: it either
# raises (manual mode) or loops forever re-scanning the configured paths.
#
# Steps:
# 1. Set up a Log4r rolling-file logger at Params['default_monitoring_log_path']
#    (creating its directory when missing) and hand it to DirStat.
# 2. Create one DirStat per entry of Params['monitoring_paths'].
# 3. When $local_content_data is not empty, rebuild state from it through
#    load_instances / load_symlinks and remember its unique_id in
#    $last_content_data_id.
# 4. When Params['manual_file_changes'] is set: with loaded content data,
#    monitor every path once, remove unmarked paths, flush the content data
#    and raise; without loaded content data, raise immediately.
# 5. Otherwise loop forever: pop the entry with the earliest check time,
#    sleep until that time, monitor, remove unmarked paths, index, flush the
#    content data and push the entry back with its next check time.
#
# @raise [RuntimeError] whenever Params['manual_file_changes'] is set
def monitor_files
  # init log4r
  monitoring_log_path = Params['default_monitoring_log_path']
  Log.debug1 'File monitoring log: ' + monitoring_log_path
  monitoring_log_dir = File.dirname(monitoring_log_path)
  # File.exist? instead of File.exists?: the latter was removed in Ruby 3.2.
  FileUtils.mkdir_p(monitoring_log_dir) unless File.exist?(monitoring_log_dir)

  @log4r = Log4r::Logger.new 'BBFS monitoring log'
  @log4r.trace = true
  formatter = Log4r::PatternFormatter.new(:pattern => "[%d] [%m]")
  # file setup
  file_config = {
    "filename" => monitoring_log_path,
    "maxsize" => Params['log_rotation_size'],
    "trunc" => true
  }
  file_outputter = Log4r::RollingFileOutputter.new("monitor_log", file_config)
  file_outputter.level = Log4r::INFO
  file_outputter.formatter = formatter
  @log4r.outputters << file_outputter
  ::FileMonitoring::DirStat.set_log(@log4r)

  conf_array = Params['monitoring_paths']

  # create root dirs of monitoring: one [dir_stat, stable_state] pair per configured path
  dir_stat_array = conf_array.map do |elem|
    [DirStat.new(File.expand_path(elem['path'])), elem['stable_state']]
  end

  # Writes the same message to the main log and, when enabled, to the memory-testing log.
  log_both = lambda do |msg|
    Log.info(msg)
    $testing_memory_log.info(msg) if $testing_memory_active
  end

  # Time (epoch seconds) at which a configured path should be checked next.
  next_check_time = lambda { |elem| (Time.now + elem['scan_period']).to_i }

  # This structure is used to optimize indexing when user specifies a directory was moved.
  file_attr_to_checksum = {}

  # Look over loaded content data if not empty
  if $local_content_data.empty?
    if Params['manual_file_changes']
      Log.info('Feature: manual_file_changes is ON. But No previous content data found. ' +
               'No change is required. Existing application')
      raise('Feature: manual_file_changes is ON. But No previous content data found at ' +
               "file:#{Params['local_content_data_path']}. No change is required. Existing application\n")
    end
  else
    Log.info("Start build data base from loaded file. This could take several minutes")
    load_instances(file_attr_to_checksum, dir_stat_array)
    load_symlinks(dir_stat_array)
    Log.info("End build data base from loaded file")
    $last_content_data_id = $local_content_data.unique_id

    if Params['manual_file_changes']
      # -------------------------- MANUAL MODE
      # ------------ LOOP DIRS
      dir_stat_array.each do |dir_stat, _stable_state|
        log_both.call("In Manual mode. Start monitor path:#{dir_stat.path}. moved or copied files (same name, size and time " +
            'modification) will use the checksum of the original files and be updated in content data file')

        # ------- MONITOR
        dir_stat.monitor(file_attr_to_checksum)

        # ------- REMOVE PATHS
        # remove non existing (not marked) files\dirs
        log_both.call('Start remove non existing paths')
        dir_stat.removed_unmarked_paths
        log_both.call('End monitor path and index')
      end

      # ------ WRITE CONTENT DATA
      ContentServer.flush_content_data
      # Manual mode is a one-shot run: raising here is how it terminates.
      raise("Finished manual changes and update file:#{Params['local_content_data_path']}. Exit application\n")
    end
  end

  # Directories states stored in the priority queue,
  # where the key (priority) is a time when it should be checked next time.
  # The priority is negated on push so that the earliest check time is popped first.
  pq = Containers::PriorityQueue.new
  conf_array.each_with_index do |elem, index|
    priority = next_check_time.call(elem)
    pq.push([priority, elem, dir_stat_array[index].first], -priority)
  end

  # `while true` is kept on purpose: Kernel#loop would rescue a StopIteration
  # escaping from the body and silently end monitoring.
  while true
    # pull entry that should be checked next,
    # according to its scan_period
    time, elem, dir_stat = pq.pop
    # time remains to wait before directory should be checked
    time_span = time - Time.now.to_i
    sleep(time_span) if time_span > 0

    # Start monitor
    Log.info("Start monitor path:%s ", dir_stat.path)
    $testing_memory_log.info("Start monitor path:#{dir_stat.path}") if $testing_memory_active
    ::FileMonitoring.stable_state = elem['stable_state']
    dir_stat.monitor

    # remove non existing (not marked) files\dirs
    log_both.call('Start remove non existing paths')
    dir_stat.removed_unmarked_paths
    log_both.call('End monitor path and index')

    # Start index
    Log.info("Start index path:%s ", dir_stat.path)
    $testing_memory_log.info("Start index path:#{dir_stat.path}") if $testing_memory_active
    dir_stat.index

    # print number of indexed files
    Log.debug1("indexed file count:%s", $indexed_file_count)
    $testing_memory_log.info("indexed file count: #{$indexed_file_count}") if $testing_memory_active

    # flush content data if changed
    ContentServer.flush_content_data

    # Add back to queue
    priority = next_check_time.call(elem)
    pq.push([priority, elem, dir_stat], -priority)
  end
end