Class: StreamWorkflowTask

Inherits:
Object
  • Object
show all
Includes:
RbbtRESTHelpers, WorkflowRESTHelpers
Defined in:
lib/rbbt/rest/workflow/stream_task.rb

Constant Summary collapse

EOL =
"\r\n"

Constants included from RbbtRESTHelpers

RbbtRESTHelpers::MEMORY_CACHE, RbbtRESTHelpers::PAGE_SIZE

Instance Attribute Summary

Attributes included from RbbtRESTHelpers

#javascript_resources, #plugin_resources, #sass_resources, #template_resources

Attributes included from WorkflowRESTHelpers

#workflow_resources

Instance Method Summary collapse

Methods included from RbbtRESTHelpers

#add_GET_param, add_sass_load_path, #cache, #check_step, #consume_parameter, css_resources, #css_resources, #development?, #error_for, #file_or_text_area, #file_resources, file_resources, #filter, #fix_input, #form_input, #fragment, #hash2dl, #header, #html_tag, #input_label, javascript_resources, #link_css, #link_js, load_tsv, #load_tsv, #locate_css, #locate_css_from_resource, #locate_file, #locate_file_from_resource, #locate_javascript, #locate_javascript_from_resource, #locate_sass, #locate_sass_from_resource, #locate_template, #locate_template_from_resource, #log, #modal_fragment, #old_cache, #paginate, #param2boolean, #parse_page, #partial_render, #permalink, #prepare_input, #process_common_parameters, #production?, #record_css, #record_js, #recorded_css_files, #recorded_js_files, #remove_GET_param, #render, #render_partial, #render_sass, #reset_js_css, #resource, #reveal, sass_resources, save_tsv, #save_tsv, #serve_css, #serve_js, #table, #table_value, #template_render, template_resources, #tsv2html, #tsv_process, #tsv_rows, #tsv_rows_full, #wait_on

Methods included from WorkflowRESTHelpers

#clean_job, #complete_input_set, #consume_task_parameters, #execution_type, #issue_job, #locate_workflow_template, #locate_workflow_template_from_resource, #prepare_job_inputs, #recursive_clean_job, #show_exec_result, #show_result, #show_result_html, #stream_job, #type_of_export, #workflow_partial, #workflow_render, workflow_resources

Constructor Details

#initialize(app) ⇒ StreamWorkflowTask

Returns a new instance of StreamWorkflowTask.



5
6
7
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 5

# Rack middleware constructor.
#
# @param app [#call] downstream Rack application; it receives every request
#   that #call decides not to hijack.
def initialize(app); @app = app; end

Instance Method Details

#_merge_chunks(sin, sout) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 147

# Decodes an HTTP/1.1 "Transfer-Encoding: chunked" body read from +sin+ and
# writes the de-chunked payload to +sout+.
#
# Each chunk is "<hex size>[;ext]\r\n<size bytes>\r\n"; a zero-size chunk ends
# the body. Anything after the zero-size line (trailers, final CRLF) is left
# unread.
#
# @param sin [IO] chunked input. Its read side is closed when this method
#   returns or raises, and any threads it exposes through #threads are sent
#   Aborted.
# @param sout [IO] receives the decoded payload
# @raise [RuntimeError] on a missing, blank or non-hex size line, a short
#   chunk, or a chunk that is not followed by CRLF
def _merge_chunks(sin, sout)
  loop do
    size_line = sin.gets
    # A size line is mandatory here: EOF or a blank line means the body was
    # truncated or is malformed.
    raise "Empty chunk size" if size_line.nil? || size_line.strip.empty?

    size_str = size_line.strip
    # Without this check a garbage size line would parse as 0 and silently be
    # taken as the end of the body.
    raise "Bad chunk size: #{size_str}" unless size_str =~ /\A\h/

    # String#to_i(16) stops at the first non-hex character, so chunk
    # extensions (";name=value") are ignored.
    size = size_str.to_i(16)
    break if size == 0

    chunk = sin.read(size)
    got = chunk ? chunk.bytesize : 0
    # Check the length before the trailing CRLF so a truncated body reports the
    # real problem instead of a confusing "bound not right".
    raise "Size does not match: #{[got, size] * " != "}" if got != size

    bound = sin.read(2)
    raise "bound not right: #{ bound }" if bound != EOL

    sout.write chunk
  end
rescue Aborted
  raise
rescue Exception
  Log.exception $!
  raise
ensure
  if sin.respond_to? :close_read
    sin.close_read unless sin.closed?
  else
    sin.close unless sin.closed?
  end
  if sin.respond_to? :threads
    sin.threads.each { |th| th.raise Aborted }
  end
end

#call(env) ⇒ Object



224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 224

# Rack entry point.
#
# Requests accepted by #do_stream are served by hijacking the connection:
# 1. The chunked body is de-chunked (#merge_chunks).
# 2. The ordinary form fields are parsed (#get_inputs).
# 3. The streamed field is fed to the job (#run_job).
# 4. The job's output stream is written back as a raw HTTP/1.1 response whose
#    RBBT-STREAMING-JOB-URL header carries the job URL.
# Every other request is delegated to the wrapped app.
#
# @param env [Hash] Rack environment
# @return [Array] [-1, {}, []] for hijacked requests; otherwise whatever the
#   wrapped app returns
# @raise re-raises any error raised while serving a hijacked request, after a
#   best-effort attempt to send a 515 status line to the client
def call(env)
  unless do_stream(env)
    Log.low "NOT Hijacking post data"
    return @app.call(env)
  end

  tcp_io = nil
  begin
    client = env["rack.hijack"]
    # NOTE(review): reads the hijack callable's internal @buffer (body bytes
    # already read by the server); depends on the server implementation.
    buffer = client.instance_variable_get('@buffer')
    tcp_io = client.call
    Log.low "Hijacking post data #{tcp_io}"

    content_type = env["CONTENT_TYPE"]

    tcp_merged_io = Misc.open_pipe do |sin| merge_chunks(tcp_io, sin, buffer) end

    inputs, stream_input, filename, stream, boundary = get_inputs(content_type, tcp_merged_io)

    workflow, task = parse_uri(env)

    job = run_job(workflow, task, inputs, stream_input, stream, boundary, filename)

    job_url = File.join("/", workflow.to_s, task, job.name)

    raise "Job aborted" if job.aborted?
    raise job.messages.last if job.error?

    out_stream = TSV.get_stream job

    begin
      Log.high "Write response #{Misc.fingerprint tcp_io} "
      tcp_io.write "HTTP/1.1 200\r\n"
      tcp_io.write "Connection: close\r\n"
      tcp_io.write "RBBT-STREAMING-JOB-URL: #{ job_url }\r\n"
      tcp_io.write "\r\n"
      Log.high "Comsuming response #{Misc.fingerprint tcp_io}"
      Misc.consume_stream(out_stream, false, tcp_io)
      Log.high "Comsumed response #{Misc.fingerprint tcp_io}"
    rescue Exception
      # Headers may already be on the wire; nothing useful can be sent now.
      Log.exception $!
    end if out_stream

    tcp_io.close unless tcp_io.closed?
    Log.high "Closed io #{tcp_io}"

    [-1, {}, []]
  rescue Exception => e
    Log.exception e
    # tcp_io is nil if hijacking itself failed, and may already be closed or
    # broken (client gone). Reporting the error must never replace the
    # original exception with a NoMethodError/IOError of its own.
    if tcp_io && !tcp_io.closed?
      begin
        tcp_io.write "HTTP/1.1 515\r\n"
        tcp_io.write "Connection: close\r\n"
        tcp_io.write "\r\n"
        tcp_io.close_write
      rescue IOError, SystemCallError
        # The client cannot be told; fall through and re-raise the original.
      end
    end
    raise e
  end
end

#copy_until_boundary(sin, sout, boundary) ⇒ Object



58
59
60
61
62
63
64
65
66
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 58

# Copies +sin+ to +sout+ up to, but not including, the first line that
# contains +boundary+, i.e. the content of the streamed multipart part.
#
# Lines are written one step behind, so the line-ending of the last content
# line can be dropped: in multipart bodies the CRLF right before the boundary
# delimiter belongs to the delimiter, not to the data.
#
# @param sin [IO] source positioned at the start of the part content
# @param sout [IO] destination for the content
# @param boundary [String] boundary marker searched for in each line
def copy_until_boundary(sin, sout, boundary)
  last_line = nil
  while line = sin.gets
    break if line.include? boundary
    sout.write last_line if last_line
    last_line = line
  end
  # chomp (not strip): only the delimiter's line ending is removed, so leading
  # or trailing whitespace that is part of the data survives. last_line is nil
  # when the stream is empty or starts with the boundary.
  sout.write last_line.chomp if last_line
end

#do_stream(env) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 206

# Decides whether a request should be served by hijacking the connection.
#
# A request qualifies only when it:
# - is a POST,
# - arrives on a server that offers "rack.hijack",
# - has a CONTENT_TYPE mentioning "Rbbt_Param_Stream",
# - is sent with "Transfer-Encoding: chunked".
#
# @param env [Hash] Rack environment
# @return [Boolean]
def do_stream(env)
  content_type = env["CONTENT_TYPE"]
  streamable = env["REQUEST_METHOD"] == "POST" &&
               env["rack.hijack"] &&
               content_type &&
               content_type.include?("Rbbt_Param_Stream") &&
               env["HTTP_TRANSFER_ENCODING"] == "chunked"
  streamable ? true : false
end

#get_inputs(content_type, stream) ⇒ Object



68
69
70
71
72
73
74
75
76
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 68

# Reads the multipart parameters announced in the Content-Type header, then
# parses every ordinary (non-streamed) form field from +stream+.
#
# The header must carry +boundary=<token>+ and +stream=<field>+; values may be
# quoted (RFC 2046). <field> names the form field whose content is streamed to
# the job. #read_normal_inputs stops at the start of that field's content.
#
# @param content_type [String] request CONTENT_TYPE header
# @param stream [IO] de-chunked request body
# @return [Array] [inputs, stream_input, filename, stream, boundary]
# @raise [RuntimeError] when the boundary or stream parameter is missing
def get_inputs(content_type, stream)
  boundary = content_type[/boundary="?([^\s;"]+)"?/, 1]
  raise "No multipart boundary in Content-Type: #{content_type}" if boundary.nil?

  stream_input = content_type[/stream="?([^\s;"]+)"?/, 1]
  raise "No stream parameter in Content-Type: #{content_type}" if stream_input.nil?

  inputs, filename = read_normal_inputs(stream, boundary, stream_input)

  IndiferentHash.setup(inputs)

  [inputs, stream_input, filename, stream, boundary]
end

#merge_chunks(sin, sout, buffer) ⇒ Object



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 185

# Feeds the hijacked request body to #_merge_chunks, which de-chunks it into
# +sout+.
#
# When the server already read part of the body (+buffer+), that data is
# replayed first through an intermediate pipe, followed by whatever remains on
# +sin+.
#
# @param sin [IO] hijacked client socket
# @param sout [IO] destination for the decoded payload
# @param buffer [String, nil] body bytes already consumed by the server
def merge_chunks(sin, sout, buffer)
  return _merge_chunks(sin, sout) if buffer.nil?

  ssin = Misc.open_pipe do |s|
    begin
      s << buffer
      while c = sin.readpartial(Misc::BLOCK_SIZE)
        s << c
      end
    rescue Aborted, IOError
      # EOFError (an IOError) is the normal end of the body; Aborted means the
      # consumer gave up. Neither needs reporting.
    rescue Exception
      Log.exception $!
    ensure
      # Close the pipe first, so the reader always sees EOF even if releasing
      # the socket fails (e.g. the peer is already gone).
      s.close
      begin
        sin.close_read
      rescue IOError, SystemCallError
        # Socket already closed/disconnected; nothing left to release.
      end
    end
  end
  _merge_chunks(ssin, sout)
end

#parse_uri(env) ⇒ Object



11
12
13
14
15
16
17
18
19
20
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 11

# Resolves the workflow and task named in the request path.
#
# The path is expected to look like "/<Workflow>/<task>[/...]"; any query
# string or fragment is ignored.
#
# NOTE(review): the workflow name comes from the client and is resolved with
# Kernel.const_get, so any constant name is accepted here. Callers fail later
# if it is not a workflow; consider whitelisting.
#
# @param env [Hash] Rack environment (REQUEST_URI, falling back to PATH_INFO)
# @return [Array] [workflow, task_name]
# @raise [RuntimeError] when the workflow constant cannot be resolved
def parse_uri(env)
  uri = (env["REQUEST_URI"] || env["PATH_INFO"]).to_s
  # REQUEST_URI includes the query string, which must not leak into the task
  # name (and hence into the job URL).
  path = uri.split(/[?#]/, 2).first.to_s
  _n, workflow, task = path.split("/")
  workflow = begin
               Kernel.const_get(workflow)
             rescue
               raise "Could not accept task for workflow: #{ workflow }"
             end
  [workflow, task]
end

#read_normal_inputs(io, boundary, stream_input) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 22

# Parses the ordinary multipart form fields from +io+. Parsing stops as soon
# as the headers of the +stream_input+ field have been read, leaving +io+
# positioned at the start of that field's content.
#
# Each part's lines are joined with "\n" (line endings are chomped). A part's
# value is stored when the next "--<boundary>" line is seen.
#
# @param io [IO] de-chunked multipart body
# @param boundary [String] multipart boundary (without the leading "--")
# @param stream_input [String] name of the field whose content is streamed
# @return [Array] [inputs, filename]: a Hash of field name => String value,
#   and the last non-empty filename="..." seen in a part header (or nil)
def read_normal_inputs(io, boundary, stream_input)
  inputs = {}

  input_name = nil
  variable_chunk = nil
  filename = nil
  content_start = false
  delimiter = "--" + boundary

  while line = io.gets
    line.chomp!

    if line == delimiter
      # A new part begins: commit the value of the part just finished.
      inputs[input_name] = variable_chunk if input_name
      content_start = false
    elsif content_start
      if variable_chunk.empty?
        variable_chunk << line
      else
        variable_chunk << "\n" << line
      end
    elsif line =~ /^Content.* name="([^\s;"]*)"/
      input_name = $1
      # filename="" (no file selected) must not crash the parser.
      filename_match = line.match(/filename="([^"]+)"/)
      filename = filename_match[1] if filename_match
    elsif line.empty?
      # The blank line ends the part headers; content follows.
      variable_chunk = ""
      break if input_name == stream_input
      content_start = true
    end
  end

  [inputs, filename]
end

#run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 78

# Creates (or reuses) the job for +workflow+/+task+ and starts it. The
# streamed multipart field is wired into the job as input +stream_input+.
#
# @param workflow [Module] workflow resolved by #parse_uri
# @param task [String] task name (converted to a Symbol here)
# @param inputs [Hash] ordinary form fields. "jobname" is removed and used as
#   the job name; "_cache_type" == "exec" forces in-process execution.
# @param stream_input [String] name of the streamed input
# @param stream [IO] request body, positioned at the start of the streamed
#   field's content
# @param boundary [String] multipart boundary that terminates that content
# @param filename [String, nil] client-supplied filename of the streamed field
# @return the job
# @raise [RuntimeError] when the task is not exported by the workflow
# @raise re-raises any error from starting the job, after aborting the job
#   (Aborted/Interrupt) or recording the exception on it; the caller is
#   responsible for telling the client
def run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil)
  name = inputs.delete "jobname"

  task_parameters = prepare_job_inputs(workflow, task, inputs)
  IndiferentHash.setup task_parameters

  Misc.add_stream_filename(stream, filename) if filename

  # Only the streamed field's content (up to the next boundary) reaches the job.
  clean_stream = Misc.open_pipe do |sin|
    begin
      copy_until_boundary(stream, sin, boundary)
    rescue
      # Best-effort copy: the job sees a truncated input, but record why.
      Log.exception $!
    end
  end

  ConcurrentStream.setup(clean_stream, :filename => filename)

  task_parameters[stream_input.to_sym] = clean_stream

  task = task.to_sym

  Log.low "Running streaming job #{[workflow, task] * "/" }: #{Misc.fingerprint task_parameters}"
  job = workflow.job(task, name, task_parameters)

  job.clean if job.aborted?

  execution_type = case
                   when workflow.exec_exports.include?(task)
                     "exec"
                   when workflow.synchronous_exports.include?(task)
                     "synchronous"
                   when workflow.asynchronous_exports.include?(task)
                     "asynchronous"
                   else
                     raise "No known export type for #{ workflow } #{ task }. Accesses denied"
                   end

  execution_type = "exec" if inputs["_cache_type"] == 'exec'

  begin
    case execution_type
    when "exec", nil
      job.exec(:stream)
    when "sync", "synchronous", "async", "asynchronous"
      if job.done? or job.started?
        # The job will not read this upload; consume the request body in the
        # background (presumably so the client can finish sending it).
        # NOTE(review): the clean_stream copy thread above also reads +stream+.
        Thread.new do
          Misc.consume_stream(stream, false)
        end
        job.join unless job.done?
      else
        job.run(:stream)
      end
    else
      raise "Unknown execution_type: #{execution_type}"
    end
  rescue Aborted, Interrupt
    # +stream+ is the request-body reader, not the client socket, so no HTTP
    # status can be written here. Re-raise so the caller (#call) reports it.
    job.abort
    raise
  rescue Exception
    job.exception $!
    raise
  end

  job
end