Class: FileMonitoring::FileMonitoring

Inherits:
Object
  • Object
show all
Defined in:
lib/file_monitoring/file_monitoring.rb

Overview

Manages file monitoring of a number of file system locations.

Instance Method Summary collapse

Instance Method Details

#monitor_files ⇒ Object

The main method. Loops over all monitored paths, checking each one at its own time span (scan period).

Algorithm:

There is a loop that performs at every iteration:

1. Pull the entry with the minimal check time from the queue
2. Recursively check the path taken from the entry for changes
  a. Notify subscribed processes of changes
3. Push the entry back to the queue with the time of its next check

This method is controlled by the monitoring_paths configuration parameter, which provides the paths and the file monitoring configuration data.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/file_monitoring/file_monitoring.rb', line 29

# The main method. Monitors every path listed in Params['monitoring_paths'],
# re-checking each one every 'scan_period' seconds.
#
# Steps:
# 1. Build one root DirStat per configured path (attaching @event_queue when set).
# 2. If $local_content_data is not empty, load its instances into the DirStat
#    trees as STABLE entries.
# 3. Schedule every root DirStat in a priority queue keyed by its next check time.
# 4. Set up the log4r rolling file log at Params['default_monitoring_log_path'].
# 5. Loop: pop the entry due next, sleep until it is due, run DirStat#monitor,
#    then push the entry back with its next check time.
#
# The loop has no exit condition, so this method only ends through an exception.
def monitor_files
  conf_array = Params['monitoring_paths']

  # Create the root dirs of monitoring.
  dir_stat_array = conf_array.map do |elem|
    dir_stat = DirStat.new(File.expand_path(elem['path']), elem['stable_state'])
    dir_stat.set_event_queue(@event_queue) if @event_queue
    dir_stat
  end

  # Look over loaded content data if not empty.
  # If a file is under a monitoring path - add it to the DirStat tree as stable,
  # with path, size and mod_time read from the loaded data.
  # If a file is NOT under a monitoring path - skip it (not a valid usage).
  unless $local_content_data.empty?
    Log.info("Start build data base from loaded file")
    $local_content_data.each_instance do |_, size, _, mod_time, _, path|
      # Construct the sub paths array from the full file path.
      # Example:
      #   instance path = /dir1/dir2/file_name
      #   Sub path 1: /dir1
      #   Sub path 2: /dir1/dir2
      #   Sub path 3: /dir1/dir2/file_name
      # Sub paths are used to create DirStat objects, or a FileStat for the
      # last sub path.
      split_path = path.split(File::SEPARATOR)
      sub_paths = (0...split_path.size).map { |i| File.join(*split_path[0..i]) }
      # sub_paths holds array => ["/dir1","/dir1/dir2","/dir1/dir2/file_name"]

      # Loop over the monitor paths to start building the tree under each.
      dir_stat_array.each do |dir_stat|
        # Check whether the monitor path is one of the sub paths and find its index.
        # The following index is the next sub path to insert into the tree;
        # the index is raised at each recursive call down the tree.
        sub_paths_index = sub_paths.index(dir_stat.path)
        next if sub_paths_index.nil? # monitor path was not found. skip this instance.

        # Monitor path was found. Add to tree as stable.
        dir_stat.state = FileStatEnum::STABLE
        # Start the recursive call with the next sub path index.
        dir_stat.load_instance(sub_paths, sub_paths_index + 1, size, mod_time)
      end
    end
    Log.info("End build data base from loaded file")
  end

  # Directory states are stored in a priority queue, where the key (priority)
  # is the time at which the directory should be checked next.
  # NOTE(review): the priority is negated, presumably because
  # Containers::PriorityQueue pops the highest priority first - confirm.
  pq = Containers::PriorityQueue.new
  conf_array.each_with_index do |elem, index|
    priority = (Time.now + elem['scan_period']).to_i
    pq.push([priority, elem, dir_stat_array[index]], -priority)
  end

  # Init log4r.
  monitoring_log_path = Params['default_monitoring_log_path']
  Log.debug1 'File monitoring log: ' + monitoring_log_path
  monitoring_log_dir = File.dirname(monitoring_log_path)
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  FileUtils.mkdir_p(monitoring_log_dir) unless File.exist?(monitoring_log_dir)

  @log4r = Log4r::Logger.new 'BBFS monitoring log'
  @log4r.trace = true
  formatter = Log4r::PatternFormatter.new(:pattern => "[%d] [%m]")
  # File setup.
  file_config = {
    "filename" => monitoring_log_path,
    "maxsize" => Params['log_rotation_size'],
    "trunc" => true
  }
  file_outputter = Log4r::RollingFileOutputter.new("monitor_log", file_config)
  file_outputter.level = Log4r::INFO
  file_outputter.formatter = formatter
  @log4r.outputters << file_outputter
  FileStat.set_log(@log4r)

  # `while true` is kept on purpose: Kernel#loop would silently swallow a
  # StopIteration raised from inside dir_stat.monitor.
  while true
    # Pull the entry that should be checked next, according to its scan_period.
    time, conf, dir_stat = pq.pop
    # Time remaining before the directory should be checked.
    time_span = time - Time.now.to_i
    sleep(time_span) if time_span > 0

    if $testing_memory_active
      $testing_memory_log.info("Start monitor at :#{Time.now}")
      puts "Start monitor at :#{Time.now}"
      dir_stat.monitor
      $testing_memory_log.info("End monitor at :#{Time.now}")
      puts "End monitor at :#{Time.now}"
    else
      Log.info("Start monitor for dir:%s", dir_stat.path)
      dir_stat.monitor
      Log.info("End monitor for dir:%s", dir_stat.path)
    end

    # Push the entry back with its next check time as the priority key.
    priority = (Time.now + conf['scan_period']).to_i
    pq.push([priority, conf, dir_stat], -priority)
  end
end

#set_event_queue(queue) ⇒ Object

Set event queue used for communication between different processes.

Parameters:

  • queue (Queue)


14
15
16
# File 'lib/file_monitoring/file_monitoring.rb', line 14

# Stores the event queue in @event_queue. monitor_files hands this queue to
# every DirStat it creates when one has been set.
#
# @param queue [Queue] queue to keep for later use
# @return [Object] the queue that was passed in
def set_event_queue(queue)
  @event_queue = queue
  queue
end