Module: Xolo::Server::Helpers::ProgressStreaming

Defined in:
lib/xolo/server/helpers/progress_streaming.rb

Overview

This is used both as a ‘helper’ in the Sinatra server, and an included mixin for the Xolo::Server::Title and Xolo::Server::Version classes to provide common methods for long-running routes that deliver realtime progress updates via http streaming.

Constant Summary collapse

PROGRESS_THREAD_NAME_PREFIX =

Constants

'xolo-progress-stream-'

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(includer) ⇒ Object

when this module is included



35
36
37
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 35

def self.included(includer)
  Xolo.verbose_include includer, self
end

Instance Method Details

#a_long_thing_with_streamed_feedbackObject

TEMP TESTING - Thisis happening in a thread and should send updates via #progress



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 197

def a_long_thing_with_streamed_feedback
  log_debug 'Starting a_long_thing_with_streamed_feedback'

  progress 'Doing a quick thing...'
  sleep 3
  progress 'Quick thing done.'

  progress "Doing a slow thing at #{Time.now}  ..."
  log_debug 'Starting thread in a_long_thing_with_streamed_feedback'

  # even tho we are in a thread, if we want to
  # send updates while some long sub-task is running
  # we do it in another thread like this
  long_thr = Thread.new { sleep 30 }
  sleep 3

  while long_thr.alive?
    log_debug 'Thread still alive...'
    progress "Slow thing still happening at #{Time.now} ..."
    sleep 3
  end

  log_debug 'Thread in a_long_thing_with_streamed_feedback is done'

  progress 'Slow thing done.'

  progress "Doing a medium thing at #{Time.now}  ..."
  log_debug 'Starting another thread in a_long_thing_with_streamed_feedback - sends output from the thread'

  # Doing this in a thead is just for academics...
  # as is doing anything in a thread and immediately doing thr.join
  med_thr = Thread.new do
    3.times do |x|
      progress "the medium thing has done #{x + 1} things", log: :debug
      sleep 5
    end
  end

  progress 'Now waiting for medium thing to finish...'
  med_thr.join

  progress 'Medium thing is done.'
end

#jsonify_stream_msg(msg) ⇒ Object

TMP



242
243
244
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 242

def jsonify_stream_msg(msg)
  @out_to_stream << { msg: msg }.to_json
end

#progress(msg, log: :nil) ⇒ void

This method returns an undefined value.

Append a message to the progress stream file, optionally sending it also to the log

Parameters:

  • message (String)

    the message to append

  • log (Symbol) (defaults to: :nil)

    the level at which to log the message one of :debug, :info, :warn, :error, :fatal, or :unknown. Default is nil, which doesn’t log the message at all.



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 144

def progress(msg, log: :nil)
  # log_debug "Progress method called from #{caller_locations.first}"

  progress_stream_file.pix_append "#{msg.chomp}\n"

  unless log
    # log_debug 'Processed unlogged progress message'
    return
  end

  case log
  when :debug
    log_debug msg
  when :info
    log_info msg
  when :warn
    log_warn msg
  when :error
    log_error msg
  when :fatal
    log_fatal msg
  when :unknown
    log_unknown msg
  end
end

#progress_stream_fileObject

The file to which we write progess messages for long-running processes, which might in turn be streamed to xadm now or in the future



115
116
117
118
119
120
121
122
123
124
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 115

def progress_stream_file
  return @progress_stream_file if @progress_stream_file

  tempf = Tempfile.create "#{PROGRESS_THREAD_NAME_PREFIX}#{session[:xolo_id]}-"
  tempf.close # we'll write to it later
  log_debug "Created progress_stream_file: #{tempf.path}"
  @progress_stream_file = Pathname.new(tempf.path)
  @progress_stream_file.pix_touch
  @progress_stream_file
end

#progress_stream_url_pathObject

The file to which we write progess messages for long-running processes, which might in turn be streamed to xadm.



130
131
132
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 130

def progress_stream_url_path
  "/streamed_progress/?stream_file=#{CGI.escape progress_stream_file.to_s}"
end

#setup_progress_streamingObject

Setup for streaming: create the tmp file so that any threads will see it and do any other pre-streaming stuff

Parameters:

  • xadm_command (String)

    the xadm command that is causing this stream. may be used for finding a returning the progress file after the fact.



99
100
101
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 99

def setup_progress_streaming
  log_debug "Setting up for progress streaming. progress_stream_file is: #{progress_stream_file}"
end

#stop_progress_streamingObject

End che current progress stream



105
106
107
108
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 105

def stop_progress_streaming
  log_debug "Stopping progress streaming to file: #{progress_stream_file}"
  progress Xolo::Server::PROGRESS_COMPLETE
end

#stream_progress(stream_file:, stream:) ⇒ void

This method returns an undefined value.

Stream lines from the given file to the given stream

Parameters:

  • stream_file: (Pathname)

    the file to stream from

  • stream: (Sinatra::Helpers::Stream)

    the stream to send to



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 177

def stream_progress(stream_file:, stream:)
  log_debug "About to tail: usr/bin/tail -f -c +1 #{Shellwords.escape stream_file.to_s}"

  stdin, stdouterr, wait_thr = Open3.popen2e("/usr/bin/tail -f -c +1 #{Shellwords.escape stream_file.to_s}")
  stdin.close

  while line = stdouterr.gets
    break if line.chomp == Xolo::Server::PROGRESS_COMPLETE

    stream << line
  end
  stdouterr.close
  wait_thr.exit
  # TODO: deal with wait_thr.value.exitstatus > 0 ?
end

#with_streaming { ... } ⇒ Object

Call this from long-running routes.

It runs a block in a thread with streaming

The block should call #progress as needed to write to the progress file, and optionally the log

Always sends back a JSON response body with

status: :running,
progress_stream_url_path: progress_stream_url_path

Any errors should be written to the stream file, as will unhandled exceptions.

Yields:

  • The block to run in the thread with streaming



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/xolo/server/helpers/progress_streaming.rb', line 60

def with_streaming
  raise 'No block given to run in streaming thread' unless block_given?

  @streaming_now = true

  # always call this first in a
  # long-running route that will use progress streaming
  setup_progress_streaming

  log_debug 'Starting with_streaming block in thread'

  @streaming_thread = Thread.new do
    log_debug 'Thread with_streaming is starting and yielding to block'
    yield
    log_debug 'Thread with_streaming is finished'
  rescue StandardError => e
    progress "ERROR: #{e.class}: #{e}", log: :error
    e.backtrace.each { |l| log_debug "..#{l}" }
  ensure
    stop_progress_streaming
  end

  @streaming_thread.name = "#{PROGRESS_THREAD_NAME_PREFIX}#{session[:xolo_id]}"

  resp_body = {
    status: :running,
    progress_stream_url_path: progress_stream_url_path
  }
  body resp_body
end