Class: StreamWorkflowTask
Constant Summary
collapse
- EOL =
"\r\n"
RbbtRESTHelpers::MEMORY_CACHE, RbbtRESTHelpers::PAGE_SIZE
Instance Attribute Summary
#javascript_resources, #plugin_resources, #sass_resources, #template_resources
#workflow_resources
Instance Method Summary
collapse
-
#_merge_chunks(sin, sout) ⇒ Object
-
#call(env) ⇒ Object
-
#copy_until_boundary(sin, sout, boundary) ⇒ Object
-
#do_stream(env) ⇒ Object
-
#get_inputs(content_type, stream) ⇒ Object
-
#initialize(app) ⇒ StreamWorkflowTask
constructor
A new instance of StreamWorkflowTask.
-
#merge_chunks(sin, sout, buffer) ⇒ Object
-
#parse_uri(env) ⇒ Object
-
#read_normal_inputs(io, boundary, stream_input) ⇒ Object
-
#run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil) ⇒ Object
#add_GET_param, add_sass_load_path, #cache, #check_step, #consume_parameter, css_resources, #css_resources, #development?, #error_for, #file_or_text_area, #file_resources, file_resources, #filter, #fix_input, #form_input, #fragment, #hash2dl, #header, #html_tag, #input_label, javascript_resources, #link_css, #link_js, load_tsv, #load_tsv, #locate_css, #locate_css_from_resource, #locate_file, #locate_file_from_resource, #locate_javascript, #locate_javascript_from_resource, #locate_sass, #locate_sass_from_resource, #locate_template, #locate_template_from_resource, #log, #modal_fragment, #old_cache, #paginate, #param2boolean, #parse_page, #partial_render, #permalink, #prepare_input, #process_common_parameters, #production?, #record_css, #record_js, #recorded_css_files, #recorded_js_files, #remove_GET_param, #render, #render_partial, #render_sass, #reset_js_css, #resource, #reveal, sass_resources, save_tsv, #save_tsv, #serve_css, #serve_js, #table, #table_value, #template_render, template_resources, #tsv2html, #tsv_process, #tsv_rows, #tsv_rows_full, #wait_on
#clean_job, #complete_input_set, #consume_task_parameters, #execution_type, #issue_job, #locate_workflow_template, #locate_workflow_template_from_resource, #prepare_job_inputs, #recursive_clean_job, #show_exec_result, #show_result, #show_result_html, #stream_job, #type_of_export, #workflow_partial, #workflow_render, workflow_resources
Constructor Details
Returns a new instance of StreamWorkflowTask.
5
6
7
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 5
# Rack middleware constructor.
#
# app - the downstream Rack application; requests that are not streaming
#       uploads are delegated to it unchanged (see #call).
def initialize(app)
  @app = app
end
|
Instance Method Details
#_merge_chunks(sin, sout) ⇒ Object
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 147
# Decode an HTTP "chunked" transfer-encoded body read from +sin+ and write
# the raw payload bytes to +sout+.
#
# Each chunk is a hexadecimal size line, followed by exactly that many bytes
# of data and a CRLF (EOL) terminator; a chunk of size 0 marks the end of
# the body.
#
# sin  - IO with the chunked body; always closed in the ensure clause.
# sout - IO receiving the decoded bytes (left open for the caller).
#
# Raises RuntimeError when a size line is missing/blank, a chunk terminator
# is not CRLF, or a chunk is truncated.
#
# Fix: removed dead code from the original — a `stop` flag that was never
# set (its `break if stop` was unreachable) and a nil re-assignment placed
# after a raise that already covered the nil case. The size-line read loop
# could only ever run one iteration, so it is now a straight read + check.
def _merge_chunks(sin, sout)
  begin
    while true
      chunk_size_str = sin.gets
      raise "Empty chunk size" if chunk_size_str.nil? || chunk_size_str.strip.empty?

      size = chunk_size_str.strip.to_i(16)
      break if size == 0 # last-chunk marker

      chunk = sin.read(size)
      bound = sin.read(2) # CRLF terminating every chunk
      raise "bound not right: #{ bound }" if bound != EOL
      raise "Size does not match: #{[chunk.length, size] * " != "}" if chunk.length != size
      sout.write chunk
    end
  rescue Aborted
    raise
  rescue Exception # deliberate: log any failure before re-raising
    Log.exception $!
    raise
  ensure
    # Close only the read side when possible so a paired writer is unaffected.
    if sin.respond_to? :close_read
      sin.close_read
    else
      sin.close unless sin.closed?
    end
    # ConcurrentStream-style inputs carry feeder threads; abort them so they
    # do not block forever on a half-closed pipe.
    if sin.respond_to? :threads
      sin.threads.each do |th| th.raise Aborted end
    end
  end
end
|
#call(env) ⇒ Object
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 224
# Rack entry point. Chunked streaming POSTs (see #do_stream) are taken over
# via rack.hijack: the request body is de-chunked, its regular multipart
# inputs are parsed, the job is launched with the streamed input wired to
# the socket, and the job's output stream is written back as a hand-rolled
# HTTP response. Everything else is delegated to the wrapped app.
#
# Fixes over the original:
# - the rescue clause wrote the 515 error response to tcp_io
#   unconditionally; tcp_io is nil when the hijack call itself failed and
#   may already be closed, so that write raised a secondary exception that
#   masked the original one — now guarded.
# - "Comsuming"/"Comsumed" log message typos corrected.
def call(env)
  if do_stream(env)
    begin
      client = env["rack.hijack"]
      # Bytes Rack already consumed from the socket before the hijack;
      # merge_chunks replays them ahead of the raw socket data.
      buffer = client.instance_variable_get('@buffer')
      tcp_io = client.call
      Log.low "Hijacking post data #{tcp_io}"
      content_type = env["CONTENT_TYPE"]
      tcp_merged_io = Misc.open_pipe do |sin| merge_chunks(tcp_io, sin, buffer) end
      inputs, stream_input, filename, stream, boundary = get_inputs(content_type, tcp_merged_io)
      workflow, task = parse_uri(env)
      job = run_job(workflow, task, inputs, stream_input, stream, boundary, filename)
      job_url = File.join("/", workflow.to_s, task, job.name)
      raise "Job aborted" if job.aborted?
      raise job.messages.last if job.error?
      out_stream = TSV.get_stream job
      begin
        Log.high "Write response #{Misc.fingerprint tcp_io} "
        # Minimal HTTP response written directly to the hijacked socket.
        tcp_io.write "HTTP/1.1 200\r\n"
        tcp_io.write "Connection: close\r\n"
        tcp_io.write "RBBT-STREAMING-JOB-URL: #{ job_url }\r\n"
        tcp_io.write "\r\n"
        Log.high "Consuming response #{Misc.fingerprint tcp_io}"
        Misc.consume_stream(out_stream, false, tcp_io)
        Log.high "Consumed response #{Misc.fingerprint tcp_io}"
      rescue Exception
        Log.exception $!
      end if out_stream
      tcp_io.close unless tcp_io.closed?
      Log.high "Closed io #{tcp_io}"
      # Status -1 tells Rack the response was already sent on the hijacked
      # socket.
      [-1, {}, []]
    rescue Exception
      Log.exception $!
      # Guard: tcp_io may be nil or closed if the failure happened before or
      # during the hijack; avoid masking the original exception.
      if tcp_io and not tcp_io.closed?
        # NOTE(review): 515 is a non-standard status code; kept for
        # compatibility with existing clients.
        tcp_io.write "HTTP/1.1 515\r\n"
        tcp_io.write "Connection: close\r\n"
        tcp_io.write "\r\n"
        tcp_io.close_write
      end
      raise $!
    end
  else
    Log.low "NOT Hijacking post data"
    @app.call(env)
  end
end
|
#copy_until_boundary(sin, sout, boundary) ⇒ Object
58
59
60
61
62
63
64
65
66
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 58
# Copy multipart body lines from +sin+ to +sout+ until a line containing
# +boundary+ is seen (the boundary line itself is discarded).
#
# Writing lags one line behind reading so that the line immediately before
# the boundary can have its trailing CRLF stripped: that CRLF belongs to
# the multipart framing, not to the payload.
#
# Fix: if the boundary appears on the very first line, last_line is nil and
# the original crashed on nil.strip; now nothing is written in that case.
def copy_until_boundary(sin, sout, boundary)
  last_line = nil
  while line = sin.gets
    break if line.include? boundary
    sout.write last_line unless last_line.nil?
    last_line = line
  end
  sout.write last_line.strip unless last_line.nil? || last_line == EOL
end
|
#do_stream(env) ⇒ Object
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 206
# Decide whether this request should be hijacked and streamed.
#
# Returns true only for chunked POST requests whose Content-Type advertises
# an Rbbt streaming parameter and whose server supports rack.hijack; false
# otherwise (the request then falls through to the wrapped app in #call).
#
# Fix: removed the unused local `uri` (REQUEST_URI was read but never used)
# and flattened the checks into guard clauses.
def do_stream(env)
  return false unless env["REQUEST_METHOD"] == "POST"
  return false unless env["rack.hijack"]
  content_type = env["CONTENT_TYPE"]
  return false unless content_type and content_type.include? "Rbbt_Param_Stream"
  return false unless env["HTTP_TRANSFER_ENCODING"] == "chunked"
  true
end
|
68
69
70
71
72
73
74
75
76
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 68
# Extract the multipart boundary and the name of the streamed input from the
# Content-Type header, then read every regular (non-streamed) input from the
# request body up to the streamed one.
#
# Returns [inputs, stream_input, filename, stream, boundary]; +stream+ is
# left positioned at the start of the streamed input's payload.
def get_inputs(content_type, stream)
  header_param = lambda do |field|
    # NoMethodError on a missing parameter, as before: these fields are
    # required for streaming requests.
    content_type.match(/#{field}=([^\s;]*)/)[1]
  end
  boundary = header_param.call("boundary")
  stream_input = header_param.call("stream")
  inputs, filename = read_normal_inputs(stream, boundary, stream_input)
  IndiferentHash.setup(inputs)
  [inputs, stream_input, filename, stream, boundary]
end
|
#merge_chunks(sin, sout, buffer) ⇒ Object
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 185
# De-chunk the request body into +sout+. When Rack buffered part of the body
# before the hijack (+buffer+), replay that prefix first and then continue
# with the raw socket, stitching the two together through a pipe.
#
# sin    - the hijacked client socket (or any IO with the chunked body).
# sout   - IO receiving the de-chunked payload.
# buffer - String of bytes consumed before the hijack, or nil.
def merge_chunks(sin, sout, buffer)
  if buffer.nil?
    _merge_chunks(sin, sout)
  else
    # Feeder pipe: buffered prefix first, then the live socket.
    ssin = Misc.open_pipe do |s|
      begin
        s << buffer
        # readpartial raises EOFError (an IOError) at end of stream; that
        # exception — not a nil return — is what terminates this loop.
        while c = sin.readpartial(Misc::BLOCK_SIZE)
          s << c
        end
      rescue Aborted, IOError
      rescue Exception # NOTE(review): silently swallows unexpected errors; presumably deliberate best-effort — confirm
      ensure
        sin.close_read
        s.close
      end
    end
    _merge_chunks(ssin, sout)
  end
end
|
#parse_uri(env) ⇒ Object
11
12
13
14
15
16
17
18
19
20
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 11
# Resolve the workflow class and task name encoded in the request path,
# expected to look like "/WorkflowName/task_name".
#
# Returns [workflow_class, task_name]. Raises RuntimeError when the
# workflow constant cannot be resolved.
def parse_uri(env)
  _empty, workflow_name, task = env["REQUEST_URI"].split("/")
  workflow =
    begin
      Kernel.const_get(workflow_name)
    rescue
      raise "Could not accept task for workflow: #{ workflow_name }"
    end
  [workflow, task]
end
|
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 22
# Parse the regular (non-streamed) multipart inputs from +io+, stopping as
# soon as the part named +stream_input+ begins so its payload is left
# unread on the stream for the job to consume.
#
# Returns [inputs, filename]: a Hash of input name => value, plus the
# filename of the streamed part when its Content-Disposition carried one.
def read_normal_inputs(io, boundary, stream_input)
  inputs = {}
  input_name = nil
  variable_chunk = nil
  filename = nil
  in_content = false
  marker = "--" + boundary

  while line = io.gets
    line.chomp!
    if line == marker
      # A new part begins: store the value accumulated for the previous one.
      inputs[input_name] = variable_chunk if input_name
      in_content = false
    elsif in_content
      # Multi-line values are rejoined with plain newlines.
      variable_chunk << "\n" unless variable_chunk.empty?
      variable_chunk << line
    elsif line =~ /^Content.* name="([^\s;"]*)"/
      input_name = $1
      filename = line.match(/filename="([^"]+)"/)[1] if line =~ /filename/
    elsif line.empty?
      # Blank line ends the part headers; content follows. If this is the
      # streamed input, stop here and leave its payload on the stream.
      variable_chunk = ""
      break if input_name == stream_input
      in_content = true
    end
  end

  [inputs, filename]
end
|
#run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil) ⇒ Object
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 78
# Build and launch the workflow job whose +stream_input+ is fed directly
# from the hijacked request stream.
#
# workflow     - workflow class (from #parse_uri).
# task         - task name (String; converted to Symbol below).
# inputs       - Hash of regular inputs (from #read_normal_inputs).
# stream_input - name of the input that arrives as a stream.
# stream       - IO positioned at the streamed input's payload.
# boundary     - multipart boundary delimiting the end of the payload.
# filename     - original upload filename, when provided.
#
# Returns the job object (possibly still running).
def run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil)
  name = inputs.delete "jobname"
  task_parameters = prepare_job_inputs(workflow, task, inputs)
  IndiferentHash.setup task_parameters
  Misc.add_stream_filename(stream, filename) if filename
  # Strip the trailing multipart boundary so the job sees only payload bytes.
  clean_stream = Misc.open_pipe do |sin|
    begin
      copy_until_boundary(stream, sin, boundary)
    rescue
    end
  end
  ConcurrentStream.setup(clean_stream, :filename => filename)
  task_parameters[stream_input.to_sym] = clean_stream
  task = task.to_sym
  Log.low "Running streaming job #{[workflow, task] * "/" }: #{Misc.fingerprint task_parameters}"
  job = workflow.job(task, name, task_parameters)
  job.clean if job.aborted?
  # Only exported tasks may run; the export list fixes the execution mode.
  execution_type = case
                   when workflow.exec_exports.include?(task)
                     "exec"
                   when workflow.synchronous_exports.include?(task)
                     "synchronous"
                   when workflow.asynchronous_exports.include?(task)
                     "asynchronous"
                   else
                     raise "No known export type for #{ workflow } #{ task }. Accesses denied"
                   end
  # Client override: force in-process execution when requested.
  execution_type = "exec" if inputs["_cache_type"] == 'exec'
  begin
    case execution_type
    when "exec", nil # NOTE(review): nil cannot occur — the case above always assigns or raises
      job.exec(:stream)
    when "sync", "synchronous", "async", "asynchronous"
      if job.done? or job.started?
        # Job already has a result (or is running): drain the incoming
        # stream in the background so the client is not blocked on writes.
        # NOTE(review): this thread is never joined — presumably it ends
        # when the stream does; confirm it cannot leak.
        done_consumer = Thread.new do
          Misc.consume_stream(stream, false)
        end
        job.join unless job.done?
      else
        job.run(:stream)
      end
    else
      raise "Unknown execution_type: #{execution_type}"
    end
  rescue Aborted, Interrupt
    job.abort
    # Report failure on the raw socket; #call never gets to write a status.
    stream.write "HTTP/1.1 500\r\n"
    stream.close_write
  rescue Exception
    job.exception $!
    stream.write "HTTP/1.1 500\r\n"
    stream.close_write
  end
  job
end
|