Class: StreamWorkflowTask

Inherits:
Object
  • Object
show all
Includes:
RbbtRESTHelpers, WorkflowRESTHelpers
Defined in:
lib/rbbt/rest/workflow/stream_task.rb

Constant Summary collapse

EOL =
"\r\n"

Constants included from RbbtRESTHelpers

RbbtRESTHelpers::MEMORY_CACHE, RbbtRESTHelpers::PAGE_SIZE

Instance Attribute Summary

Attributes included from RbbtRESTHelpers

#javascript_resources, #plugin_resources, #sass_resources, #template_resources

Attributes included from WorkflowRESTHelpers

#workflow_resources

Instance Method Summary collapse

Methods included from RbbtRESTHelpers

#add_GET_param, add_sass_load_path, #add_search_paths, #cache, #check_step, #consume_parameter, css_resources, #css_resources, #development?, #error_for, #escape_url, #file_or_text_area, #file_resources, file_resources, #filter, #find_all, #find_all_server_files, #fix_html, #fix_input, #form_input, #fragment, #glob_all, #glob_all_server_files, #halt_html, #hash2dl, #header, #html_tag, #input_label, javascript_resources, #json_resource, #link_css, #link_js, load_tsv, #load_tsv, #locate_css, #locate_file, #locate_javascript, #locate_sass, #locate_server_file, #locate_template, #log, #modal_fragment, #old_cache, #paginate, #param2boolean, #parse_page, #partial_render, #permalink, #prepare_input, #process_common_parameters, #production?, #progress, #record_css, #record_js, #recorded_css_files, #recorded_js_files, #remove_GET_param, #render, #render_partial, #render_sass, #reset_js_css, #resource, #reveal, sass_resources, #save_tsv, save_tsv, #serve_css, #serve_js, #sync_json_resources, #table, #table_value, #tabs, #tar_file, #template_render, template_resources, #traverse, #tsv2html, #tsv_process, #tsv_rows, #tsv_rows_full, #wait_on

Methods included from WorkflowRESTHelpers

#abort_job, #clean_job, #complete_input_set, #consume_task_parameters, #execution_type, #issue_job, #locate_workflow_template, #prepare_job_inputs, #recursive_clean_job, #show_exec_result, #show_result, #show_result_html, #stream_job, #type_of_export, #workflow_partial, #workflow_render, workflow_resources

Constructor Details

#initialize(app) ⇒ StreamWorkflowTask

Returns a new instance of StreamWorkflowTask.



5
6
7
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 5

# Wraps the downstream application.
#
# @param app [#call] object stored in `@app`; `#call` forwards the request
#   environment to it (`@app.call(env)`) when a request is not streamed.
def initialize(app)
  @app = app
end

Instance Method Details

#_merge_chunks(sin, sout) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 139

# Reassembles a chunk-framed stream: reads size-prefixed chunks from +sin+
# and writes their payloads, back to back, to +sout+.
#
# Every chunk is a line holding the payload size in hexadecimal, followed by
# that many bytes and a two-byte terminator that must equal EOL. A size of
# zero stops the loop; nothing after that size line is consumed.
#
# Neither stream is closed here (the code that closed +sin+ or aborted its
# threads is disabled in the original source).
#
# @param sin [#gets, #read] chunk-framed input
# @param sout [#write] receives the decoded payload
# @return [nil]
# @raise [RuntimeError] when the size line is missing or blank, when the
#   terminator is not EOL, or when fewer bytes than announced were read
def _merge_chunks(sin, sout)
  loop do
    size_line = sin.gets
    raise "Empty chunk size" if size_line.nil? || size_line.strip.empty?

    # to_i(16) parses the leading hexadecimal digits and ignores the rest.
    size = size_line.strip.to_i(16)
    break if size.zero?

    chunk = sin.read(size)
    bound = sin.read(2)
    raise "bound not right: #{ bound }" if bound != EOL
    raise "Size does not match: #{[chunk.length, size] * " != "}" if chunk.length != size

    sout.write chunk
  end
rescue Aborted
  # Aborts are passed through without being logged.
  raise
rescue StandardError
  Log.exception $!
  raise
end

#call(env) ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 215

# Request entry point.
#
# When #do_stream accepts the request, the connection is taken over through
# `env["rack.hijack"]`, the inputs are parsed from the request body, the job
# is run and the response (status line, headers and the job's output stream)
# is written directly to the hijacked socket. Any other request is forwarded
# to the wrapped application.
#
# @param env [Hash] request environment; the keys read here are
#   "rack.hijack", "rack.input", "CONTENT_TYPE" and "HTTP_TRANSFER_ENCODING"
# @return [Array] `[-1, {}, []]` after a streamed request has been answered,
#   otherwise the result of `@app.call(env)`
# @raise [Exception] errors raised while serving a streamed request are
#   logged, recorded on the job when one exists, and re-raised
def call(env)

  if do_stream(env)
    begin

      client = env["rack.hijack"]
      # NOTE(review): reads an instance variable of the hijack object; it is
      # later prepended to the socket data by #merge_chunks -- presumably
      # bytes the server had already read; confirm against the server in use.
      buffer = client.instance_variable_get('@buffer')
      tcp_io = client.call

      Log.low "Hijacking post data #{tcp_io}"
      content_type = env["CONTENT_TYPE"]
      encoding = env["HTTP_TRANSFER_ENCODING"]

      if env["rack.input"]
        # Rebuild the body from rack.input: first whatever sits in its @rbuf
        # instance variable, then the remaining lines.
        tcp_merged_io = Misc.open_pipe do |sin|
          rinput = env["rack.input"]
          sin << rinput.instance_variable_get("@rbuf")
          while c = rinput.gets
            sin.puts c
          end
        end
      else
        if encoding == "chunked"
          Log.low "Merging chunks #{tcp_io}"
          tcp_merged_io = Misc.open_pipe do |sin|
            begin
              merge_chunks(tcp_io, sin, buffer);
            rescue StandardError
              # Best effort: decoding errors are dropped here.
            ensure
              begin
                tcp_io.close_read;
              rescue
              end
            end
          end
        else
          tcp_merged_io = tcp_io
        end
      end

      #tcp_merged_io = Misc.log_stream(tcp_merged_io)

      inputs, stream_input, filename, stream, boundary = get_inputs(content_type, tcp_merged_io)

      workflow, task = parse_uri(env)

      job = run_job(workflow, task, inputs, stream_input, stream, boundary, filename)

      job_url = File.join("/", workflow.to_s, task, job.name)

      raise "Job aborted" if job.aborted?
      raise job.messages.last if job.error?

      out_stream = TSV.get_stream job

      begin
        Log.high "Write response #{Misc.fingerprint tcp_io} "
        tcp_io.write "HTTP/1.1 200\r\n"
        tcp_io.write "Connection: close\r\n"
        tcp_io.write "RBBT-STREAMING-JOB-URL: #{ job_url }\r\n"
        tcp_io.write "\r\n"
        Log.high "Comsuming response #{Misc.fingerprint tcp_io}"
        begin
          # readpartial signals the end of the output with EOFError.
          while l = out_stream.readpartial(2048)
            tcp_io.write l
          end
        rescue EOFError
        end
        Log.high "Comsumed response #{Misc.fingerprint tcp_io}"
        out_stream.join if out_stream.respond_to? :join
      rescue Exception
        Log.exception $!
        raise $!
      end if out_stream

      tcp_io.close_write unless tcp_io.closed?
      Log.high "Closed io #{tcp_io}"

      [-1, {}, []]
    rescue Exception => e
      Log.exception e
      job.exception e if job
      # tcp_io is still nil when the failure happened before the hijack
      # completed, and may already be closed; writing to it in either case
      # would raise a second error and hide the original one.
      # NOTE(review): if the 200 status line was already sent, this 500 block
      # is appended to the body rather than acting as a status line.
      unless tcp_io.nil? || tcp_io.closed?
        tcp_io.write "HTTP/1.1 500\r\n"
        tcp_io.write "Connection: close\r\n"
        tcp_io.write "\r\n"
        tcp_io.close_write
      end
      raise e
    end
  else
    Log.low "NOT Hijacking post data"

    @app.call(env)
  end
end

#copy_until_boundary(sin, sout, boundary) ⇒ Object



58
59
60
61
62
63
64
65
66
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 58

# Copies lines from +sin+ to +sout+ until a line containing +boundary+ is
# read (or +sin+ is exhausted). The boundary line itself is consumed but not
# copied.
#
# Output runs one line behind the input so that the last payload line can be
# treated specially: its line terminator is removed, and a last line that is
# only EOL is not written at all. Only the terminator is removed -- leading
# and trailing whitespace of that line is part of the payload and is kept.
#
# NOTE(review): the boundary test is `include?`, so any payload line that
# merely contains the boundary text also ends the copy.
#
# @param sin [#gets] source stream
# @param sout [#write] destination stream
# @param boundary [String] text that marks the end of the payload
def copy_until_boundary(sin, sout, boundary)
  pending = nil
  while (line = sin.gets)
    break if line.include? boundary

    sout.write pending if pending
    pending = line
  end

  sout.write pending.chomp unless pending.nil? || pending == EOL
end

#do_stream(env) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 197

# Decides whether a request should be handled as a streaming job submission.
#
# All of the following must hold:
# * the request method is "POST";
# * the environment offers "rack.hijack";
# * CONTENT_TYPE is present and contains "Rbbt_Param_Stream";
# * HTTP_TRANSFER_ENCODING is absent or exactly "chunked".
#
# The request method is taken from `env["REQUEST_METHOD"]`. An explicitly
# assigned `@request_method` still takes precedence, but nothing in this
# class assigns it, so relying on it alone made every request be rejected.
#
# @param env [Hash] request environment
# @return [Boolean]
def do_stream(env)
  request_method = @request_method || env["REQUEST_METHOD"]
  return false unless request_method == "POST"

  return false unless env["rack.hijack"]

  content_type = env["CONTENT_TYPE"]
  return false unless content_type && content_type.include?("Rbbt_Param_Stream")

  encoding = env["HTTP_TRANSFER_ENCODING"]
  encoding.nil? || encoding == "chunked"
end

#get_inputs(content_type, stream) ⇒ Object



68
69
70
71
72
73
74
75
76
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 68

# Extracts the multipart parameters from the content type and reads the
# regular (non-streamed) inputs from the start of +stream+.
#
# The content type must carry `boundary=<value>` and `stream=<value>`; the
# latter names the input whose content is left unread in +stream+.
#
# @param content_type [String] value of the request's content type header
# @param stream [#gets] request body
# @return [Array] `[inputs, stream_input, filename, stream, boundary]`, where
#   +inputs+ and +filename+ come from #read_normal_inputs and +inputs+ has
#   been passed through `IndiferentHash.setup`
# @raise [RuntimeError] when either parameter is missing from +content_type+
def get_inputs(content_type, stream)
  boundary_match = content_type.match(/boundary=([^\s;]*)/)
  raise "Missing boundary parameter in content type: #{content_type}" if boundary_match.nil?

  stream_match = content_type.match(/stream=([^\s;]*)/)
  raise "Missing stream parameter in content type: #{content_type}" if stream_match.nil?

  boundary = boundary_match[1]
  stream_input = stream_match[1]
  inputs, filename = read_normal_inputs(stream, boundary, stream_input)

  IndiferentHash.setup(inputs)

  [inputs, stream_input, filename, stream, boundary]
end

#merge_chunks(sin, sout, buffer) ⇒ Object



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 177

# Decodes the chunk-framed data of +sin+ into +sout+ via #_merge_chunks,
# optionally putting +buffer+ in front of it first.
#
# With a nil +buffer+ the input is decoded as is. Otherwise a pipe is opened
# with `Misc.open_pipe`; it receives +buffer+ followed by everything read
# from +sin+ in blocks of `Misc::BLOCK_SIZE`, and that pipe is what gets
# decoded.
#
# @param sin [#readpartial] raw input (must also satisfy #_merge_chunks when
#   +buffer+ is nil)
# @param sout [#write] receives the decoded payload
# @param buffer [String, nil] data to place ahead of +sin+
# @return [Object] whatever #_merge_chunks returns
def merge_chunks(sin, sout, buffer)
  return _merge_chunks(sin, sout) if buffer.nil?

  prefixed = Misc.open_pipe do |pipe|
    begin
      pipe << buffer
      while (block = sin.readpartial(Misc::BLOCK_SIZE))
        pipe << block
      end
    rescue Exception
      # Every failure while copying -- including the end-of-input error
      # raised by readpartial -- just ends the copy.
    ensure
      pipe.close
    end
  end

  _merge_chunks(prefixed, sout)
end

#parse_uri(env) ⇒ Object



11
12
13
14
15
16
17
18
19
20
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 11

# Resolves the workflow constant and the task name from the request path,
# which is expected to look like "/<Workflow>/<task>".
#
# The path is taken from `env["PATH_INFO"]`. An explicitly assigned `@uri`
# still takes precedence, but nothing in this class assigns it, so relying
# on it alone failed with a NoMethodError on nil.
#
# NOTE(review): the workflow name comes from the request and is resolved
# with `Kernel.const_get`, so any loaded constant can be named; consider
# restricting it to known workflows.
#
# @param env [Hash] request environment
# @return [Array] `[workflow, task]`: the resolved constant and the task
#   name as a String (nil when the path has no second segment)
# @raise [RuntimeError] when the first path segment does not name a constant
def parse_uri(env)
  uri = (@uri || env["PATH_INFO"]).to_s
  _n, workflow, task = uri.split("/")
  workflow = begin
               Kernel.const_get(workflow)
             rescue
               raise "Could not accept task for workflow: #{ workflow }"
             end
  [workflow, task]
end

#read_normal_inputs(io, boundary, stream_input) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 22

# Reads the leading parts of a multipart body from +io+ and returns their
# values, stopping right before the content of the part named
# +stream_input+ so the caller can consume that content as a stream.
#
# Parsing is line based:
# * a line equal to "--" + boundary closes the current part and stores its
#   content under the part's name;
# * a header line starting with "Content" and carrying `name="…"` sets the
#   current part name, and its `filename="…"` attribute, when present and
#   non-empty, sets the filename;
# * the first empty line after the headers starts the content, whose lines
#   are joined with "\n" (empty lines included, also at the start).
#
# @param io [#gets] multipart body
# @param boundary [String] multipart boundary, without the leading "--"
# @param stream_input [String] name of the part to leave unread
# @return [Array] `[inputs, filename]`: a Hash of part name to content and
#   the last filename seen (nil when none was)
def read_normal_inputs(io, boundary, stream_input)
  inputs = {}

  input_name = nil
  content_lines = nil
  filename = nil
  in_content = false
  delimiter = "--" + boundary

  while (line = io.gets)
    line.chomp!

    if line == delimiter
      inputs[input_name] = content_lines && content_lines.join("\n") if input_name
      in_content = false
    elsif in_content
      content_lines << line
    elsif line =~ /^Content.* name="([^\s;"]*)"/
      input_name = $1
      # A header may mention "filename" without a usable attribute (a field
      # called "filename", or filename=""); only take it when it matches.
      filename_match = line.match(/filename="([^"]+)"/)
      filename = filename_match[1] if filename_match
    elsif line.empty?
      content_lines = []
      # Leave the streamed part's content in io for the caller.
      break if input_name == stream_input
      in_content = true
    end
  end

  [inputs, filename]
end

#run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 78

# Builds the job for +task+ of +workflow+, feeding it the streamed input,
# and starts it according to the task's execution type.
#
# @param workflow [Object] receiver of `job(task, name, task_parameters)`
# @param task [String, Symbol] task name; converted with `to_sym`
# @param inputs [Hash] regular inputs; the "jobname" entry is removed and
#   used as the job name, "_cache_type" can force "exec"
# @param stream_input [String] name of the input that receives the stream
# @param stream [Object] request body positioned at the streamed content
# @param boundary [String] multipart boundary that ends the streamed content
# @param filename [String, nil] filename associated with the stream, if any
# @return [Object] the job; errors raised while starting it are recorded on
#   the job (`abort` / `exception`) instead of being propagated
def run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil)
  name = inputs.delete "jobname"

  task_parameters = prepare_job_inputs(workflow, task, inputs)
  IndiferentHash.setup task_parameters

  Misc.add_stream_filename(stream, filename) if filename

  # Pipe that carries the streamed content up to the boundary; copy errors
  # are logged and not re-raised.
  clean_stream = Misc.open_pipe do |sin|
    begin
      copy_until_boundary(stream, sin, boundary)
    rescue
      Log.exception $!
    end
  end

  ConcurrentStream.setup(clean_stream, :filename => filename)

  task_parameters[stream_input.to_sym] = clean_stream

  task = task.to_sym

  Log.low "Running streaming job #{[workflow, task] * "/" }: #{Misc.fingerprint task_parameters}"
  job = workflow.job(task, name, task_parameters)
  
  job.clean if job.aborted?

  execution_type = type_of_export(workflow, task)

  execution_type = "exec" if inputs["_cache_type"] == 'exec'

  begin
    # NOTE(review): `execution_type.to_s` is always a String, so the `nil`
    # alternative below can never match; a nil execution type reaches the
    # `else` branch and raises "Unknown execution_type", which the rescue
    # further down records on the job.
    case execution_type.to_s
    when "exec", nil
      job.exec(:stream)
    when "sync", "synchronous", "async", "asynchronous"
      if job.done? or job.started?
        # The job will not read the stream in this case, so it is consumed
        # in a separate thread. NOTE(review): `done_consumer` is never read
        # or joined afterwards.
        done_consumer = Thread.new do
          Misc.consume_stream(clean_stream)
        end
        job.join unless job.done?
      else
        job.run(:stream)
      end
    else
      raise "Unknown execution_type: #{Misc.inspect execution_type}"
    end

  # NOTE(review): both handlers write a status line to `stream`, the object
  # the request body is read from; whether it is writable depends on what
  # the caller passed in -- confirm.
  rescue Aborted, Interrupt
    job.abort
    stream.write "HTTP/1.1 500\r\n"
    stream.close_write
  rescue Exception
    job.exception $!
    stream.write "HTTP/1.1 500\r\n"
    stream.close_write
  end

  job
end