Class: RFlow::Components::File::DirectoryWatcher
- Inherits: RFlow::Component
  - Ancestry: Object → RFlow::Component → RFlow::Components::File::DirectoryWatcher
- Defined in: lib/rflow/components/file/directory_watcher.rb
Constant Summary
- DEFAULT_CONFIG = { 'directory_path' => '/tmp/import', 'file_name_glob' => '*', 'poll_interval' => 1, 'files_per_poll' => 1, 'remove_files' => true, }
Instance Attribute Summary
- #config ⇒ Object — Returns the value of attribute config.
- #directory_path ⇒ Object — Returns the value of attribute directory_path.
- #file_name_glob ⇒ Object — Returns the value of attribute file_name_glob.
- #poll_interval ⇒ Object — Returns the value of attribute poll_interval.
- #remove_files ⇒ Object — Returns the value of attribute remove_files.
Instance Method Summary
- #configure!(config) ⇒ Object
- #run! ⇒ Object — TODO: optimize sending of messages based on what is connected.
- #to_boolean(string) ⇒ Object
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
Source lines 19–21:
# File 'lib/rflow/components/file/directory_watcher.rb', line 19
#
# Returns the value of attribute config: the hash stored by #configure!
# (the caller's settings merged over DEFAULT_CONFIG), or nil before
# #configure! has run.
def config
  @config
end
#directory_path ⇒ Object
Returns the value of attribute directory_path.
Source lines 19–21:
# File 'lib/rflow/components/file/directory_watcher.rb', line 19
#
# Returns the value of attribute directory_path: the directory that
# #configure! validated and that #run! polls.
def directory_path
  @directory_path
end
#file_name_glob ⇒ Object
Returns the value of attribute file_name_glob.
Source lines 19–21:
# File 'lib/rflow/components/file/directory_watcher.rb', line 19
#
# Returns the value of attribute file_name_glob: the glob pattern that
# #run! joins onto directory_path when polling for files.
def file_name_glob
  @file_name_glob
end
#poll_interval ⇒ Object
Returns the value of attribute poll_interval.
Source lines 19–21:
# File 'lib/rflow/components/file/directory_watcher.rb', line 19
#
# Returns the value of attribute poll_interval: the Integer interval that
# #configure! stores and #run! hands to EventMachine::PeriodicTimer.
def poll_interval
  @poll_interval
end
#remove_files ⇒ Object
Returns the value of attribute remove_files.
Source lines 19–21:
# File 'lib/rflow/components/file/directory_watcher.rb', line 19
#
# Returns the value of attribute remove_files: the boolean that #configure!
# derives via #to_boolean. When true, #run! deletes each file after sending it.
def remove_files
  @remove_files
end
Instance Method Details
#configure!(config) ⇒ Object
Source lines 21–38:
# File 'lib/rflow/components/file/directory_watcher.rb', line 21
#
# Merges +config+ over DEFAULT_CONFIG and stores the resulting settings in
# @config, @directory_path, @file_name_glob, @poll_interval, @files_per_poll
# and @remove_files.
#
# @param config [Hash] component settings keyed by String ('directory_path',
#   'file_name_glob', 'poll_interval', 'files_per_poll', 'remove_files');
#   missing keys fall back to DEFAULT_CONFIG
# @raise [ArgumentError] if directory_path is not a readable directory, if
#   poll_interval or files_per_poll is not a positive integer, or if
#   remove_files cannot be coerced to a boolean
def configure!(config)
  @config = DEFAULT_CONFIG.merge config
  # Relative paths are resolved against the current working directory.
  @directory_path = ::File.expand_path(@config['directory_path'])
  @file_name_glob = @config['file_name_glob']
  @poll_interval  = @config['poll_interval'].to_i
  @files_per_poll = @config['files_per_poll'].to_i
  @remove_files   = to_boolean(@config['remove_files'])

  unless ::File.directory?(@directory_path)
    raise ArgumentError, "Invalid directory '#{@directory_path}'"
  end

  unless ::File.readable?(@directory_path)
    raise ArgumentError, "Unable to read from directory '#{@directory_path}'"
  end

  # #to_i silently maps unparseable or fractional values below 1 to 0.
  # A zero interval would presumably poll as fast as EventMachine allows.
  unless @poll_interval > 0
    raise ArgumentError, "Invalid poll_interval '#{@config['poll_interval']}', must be a positive integer"
  end

  # In #run!, files_per_poll feeds Array#first: 0 never imports anything,
  # and a negative value raises inside the poll timer.
  unless @files_per_poll > 0
    raise ArgumentError, "Invalid files_per_poll '#{@config['files_per_poll']}', must be a positive integer"
  end

  # TODO: more error checking of input config
end
#run! ⇒ Object
TODO: optimize sending of messages based on what is connected
Source lines 41–84:
# File 'lib/rflow/components/file/directory_watcher.rb', line 41
#
# Starts polling +directory_path+ and returns the EventMachine::PeriodicTimer
# that drives it.
#
# Every +poll_interval+ seconds the timer globs +file_name_glob+ inside
# +directory_path+ and keeps the regular files assigned to this worker. It then
# imports up to +files_per_poll+ of them, least recently modified first. Each
# imported file is sent on +file_port+ as an 'RFlow::Message::Data::File'
# message and on +raw_port+ as an 'RFlow::Message::Data::Raw' message. It is
# then deleted when +remove_files+ is set.
#
# Files that cannot be read (or, with removal enabled, removed) are logged and
# skipped. A filesystem error while importing one file is logged and does not
# abort the rest of the poll.
#
# TODO: optimize sending of messages based on what is connected
def run!
  EventMachine::PeriodicTimer.new(poll_interval) do
    pattern = ::File.join(@directory_path, @file_name_glob)
    RFlow.logger.debug { "#{name}: Polling for files in #{pattern}" }

    # Stat each candidate once. Directories matching the glob are dropped
    # (reading them would raise), as are paths deleted since the glob ran.
    stats = {}
    Dir.glob(pattern).each do |path|
      # For multiple copies, share the load equally: each worker takes the
      # paths whose checksum maps onto its index (the +1 implies 1-based).
      next unless shard.count == 1 || (path.sum % shard.count) + 1 == worker.index

      begin
        stat = ::File.stat(path)
      rescue Errno::ENOENT
        next
      end
      stats[path] = stat if stat.file?
    end

    # Process the least recently modified files first.
    file_paths = stats.keys.sort_by { |path| stats[path].mtime }

    file_paths.first(@files_per_poll).each do |path|
      RFlow.logger.debug { "#{name}: Importing #{path}" }

      unless ::File.readable?(path)
        RFlow.logger.warn "#{name}: Unable to read file #{path}, skipping it"
        next
      end

      # Unlinking a file needs write permission on its directory. The file
      # check is kept as well, because read-only files cannot be deleted on Windows.
      if @remove_files && !(::File.writable?(path) && ::File.writable?(::File.dirname(path)))
        RFlow.logger.warn "#{name}: Unable to remove file #{path}, skipping it"
        next
      end

      begin
        ::File.open(path, 'r:BINARY') do |file|
          content = file.read
          RFlow.logger.debug { "#{name}: Read #{content.bytesize} bytes of #{file.size} in #{file.path}, md5 #{Digest::MD5.hexdigest(content)}" }

          file_port.send_message(RFlow::Message.new('RFlow::Message::Data::File').tap do |m|
            m.data.path = ::File.expand_path(file.path)
            m.data.size = file.size
            m.data.content = content
            # NOTE: File#ctime is the inode change time on Unix; it is only the
            # creation time on Windows.
            m.data.creation_timestamp = file.ctime
            m.data.modification_timestamp = file.mtime
            m.data.access_timestamp = file.atime
          end)

          raw_port.send_message(RFlow::Message.new('RFlow::Message::Data::Raw').tap do |m|
            m.data.raw = content
          end)
        end

        if @remove_files
          RFlow.logger.debug { "#{name}: Removing #{path}" }
          ::File.delete path
        end
      rescue SystemCallError => e
        # The file is left in place, so a later poll may pick it up again.
        RFlow.logger.warn "#{name}: Unable to import file #{path}, skipping it: #{e.message}"
      end
    end
  end
end
#to_boolean(string) ⇒ Object
Source lines 86–92:
# File 'lib/rflow/components/file/directory_watcher.rb', line 86
#
# Coerces a configuration value to true or false.
#
# Accepts true/false, the numbers 1/0, and the Strings '1'/'0' or
# 'true'/'false'. Matching is case-insensitive and ignores surrounding
# whitespace. The whole string must match, so values such as 'falsehood'
# or "true\nextra" are rejected rather than partially matched.
#
# @param string [Object] value to coerce (usually a String from the config)
# @return [Boolean]
# @raise [ArgumentError] if the value is not one of the accepted forms
def to_boolean(string)
  # Only Strings are normalized; Symbols and other objects are matched as-is.
  value = string.respond_to?(:strip) ? string.strip : string

  case value
  when true, 1, '1', /\Atrue\z/i then true
  when false, 0, '0', /\Afalse\z/i then false
  else raise ArgumentError, "'#{string}' cannot be coerced to a boolean value"
  end
end