Class: RServiceBus::MonitorDir

Inherits:
Monitor
  • Object
show all
Defined in:
lib/rservicebus/monitor/dir.rb

Overview

Monitor Directory for files

Direct Known Subclasses

MonitorCsvDir, MonitorXmlDir

Instance Attribute Summary

Attributes inherited from Monitor

#bus

Instance Method Summary collapse

Methods inherited from Monitor

#_connect, #finished, #initialize, #reconnect, #send

Constructor Details

This class inherits a constructor from RServiceBus::Monitor

Instance Method Details

#connect(uri) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/rservicebus/monitor/dir.rb', line 8

# Validates the directory named by +uri+ and records the monitor's
# configuration taken from the URI's query string.
#
# Sets @path (the directory to watch) and @input_filter (empty, or holding a
# single filter name). When a query string is present it also sets
# @querystringparts and, if an +archive+ parameter is supplied,
# @archivedir.
#
# Every configuration problem is reported on stdout and the process is then
# terminated with +abort+.
#
# @param uri [#path, #query] location of the directory plus optional
#   +archive+ and +input_filter+ query parameters
def connect(uri)
  begin
    # Opening the directory checks the path on startup. The block form
    # closes the handle again instead of leaving it open for the lifetime
    # of the process.
    monitored_path = Dir.open(uri.path, &:path)
    unless File.writable?(uri.path)
      puts "***** Directory is not writable, #{uri.path}."
      puts "***** Make the directory, #{uri.path}, writable and try again."
      abort
    end
  rescue Errno::ENOENT
    puts "***** Directory does not exist, #{uri.path}."
    puts "***** Create the directory, #{uri.path}, and try again."
    puts "***** eg, mkdir #{uri.path}"
    abort
  rescue Errno::ENOTDIR
    # These messages are joined with a line continuation so that no newline
    # or source indentation ends up inside the printed text.
    puts '***** The specified path does not point to a directory, ' \
         "#{uri.path}."
    puts "***** Either repoint path to a directory, or remove, #{uri.path}, " \
         'and create it as a directory.'
    puts "***** eg, rm #{uri.path} && mkdir #{uri.path}"
    abort
  end

  @path = monitored_path
  @input_filter = []

  return if uri.query.nil?

  parts = CGI.parse(uri.query)
  @querystringparts = parts
  if parts.key?('archive')
    archive_path = URI.parse(parts['archive'][0]).path
    unless File.directory?(archive_path)
      puts '***** Archive file name templating not yet supported.'
      puts "***** Directory's only."
      abort
    end
    @archivedir = archive_path
  end

  return unless parts.key?('input_filter')

  filters = parts['input_filter']
  if filters.count > 1
    puts 'Too many input_filters specified.'
    puts '*** ZIP, or GZ are the only valid input_filters.'
    abort
  end

  # TAR passes validation here although read_content_from_file has no TAR
  # reader and fails when it meets one.
  unless %w[ZIP GZ TAR].include?(filters[0])
    puts 'Invalid input_filter specified.'
    puts '*** ZIP, or GZ are the only valid input_filters.'
    abort
  end
  @input_filter << filters[0]
end

#look ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/rservicebus/monitor/dir.rb', line 110

# Processes the files currently found directly under @path, stopping after
# at most ten files per call.
#
# Each file is handed to #process_path. When @archivedir is set, the content
# returned by #process_path is written to a time-stamped zip archive in that
# directory. The source file is deleted afterwards in either case.
def look
  file_processed = 0
  max_files_processed = 10

  file_list = Dir.glob("#{@path}/*")
  file_list.each do |file_path|
    RServiceBus.log "Ready to process, #{file_path}"
    content = process_path(file_path)

    unless @archivedir.nil?
      basename = File.basename(file_path)
      # Build the name from separate pieces on one logical line; a string
      # literal wrapped across source lines would put a newline and the
      # indentation into the archive file name.
      timestamp = Time.now.strftime('%Y%m%d%H%M%S%L')
      new_file_path = "#{@archivedir}/#{basename}.#{timestamp}.zip"
      RServiceBus.log "Writing to archive, #{new_file_path}"

      Zip::ZipOutputStream.open(new_file_path) do |zos|
        zos.put_next_entry(basename)
        zos.puts content
      end
    end
    File.unlink(file_path)

    file_processed += 1
    RServiceBus.log "Processed #{file_processed} of #{file_list.length}."
    RServiceBus.log "Allow system tick #{self.class.name}"
    # Cap the work done in one call so a large backlog cannot hold this
    # method indefinitely.
    break if file_processed >= max_files_processed
  end
end

#process_content(content) ⇒ Object



66
67
68
# File 'lib/rservicebus/monitor/dir.rb', line 66

# Hook that turns the raw file content into the payload handed to #send.
# This implementation passes the content through unchanged.
#
# @param content the content read from a monitored file
# @return the very same object that was passed in
def process_content(content)
  content
end

#process_path(file_path) ⇒ Object



102
103
104
105
106
107
108
# File 'lib/rservicebus/monitor/dir.rb', line 102

# Reads the file at +file_path+, converts its content with #process_content
# and passes the result to #send together with a file:// URI built from the
# path.
#
# @param file_path [String] path of the file to process
# @return the unconverted content as returned by #read_content_from_file
def process_path(file_path)
  content = read_content_from_file(file_path)
  payload = process_content(content)

  # URI.encode no longer exists as of Ruby 3.0. The RFC 2396 parser applies
  # the same escaping and is also present on older Ruby versions.
  escaped = URI::RFC2396_Parser.new.escape("file://#{file_path}")
  send(payload, URI.parse(escaped))
  content
end

#read_content_from_file(file_path) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/rservicebus/monitor/dir.rb', line 84

# Returns the content of +file_path+, decoding it according to the first
# entry of @input_filter when one is configured.
#
# Without a filter the file is read as-is. 'ZIP' and 'GZ' delegate to the
# matching reader, 'TAR' raises, and any other filter name yields an empty
# string.
#
# @param file_path [String] file to read
# @return [String] the file's content
# @raise [RuntimeError] when the configured filter is 'TAR'
def read_content_from_file(file_path)
  return IO.read(file_path) if @input_filter.empty?

  case @input_filter.first
  when 'ZIP' then read_content_from_zip_file(file_path)
  when 'GZ' then read_content_from_gz_file(file_path)
  when 'TAR' then raise 'TAR reader not implemented'
  else ''
  end
end

#read_content_from_gz_file(filepath) ⇒ Object



79
80
81
82
# File 'lib/rservicebus/monitor/dir.rb', line 79

# Reads and decompresses the gzip file at +filepath+.
#
# @param filepath [String] path of the gzip-compressed file
# @return [String] the decompressed content
def read_content_from_gz_file(filepath)
  # The block form closes the reader (and the underlying file) once the
  # content has been read, even if reading raises.
  Zlib::GzipReader.open(filepath, &:read)
end

#read_content_from_zip_file(file_path) ⇒ Object



70
71
72
73
74
75
76
77
# File 'lib/rservicebus/monitor/dir.rb', line 70

# Reads the first entry of the zip archive at +file_path+.
#
# @param file_path [String] path of the zip archive
# @return the content of the archive's first entry
def read_content_from_zip_file(file_path)
  zip = Zip::ZipInputStream.open(file_path)
  begin
    # Position the stream on the first entry before reading from it.
    zip.get_next_entry
    zip.read
  ensure
    # Close the stream even when positioning or reading raises.
    zip.close
  end
end