Module: Errands::Runner
- Defined in:
- lib/errands/runner.rb
Instance Attribute Summary collapse
- #running_mode ⇒ Object
Returns the value of attribute running_mode.
Class Method Summary collapse
- .included(klass) ⇒ Object
Instance Method Summary collapse
- #exit_on_stop ⇒ Object
- #run(options = startups) ⇒ Object
- #start(options = startups) ⇒ Object
- #started? ⇒ Boolean
- #starter(*_) ⇒ Object
- #starting(started) ⇒ Object
- #status ⇒ Object
- #stop(*_) ⇒ Object
- #stopped? ⇒ Boolean
- #wait_for(key, meth = nil, result = true) ⇒ Object
- #working(*_) ⇒ Object
Instance Attribute Details
#running_mode ⇒ Object
Returns the value of attribute running_mode.
182 183 184 |
# File 'lib/errands/runner.rb', line 182 def running_mode @running_mode end |
Class Method Details
.included(klass) ⇒ Object
177 178 179 180 |
# File 'lib/errands/runner.rb', line 177 def self.included(klass) klass.extend(ThreadAccessor).extend(Started) klass.thread_accessor :events, :receptors, :threads end |
Instance Method Details
#exit_on_stop ⇒ Object
224 225 226 227 |
# File 'lib/errands/runner.rb', line 224 def exit_on_stop stop exit end |
#run(options = startups) ⇒ Object
189 190 191 192 193 |
# File 'lib/errands/runner.rb', line 189 def run(options = startups) start unless started? our.merge! events: receptors[:events] our[:threaded_run] || our[:noop_run] ? running { main_loop } : main_loop end |
#start(options = startups) ⇒ Object
184 185 186 187 |
# File 'lib/errands/runner.rb', line 184 def start(options = startups) our_store! options.merge(threads: Runners.new, receptors: Receptors.new) starter end |
#started? ⇒ Boolean
259 260 261 |
# File 'lib/errands/runner.rb', line 259 def started? !!our && !!threads && our[:started] = !stopped? end |
#starter(*_) ⇒ Object
195 196 197 198 199 200 201 |
# File 'lib/errands/runner.rb', line 195 def starter(*_) if our[:starter] self.class.started_workers *_ elsif !our[:noop_run] starting self.class.started_workers end end |
#starting(started) ⇒ Object
203 204 205 206 207 208 209 210 |
# File 'lib/errands/runner.rb', line 203 def starting(started) puts ["Starting #{self.class} with :", our[:config], "workers : #{started}", "\n"].join("\n") unless our[:quiet] running thread_name, loop: true, started: started, type: :starter do Array(my[:started]).uniq.each { |s| threads[s] ||= send *(respond_to?(s, true) ? [s] : [:working, s]) } sleep frequency || 1 end end |
#status ⇒ Object
240 241 242 |
# File 'lib/errands/runner.rb', line 240 def status {}.tap { |s| threads.each { |name, t| s[name] = t.status } } end |
#stop(*_) ⇒ Object
229 230 231 232 233 234 235 236 237 238 |
# File 'lib/errands/runner.rb', line 229 def stop(*_) [false, true].each do |all| list = threads.key_sliced(_.any? ? _ : stopped_threads) list.alive.each { |n, t| his(t)[:stop] = true } list.stopping_order(all).alive.each { |n, t| exiting(n, !all || t.stop?) } end stopped? threads.key_sliced(_.any? ? _ : stopped_threads).alive.empty? end |
#stopped? ⇒ Boolean
253 254 255 256 257 |
# File 'lib/errands/runner.rb', line 253 def stopped? our[:stopped] = threads.key_sliced(stopped_threads).alive.empty?.tap { |bool| log_activity Time.now, "All activities stopped" if bool } end |
#wait_for(key, meth = nil, result = true) ⇒ Object
244 245 246 247 248 249 250 251 |
# File 'lib/errands/runner.rb', line 244 def wait_for(key, meth = nil, result = true) time = Time.now.to_f loop { break if @errands_wait_timeout && Time.now.to_f - time > @errands_wait_timeout break if meth && our[key].respond_to?(meth, true) ? ((our[key].send(meth) == result) rescue nil) : !!our[key] == result } end |
#working(*_) ⇒ Object
212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/errands/runner.rb', line 212 def working(*_) work_done, processing, data_acquisition = working_jargon *_ our[work_done] = false running _.first, loop: true, type: :data_acquisition do unless my[:stop] ||= our[work_done] = checked_send("#{work_done}?") r = ready_receptor! processing ((r << send(data_acquisition)) && !my[:latency]) || sleep(frequency.to_i) end end end |