Class: RServiceBus2::MonitorDir
- Defined in:
- lib/rservicebus2/monitor/dir.rb
Overview
Monitor Directory for files rubocop:disable Metrics/ClassLength
Direct Known Subclasses
Instance Attribute Summary
Attributes inherited from Monitor
Instance Method Summary collapse
- #connect(uri) ⇒ Object
- #look ⇒ Object
- #process_content(content) ⇒ Object
- #process_path(file_path) ⇒ Object
-
#read_content_from_file(file_path) ⇒ Object
rubocop:disable Metrics/MethodLength.
- #read_content_from_gz_file(filepath) ⇒ Object
- #read_content_from_zip_file(file_path) ⇒ Object
Methods inherited from Monitor
#_connect, #finished, #initialize, #reconnect, #send
Constructor Details
This class inherits a constructor from RServiceBus2::Monitor
Instance Method Details
#connect(uri) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/rservicebus2/monitor/dir.rb', line 9 def connect(uri) # Pass the path through the Dir object to check syntax on startup begin input_dir = Dir.new(uri.path) unless File.writable?(uri.path) puts "***** Directory is not writable, #{uri.path}." puts "***** Make the directory, #{uri.path}, writable and try again." abort end rescue Errno::ENOENT puts "***** Directory does not exist, #{uri.path}." puts "***** Create the directory, #{uri.path}, and try again." puts "***** eg, mkdir #{uri.path}" abort rescue Errno::ENOTDIR puts "***** The specified path does not point to a directory, #{uri.path}." puts "***** Either repoint path to a directory, or remove, #{uri.path}, and create it as a directory." puts "***** eg, rm #{uri.path} && mkdir #{uri.path}" abort end @path = input_dir.path @input_filter = [] return if uri.query.nil? parts = CGI.parse(uri.query) @querystringparts = parts if parts.key?('archive') archiveuri = URI.parse(parts['archive'][0]) unless File.directory?(archiveuri.path) puts '***** Archive file name templating not yet supported.' puts "***** Directory's only." abort end @archivedir = archiveuri.path end return unless parts.key?('input_filter') if parts['input_filter'].count > 1 puts 'Too many input_filters specified.' puts '*** ZIP, or GZ are the only valid input_filters.' abort end if parts['input_filter'][0] == 'ZIP' elsif parts['input_filter'][0] == 'GZ' elsif parts['input_filter'][0] == 'TAR' else puts 'Invalid input_filter specified.' puts '*** ZIP, or GZ are the only valid input_filters.' abort end @input_filter << parts['input_filter'][0] end |
#look ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/rservicebus2/monitor/dir.rb', line 112 def look file_processed = 0 max_files_processed = 10 file_list = Dir.glob("#{@path}/*") file_list.each do |file_path| if File.file?(file_path) != true RServiceBus2.log "Skipping directory, #{file_path}" next end RServiceBus2.log "Ready to process, #{file_path}" content = process_path(file_path) unless @archivedir.nil? basename = File.basename(file_path) new_file_path = "#{@archivedir}/#{basename}.#{DateTime.now.strftime('%Y%m%d%H%M%S%L')}.zip" RServiceBus2.log "Writing to archive, #{new_file_path}" Zip::ZipOutputStream.open(new_file_path) do |zos| zos.put_next_entry(basename) zos.puts content end end File.unlink(file_path) file_processed += 1 RServiceBus2.log "Processed #{file_processed} of #{file_list.length}." RServiceBus2.log "Allow system tick #{self.class.name}" break if file_processed >= max_files_processed end end |
#process_content(content) ⇒ Object
67 68 69 |
# File 'lib/rservicebus2/monitor/dir.rb', line 67 def process_content(content) content end |
#process_path(file_path) ⇒ Object
104 105 106 107 108 109 110 |
# File 'lib/rservicebus2/monitor/dir.rb', line 104 def process_path(file_path) content = read_content_from_file(file_path) payload = process_content(content) send(payload, URI.parse(URI.encode("file://#{file_path}"))) content end |
#read_content_from_file(file_path) ⇒ Object
rubocop:disable Metrics/MethodLength
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/rservicebus2/monitor/dir.rb', line 86 def read_content_from_file(file_path) content = '' if @input_filter.length > 0 if @input_filter[0] == 'ZIP' content = read_content_from_zip_file(file_path) elsif @input_filter[0] == 'GZ' content = read_content_from_gz_file(file_path) elsif @input_filter[0] == 'TAR' fail 'TAR reader not implemented' end else content = IO.read(file_path) end content end |
#read_content_from_gz_file(filepath) ⇒ Object
80 81 82 83 |
# File 'lib/rservicebus2/monitor/dir.rb', line 80 def read_content_from_gz_file(filepath) gz = Zlib::GzipReader.open(filepath) gz.read end |
#read_content_from_zip_file(file_path) ⇒ Object
71 72 73 74 75 76 77 78 |
# File 'lib/rservicebus2/monitor/dir.rb', line 71 def read_content_from_zip_file(file_path) zip = Zip::ZipInputStream.open(file_path) zip.get_next_entry content = zip.read zip.close content end |