Class: RServiceBus::Monitor_Dir
Instance Attribute Summary
Attributes inherited from Monitor
#Bus
Instance Method Summary
collapse
Methods inherited from Monitor
#_connect, #finished, #initialize, #reconnect, #send
Instance Method Details
#connect(uri) ⇒ Object
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
# File 'lib/rservicebus/Monitor/Dir.rb', line 14
def connect(uri)
begin
inputDir = Dir.new( uri.path )
unless File.writable?(uri.path) then
puts "***** Directory is not writable, #{uri.path}."
puts "***** Make the directory, #{uri.path}, writable and try again."
abort()
end
rescue Errno::ENOENT => e
puts "***** Directory does not exist, #{uri.path}."
puts "***** Create the directory, #{uri.path}, and try again."
puts "***** eg, mkdir #{uri.path}"
abort();
rescue Errno::ENOTDIR => e
puts "***** The specified path does not point to a directory, #{uri.path}."
puts "***** Either repoint path to a directory, or remove, #{uri.path}, and create it as a directory."
puts "***** eg, rm #{uri.path} && mkdir #{uri.path}"
abort();
end
@Path = inputDir.path
@InputFilter = Array.new
return if uri.query.nil?
parts = CGI.parse(uri.query)
@QueryStringParts = parts
if parts.has_key?('archive') then
archiveUri = URI.parse( parts['archive'][0] )
unless File.directory?(archiveUri.path) then
puts '***** Archive file name templating not yet supported.'
puts "***** Directory's only."
abort()
end
@ArchiveDir = archiveUri.path
end
if parts.has_key?('inputfilter') then
if parts['inputfilter'].count > 1 then
puts 'Too many inputfilters specified.'
puts '*** ZIP, or GZ are the only valid inputfilters.'
abort();
end
if parts['inputfilter'][0] == 'ZIP' then
elsif parts['inputfilter'][0] == 'GZ' then
elsif parts['inputfilter'][0] == 'TAR' then
else
puts 'Invalid inputfilter specified.'
puts '*** ZIP, or GZ are the only valid inputfilters.'
abort();
end
@InputFilter << parts['inputfilter'][0]
end
end
|
#Look ⇒ Object
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
# File 'lib/rservicebus/Monitor/Dir.rb', line 124
# Scans @Path and handles at most ten of the entries found per call.
#
# Each entry is passed to ProcessPath; when @ArchiveDir is set, the content
# ProcessPath returned is written to a timestamped zip in that directory.
# The entry is then deleted. Returning after ten entries hands control back
# to the caller (logged as "Allow system tick").
def Look
  handled = 0
  batchLimit = 10
  candidates = Dir.glob("#{@Path}/*")

  candidates.each do |sourcePath|
    RServiceBus.log "Ready to process, #{sourcePath}"
    content = self.ProcessPath(sourcePath)

    unless @ArchiveDir.nil?
      entryName = File.basename(sourcePath)
      stamp = DateTime.now.strftime('%Y%m%d%H%M%S%L')
      archivePath = "#{@ArchiveDir}/#{entryName}.#{stamp}.zip"
      RServiceBus.log "Writing to archive, #{archivePath}"
      Zip::ZipOutputStream.open(archivePath) do |zos|
        zos.put_next_entry(entryName)
        zos.puts content
      end
    end

    File.unlink(sourcePath)
    handled += 1
    RServiceBus.log "Processed #{handled} of #{candidates.length}."
    RServiceBus.log "Allow system tick #{self.class.name}"

    return if handled >= batchLimit
  end
end
|
#ProcessContent(content) ⇒ Object
72
73
74
|
# File 'lib/rservicebus/Monitor/Dir.rb', line 72
# Turns file content into the payload to send. This implementation hands the
# content back untouched.
#
# @param content [Object] content read from a monitored file.
# @return [Object] the same +content+ object.
def ProcessContent( content )
  content
end
|
#ProcessPath(filePath) ⇒ Object
116
117
118
119
120
121
122
|
# File 'lib/rservicebus/Monitor/Dir.rb', line 116
# Reads one file, converts its content to a payload via ProcessContent, and
# sends the payload tagged with a file:// URI built from +filePath+.
#
# Note: URI.parse raises URI::InvalidURIError when +filePath+ contains
# characters that are not valid in a URI (a space, for example).
#
# @param filePath [String] path of the file to process.
# @return [Object] the content as read from the file (not the payload).
def ProcessPath( filePath )
  raw = self.ReadContentFromFile(filePath)
  # Arguments are evaluated left to right: payload first, then the URI.
  self.send(self.ProcessContent(raw), URI.parse("file://#{filePath}"))
  raw
end
|
#ReadContentFromFile(filePath) ⇒ Object
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
# File 'lib/rservicebus/Monitor/Dir.rb', line 98
# Reads the content of +filePath+, decoding it with the configured input
# filter when one is set.
#
# With no filter the file is read verbatim. 'ZIP', 'GZ' and 'TAR' delegate to
# the matching ReadContentFrom*File method; any other filter value yields an
# empty string.
#
# @param filePath [String] path of the file to read.
# @return [String] the file's (decoded) content.
def ReadContentFromFile( filePath )
  # File.read rather than IO.read: IO.read treats a path beginning with "|"
  # as a command to run, which must never happen for a monitored file name.
  return File.read(filePath) if @InputFilter.empty?

  case @InputFilter[0]
  when 'ZIP'
    # The zip reader also returns the entry; only the content is needed here.
    _entry, content = self.ReadContentFromZipFile(filePath)
    content
  when 'GZ'
    self.ReadContentFromGzFile(filePath)
  when 'TAR'
    self.ReadContentFromTarFile(filePath)
  else
    ''
  end
end
|
#ReadContentFromGzFile(filePath) ⇒ Object
85
86
87
88
|
# File 'lib/rservicebus/Monitor/Dir.rb', line 85
# Reads and decompresses a gzip file.
#
# @param filePath [String] path of the gzip file.
# @return [String] the decompressed content.
def ReadContentFromGzFile( filePath )
  # Block form closes the reader (and its file) even if reading raises.
  Zlib::GzipReader.open(filePath) { |gz| gz.read }
end
|
#ReadContentFromTarFile(filePath) ⇒ Object
90
91
92
93
94
95
96
|
# File 'lib/rservicebus/Monitor/Dir.rb', line 90
# Placeholder for reading tar archives; always fails.
#
# @param filePath [String] path of the tar file (currently unused).
# @raise [RuntimeError] always, with the message 'Not supported yet'.
def ReadContentFromTarFile( filePath )
  raise 'Not supported yet'
end
|
#ReadContentFromZipFile(filePath) ⇒ Object
76
77
78
79
80
81
82
83
|
# File 'lib/rservicebus/Monitor/Dir.rb', line 76
# Reads the first entry of a zip file.
#
# @param filePath [String] path of the zip file.
# @return [Array] a two-element array: the first entry as returned by
#   +get_next_entry+, and that entry's content.
def ReadContentFromZipFile( filePath )
  zip = Zip::ZipInputStream.open(filePath)
  begin
    entry = zip.get_next_entry
    [entry, zip.read]
  ensure
    # Close the stream even when reading the entry raises.
    zip.close
  end
end
|