Class: RServiceBus2::MonitorDir

Inherits:
Monitor
  • Object
show all
Defined in:
lib/rservicebus2/monitor/dir.rb

Overview

Monitor Directory for files rubocop:disable Metrics/ClassLength

Direct Known Subclasses

MonitorXmlDir

Instance Attribute Summary

Attributes inherited from Monitor

#bus

Instance Method Summary collapse

Methods inherited from Monitor

#_connect, #finished, #initialize, #reconnect, #send

Constructor Details

This class inherits a constructor from RServiceBus2::Monitor

Instance Method Details

#connect(uri) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/rservicebus2/monitor/dir.rb', line 9

def connect(uri)
  # Pass the path through the Dir object to check syntax on startup
  begin
    input_dir = Dir.new(uri.path)
    unless File.writable?(uri.path)
      puts "***** Directory is not writable, #{uri.path}."
      puts "***** Make the directory, #{uri.path}, writable and try again."
      abort
    end
  rescue Errno::ENOENT
    puts "***** Directory does not exist, #{uri.path}."
    puts "***** Create the directory, #{uri.path}, and try again."
    puts "***** eg, mkdir #{uri.path}"
    abort
  rescue Errno::ENOTDIR
    puts "***** The specified path does not point to a directory,
          #{uri.path}."
    puts "***** Either repoint path to a directory, or remove, #{uri.path},
          and create it as a directory."
    puts "***** eg, rm #{uri.path} && mkdir #{uri.path}"
    abort
  end

  @path = input_dir.path
  @input_filter = []

  return if uri.query.nil?
  parts = CGI.parse(uri.query)
  @querystringparts = parts
  if parts.key?('archive')
    archiveuri = URI.parse(parts['archive'][0])
    unless File.directory?(archiveuri.path)
      puts '***** Archive file name templating not yet supported.'
      puts "***** Directory's only."
      abort
    end
    @archivedir = archiveuri.path
  end

  return unless parts.key?('input_filter')

  if parts['input_filter'].count > 1
    puts 'Too many input_filters specified.'
    puts '*** ZIP, or GZ are the only valid input_filters.'
    abort
  end

  if parts['input_filter'][0] == 'ZIP'
  elsif parts['input_filter'][0] == 'GZ'
  elsif parts['input_filter'][0] == 'TAR'
  else
    puts 'Invalid input_filter specified.'
    puts '*** ZIP, or GZ are the only valid input_filters.'
    abort
  end
  @input_filter << parts['input_filter'][0]
end

#lookObject



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/rservicebus2/monitor/dir.rb', line 112

def look
  file_processed = 0
  max_files_processed = 10

  file_list = Dir.glob("#{@path}/*")
  file_list.each do |file_path|
    if File.file?(file_path) != true
      RServiceBus2.log "Skipping directory, #{file_path}"
      next
    end
    RServiceBus2.log "Ready to process, #{file_path}"
    content = process_path(file_path)

    unless @archivedir.nil?
      basename = File.basename(file_path)
      new_file_path = "#{@archivedir}/#{basename}.#{DateTime.now.strftime('%Y%m%d%H%M%S%L')}.zip"
      RServiceBus2.log "Writing to archive, #{new_file_path}"

      Zip::ZipOutputStream.open(new_file_path) do |zos|
        zos.put_next_entry(basename)
        zos.puts content
      end
    end
    File.unlink(file_path)

    file_processed += 1
    RServiceBus2.log "Processed #{file_processed} of #{file_list.length}."
    RServiceBus2.log "Allow system tick #{self.class.name}"
    break if file_processed >= max_files_processed
  end
end

#process_content(content) ⇒ Object



67
68
69
# File 'lib/rservicebus2/monitor/dir.rb', line 67

def process_content(content)
  content
end

#process_path(file_path) ⇒ Object



104
105
106
107
108
109
110
# File 'lib/rservicebus2/monitor/dir.rb', line 104

def process_path(file_path)
  content = read_content_from_file(file_path)
  payload = process_content(content)

  send(payload, URI.parse(URI.encode("file://#{file_path}")))
  content
end

#read_content_from_file(file_path) ⇒ Object

rubocop:disable Metrics/MethodLength



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/rservicebus2/monitor/dir.rb', line 86

def read_content_from_file(file_path)
  content = ''
  if @input_filter.length > 0
    if @input_filter[0] == 'ZIP'
      content = read_content_from_zip_file(file_path)
    elsif @input_filter[0] == 'GZ'
      content = read_content_from_gz_file(file_path)
    elsif @input_filter[0] == 'TAR'
      fail 'TAR reader not implemented'
    end

  else
    content = IO.read(file_path)
  end

  content
end

#read_content_from_gz_file(filepath) ⇒ Object



80
81
82
83
# File 'lib/rservicebus2/monitor/dir.rb', line 80

def read_content_from_gz_file(filepath)
  gz = Zlib::GzipReader.open(filepath)
  gz.read
end

#read_content_from_zip_file(file_path) ⇒ Object



71
72
73
74
75
76
77
78
# File 'lib/rservicebus2/monitor/dir.rb', line 71

def read_content_from_zip_file(file_path)
  zip = Zip::ZipInputStream.open(file_path)
  zip.get_next_entry
  content = zip.read
  zip.close

  content
end