Module: Xolo::Server::Helpers::ProgressStreaming
- Defined in:
- lib/xolo/server/helpers/progress_streaming.rb
Overview
This is used both as a ‘helper’ in the Sinatra server, and an included mixin for the Xolo::Server::Title and Xolo::Server::Version classes to provide common methods for long-running routes that deliver realtime progress updates via http streaming.
Constant Summary collapse
- PROGRESS_THREAD_NAME_PREFIX =
Constants
'xolo-progress-stream-'
Class Method Summary collapse
-
.included(includer) ⇒ Object
when this module is included.
Instance Method Summary collapse
-
#a_long_thing_with_streamed_feedback ⇒ Object
TEMP TESTING - Thisis happening in a thread and should send updates via #progress.
-
#jsonify_stream_msg(msg) ⇒ Object
TMP.
-
#progress(msg, log: :nil) ⇒ void
Append a message to the progress stream file, optionally sending it also to the log.
-
#progress_stream_file ⇒ Object
The file to which we write progess messages for long-running processes, which might in turn be streamed to xadm now or in the future.
-
#progress_stream_url_path ⇒ Object
The file to which we write progess messages for long-running processes, which might in turn be streamed to xadm.
-
#setup_progress_streaming ⇒ Object
Setup for streaming: create the tmp file so that any threads will see it and do any other pre-streaming stuff.
-
#stop_progress_streaming ⇒ Object
End che current progress stream.
-
#stream_progress(stream_file:, stream:) ⇒ void
Stream lines from the given file to the given stream.
-
#with_streaming { ... } ⇒ Object
Call this from long-running routes.
Class Method Details
.included(includer) ⇒ Object
when this module is included
35 36 37 |
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 35 def self.included(includer) Xolo.verbose_include includer, self end |
Instance Method Details
#a_long_thing_with_streamed_feedback ⇒ Object
TEMP TESTING - Thisis happening in a thread and should send updates via #progress
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 197 def a_long_thing_with_streamed_feedback log_debug 'Starting a_long_thing_with_streamed_feedback' progress 'Doing a quick thing...' sleep 3 progress 'Quick thing done.' progress "Doing a slow thing at #{Time.now} ..." log_debug 'Starting thread in a_long_thing_with_streamed_feedback' # even tho we are in a thread, if we want to # send updates while some long sub-task is running # we do it in another thread like this long_thr = Thread.new { sleep 30 } sleep 3 while long_thr.alive? log_debug 'Thread still alive...' progress "Slow thing still happening at #{Time.now} ..." sleep 3 end log_debug 'Thread in a_long_thing_with_streamed_feedback is done' progress 'Slow thing done.' progress "Doing a medium thing at #{Time.now} ..." log_debug 'Starting another thread in a_long_thing_with_streamed_feedback - sends output from the thread' # Doing this in a thead is just for academics... # as is doing anything in a thread and immediately doing thr.join med_thr = Thread.new do 3.times do |x| progress "the medium thing has done #{x + 1} things", log: :debug sleep 5 end end progress 'Now waiting for medium thing to finish...' med_thr.join progress 'Medium thing is done.' end |
#jsonify_stream_msg(msg) ⇒ Object
TMP
242 243 244 |
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 242 def jsonify_stream_msg(msg) @out_to_stream << { msg: msg }.to_json end |
#progress(msg, log: :nil) ⇒ void
This method returns an undefined value.
Append a message to the progress stream file, optionally sending it also to the log
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 144 def progress(msg, log: :nil) # log_debug "Progress method called from #{caller_locations.first}" progress_stream_file.pix_append "#{msg.chomp}\n" unless log # log_debug 'Processed unlogged progress message' return end case log when :debug log_debug msg when :info log_info msg when :warn log_warn msg when :error log_error msg when :fatal log_fatal msg when :unknown log_unknown msg end end |
#progress_stream_file ⇒ Object
The file to which we write progess messages for long-running processes, which might in turn be streamed to xadm now or in the future
115 116 117 118 119 120 121 122 123 124 |
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 115 def progress_stream_file return @progress_stream_file if @progress_stream_file tempf = Tempfile.create "#{PROGRESS_THREAD_NAME_PREFIX}#{session[:xolo_id]}-" tempf.close # we'll write to it later log_debug "Created progress_stream_file: #{tempf.path}" @progress_stream_file = Pathname.new(tempf.path) @progress_stream_file.pix_touch @progress_stream_file end |
#progress_stream_url_path ⇒ Object
The file to which we write progess messages for long-running processes, which might in turn be streamed to xadm.
130 131 132 |
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 130 def progress_stream_url_path "/streamed_progress/?stream_file=#{CGI.escape progress_stream_file.to_s}" end |
#setup_progress_streaming ⇒ Object
Setup for streaming: create the tmp file so that any threads will see it and do any other pre-streaming stuff
99 100 101 |
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 99 def setup_progress_streaming log_debug "Setting up for progress streaming. progress_stream_file is: #{progress_stream_file}" end |
#stop_progress_streaming ⇒ Object
End che current progress stream
105 106 107 108 |
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 105 def stop_progress_streaming log_debug "Stopping progress streaming to file: #{progress_stream_file}" progress Xolo::Server::PROGRESS_COMPLETE end |
#stream_progress(stream_file:, stream:) ⇒ void
This method returns an undefined value.
Stream lines from the given file to the given stream
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 177 def stream_progress(stream_file:, stream:) log_debug "About to tail: usr/bin/tail -f -c +1 #{Shellwords.escape stream_file.to_s}" stdin, stdouterr, wait_thr = Open3.popen2e("/usr/bin/tail -f -c +1 #{Shellwords.escape stream_file.to_s}") stdin.close while line = stdouterr.gets break if line.chomp == Xolo::Server::PROGRESS_COMPLETE stream << line end stdouterr.close wait_thr.exit # TODO: deal with wait_thr.value.exitstatus > 0 ? end |
#with_streaming { ... } ⇒ Object
Call this from long-running routes.
It runs a block in a thread with streaming
The block should call #progress as needed to write to the progress file, and optionally the log
Always sends back a JSON response body with
status: :running,
progress_stream_url_path: progress_stream_url_path
Any errors should be written to the stream file, as will unhandled exceptions.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 60 def with_streaming raise 'No block given to run in streaming thread' unless block_given? @streaming_now = true # always call this first in a # long-running route that will use progress streaming setup_progress_streaming log_debug 'Starting with_streaming block in thread' @streaming_thread = Thread.new do log_debug 'Thread with_streaming is starting and yielding to block' yield log_debug 'Thread with_streaming is finished' rescue StandardError => e progress "ERROR: #{e.class}: #{e}", log: :error e.backtrace.each { |l| log_debug "..#{l}" } ensure stop_progress_streaming end @streaming_thread.name = "#{PROGRESS_THREAD_NAME_PREFIX}#{session[:xolo_id]}" resp_body = { status: :running, progress_stream_url_path: progress_stream_url_path } body resp_body end |