Class: RServiceBus::MonitorDir
Overview
Monitors a directory for files.
Instance Attribute Summary
Attributes inherited from Monitor
#bus
Instance Method Summary
collapse
Methods inherited from Monitor
#_connect, #finished, #initialize, #reconnect, #send
Instance Method Details
#connect(uri) ⇒ Object
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/rservicebus/monitor/dir.rb', line 8
def connect(uri)
begin
input_dir = Dir.new(uri.path)
unless File.writable?(uri.path)
puts "***** Directory is not writable, #{uri.path}."
puts "***** Make the directory, #{uri.path}, writable and try again."
abort
end
rescue Errno::ENOENT
puts "***** Directory does not exist, #{uri.path}."
puts "***** Create the directory, #{uri.path}, and try again."
puts "***** eg, mkdir #{uri.path}"
abort
rescue Errno::ENOTDIR
puts "***** The specified path does not point to a directory,
#{uri.path}."
puts "***** Either repoint path to a directory, or remove, #{uri.path},
and create it as a directory."
puts "***** eg, rm #{uri.path} && mkdir #{uri.path}"
abort
end
@path = input_dir.path
@input_filter = []
return if uri.query.nil?
parts = CGI.parse(uri.query)
@querystringparts = parts
if parts.key?('archive')
archiveuri = URI.parse(parts['archive'][0])
unless File.directory?(archiveuri.path)
puts '***** Archive file name templating not yet supported.'
puts "***** Directory's only."
abort
end
@archivedir = archiveuri.path
end
return unless parts.key?('input_filter')
if parts['input_filter'].count > 1
puts 'Too many input_filters specified.'
puts '*** ZIP, or GZ are the only valid input_filters.'
abort
end
if parts['input_filter'][0] == 'ZIP'
elsif parts['input_filter'][0] == 'GZ'
elsif parts['input_filter'][0] == 'TAR'
else
puts 'Invalid input_filter specified.'
puts '*** ZIP, or GZ are the only valid input_filters.'
abort
end
@input_filter << parts['input_filter'][0]
end
|
#look ⇒ Object
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
# File 'lib/rservicebus/monitor/dir.rb', line 110
# Processes files currently present in the monitored directory.
#
# Each file matched by "#{@path}/*" is handed to process_path and then
# deleted. When @archivedir is set, the content returned by process_path is
# first written to a timestamped zip file in that directory. At most ten
# files are handled per call; the rest are left for a later call.
def look
  file_processed = 0
  max_files_processed = 10
  file_list = Dir.glob("#{@path}/*")

  file_list.each do |file_path|
    RServiceBus.log "Ready to process, #{file_path}"
    content = process_path(file_path)

    unless @archivedir.nil?
      basename = File.basename(file_path)
      # Millisecond timestamp keeps archive names distinct for a file name
      # that is delivered more than once.
      timestamp = Time.now.strftime('%Y%m%d%H%M%S%L')
      new_file_path = "#{@archivedir}/#{basename}.#{timestamp}.zip"
      RServiceBus.log "Writing to archive, #{new_file_path}"
      Zip::ZipOutputStream.open(new_file_path) do |zos|
        zos.put_next_entry(basename)
        zos.puts content
      end
    end

    # The source file is removed only after it was processed (and archived).
    File.unlink(file_path)

    file_processed += 1
    RServiceBus.log "Processed #{file_processed} of #{file_list.length}."
    RServiceBus.log "Allow system tick #{self.class.name}"
    break if file_processed >= max_files_processed
  end
end
|
#process_content(content) ⇒ Object
66
67
68
|
# File 'lib/rservicebus/monitor/dir.rb', line 66
# Turns file content into the payload that gets sent.
#
# This implementation hands the content back unchanged.
#
# @param content [Object] content read from a monitored file
# @return [Object] the same object that was passed in
def process_content(content)
  content
end
|
#process_path(file_path) ⇒ Object
102
103
104
105
106
107
108
|
# File 'lib/rservicebus/monitor/dir.rb', line 102
# Reads one file, converts it to a payload and sends it.
#
# The payload is sent together with a file:// URI built from +file_path+.
#
# @param file_path [String] path of the file to process
# @return [Object] the content as read from the file (not the payload)
def process_path(file_path)
  content = read_content_from_file(file_path)
  payload = process_content(content)
  # URI.encode was removed in Ruby 3.0; the parser's escape performs the same
  # escaping of characters that are not allowed in a URI (e.g. spaces).
  source_uri = URI.parse(URI::DEFAULT_PARSER.escape("file://#{file_path}"))
  send(payload, source_uri)
  content
end
|
#read_content_from_file(file_path) ⇒ Object
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/rservicebus/monitor/dir.rb', line 84
# Reads a file's content, going through the configured input filter if any.
#
# With no input filter the file is read as is. Otherwise the first entry of
# @input_filter selects the reader: 'ZIP' and 'GZ' use the matching reader,
# 'TAR' raises, and any other value yields an empty string.
#
# @param file_path [String] path of the file to read
# @return [String] the file content
# @raise [RuntimeError] when the input filter is 'TAR'
def read_content_from_file(file_path)
  return IO.read(file_path) if @input_filter.empty?

  case @input_filter[0]
  when 'ZIP'
    read_content_from_zip_file(file_path)
  when 'GZ'
    read_content_from_gz_file(file_path)
  when 'TAR'
    fail 'TAR reader not implemented'
  else
    ''
  end
end
|
#read_content_from_gz_file(filepath) ⇒ Object
79
80
81
82
|
# File 'lib/rservicebus/monitor/dir.rb', line 79
# Reads and decompresses a gzip file.
#
# @param filepath [String] path of the gzip file
# @return [String] the decompressed content
def read_content_from_gz_file(filepath)
  # The block form closes the reader (and the underlying file) even when
  # reading raises.
  Zlib::GzipReader.open(filepath) { |gz| gz.read }
end
|
#read_content_from_zip_file(file_path) ⇒ Object
70
71
72
73
74
75
76
77
|
# File 'lib/rservicebus/monitor/dir.rb', line 70
# Reads the content of the first entry of a zip file.
#
# @param file_path [String] path of the zip file
# @return [String] content of the first entry
def read_content_from_zip_file(file_path)
  zip = Zip::ZipInputStream.open(file_path)
  begin
    # Position the stream on the first entry before reading.
    zip.get_next_entry
    zip.read
  ensure
    # Close the stream even when the archive cannot be read.
    zip.close
  end
end
|