Module: Errands::Runner

Defined in:
lib/errands/runner.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#running_mode ⇒ Object

Returns the value of attribute running_mode.



182
183
184
# File 'lib/errands/runner.rb', line 182

# Reader for the running mode held in the @running_mode instance variable.
#
# @return [Object] the current value of @running_mode (nil when never set)
def running_mode
  @running_mode
end

Class Method Details

.included(klass) ⇒ Object



177
178
179
180
# File 'lib/errands/runner.rb', line 177

# Inclusion hook: equips the including class with the class-level helpers
# from ThreadAccessor and Started, then declares the :events, :receptors
# and :threads thread accessors on it.
#
# @param klass [Module] the class or module including this one
# @return [Object] whatever klass.thread_accessor returns
def self.included(klass)
  klass.extend(ThreadAccessor)
  klass.extend(Started)
  klass.thread_accessor(:events, :receptors, :threads)
end

Instance Method Details

#exit_on_stop ⇒ Object



224
225
226
227
# File 'lib/errands/runner.rb', line 224

# Calls #stop without arguments and then terminates the process via exit.
#
# Does not return normally: exit raises SystemExit once stop has returned.
def exit_on_stop
  stop
  exit
end

#run(options = startups) ⇒ Object



189
190
191
192
193
# File 'lib/errands/runner.rb', line 189

# Starts the runner when it is not started yet, publishes the :events
# receptor into the shared store, then enters the main loop.
#
# The main loop is handed to +running+ as a block when either the
# :threaded_run or the :noop_run option is set; otherwise it is called
# directly.
#
# @param options [Hash] start options, defaulting to +startups+
# @return [Object] the result of +running+ or of +main_loop+
def run(options = startups)
  start(options) unless started?
  our.merge!(events: receptors[:events])

  if our[:threaded_run] || our[:noop_run]
    running { main_loop }
  else
    main_loop
  end
end

#start(options = startups) ⇒ Object



184
185
186
187
# File 'lib/errands/runner.rb', line 184

# Stores the given options together with a fresh Runners instance under
# :threads and a fresh Receptors instance under :receptors, then calls
# +starter+.
#
# @param options [Hash] options to store, defaulting to +startups+
# @return [Object] the result of +starter+
def start(options = startups)
  runtime = { threads: Runners.new, receptors: Receptors.new }
  our_store!(options.merge(runtime))
  starter
end

#started? ⇒ Boolean

Returns:

  • (Boolean)


259
260
261
# File 'lib/errands/runner.rb', line 259

# Tells whether the runner is started.
#
# Answers false when either the shared store (+our+) or the thread list
# (+threads+) is missing. Otherwise records the negation of #stopped? under
# :started in the shared store and returns it.
#
# @return [Boolean]
def started?
  return false unless our
  return false unless threads

  our[:started] = !stopped?
end

#starter(*_) ⇒ Object



195
196
197
198
199
200
201
# File 'lib/errands/runner.rb', line 195

# Chooses how workers get started, based on the stored options.
#
# * :starter set  - forwards the arguments to the class-level
#   +started_workers+ and returns its result;
# * :noop_run set - does nothing and returns nil;
# * otherwise     - hands the class-level +started_workers+ to #starting.
#
# @param _ [Array] arguments forwarded to +started_workers+ in :starter mode
# @return [Object, nil]
def starter(*_)
  return self.class.started_workers(*_) if our[:starter]
  return if our[:noop_run]

  starting(self.class.started_workers)
end

#starting(started) ⇒ Object



203
204
205
206
207
208
209
210
# File 'lib/errands/runner.rb', line 203

# Announces the start-up on stdout (unless the :quiet option is set) and
# hands a block to +running+ that makes sure every requested worker has an
# entry in +threads+.
#
# For each distinct name in my[:started] that has no entry yet, the block
# calls the method of that name when the receiver responds to it (private
# methods included), and falls back to +working+ with the name otherwise.
# It then sleeps for +frequency+ seconds, or 1 when +frequency+ is unset.
#
# @param started [Object] worker names, passed to +running+ as :started
# @return [Object] the result of +running+
def starting(started)
  unless our[:quiet]
    banner = ["Starting #{self.class} with :", our[:config], "workers : #{started}", "\n"]
    puts banner.join("\n")
  end

  running(thread_name, loop: true, started: started, type: :starter) do
    Array(my[:started]).uniq.each do |worker|
      threads[worker] ||= respond_to?(worker, true) ? send(worker) : send(:working, worker)
    end
    sleep(frequency || 1)
  end
end

#status ⇒ Object



240
241
242
# File 'lib/errands/runner.rb', line 240

# Snapshot of the status of every entry in +threads+.
#
# @return [Hash] each name in +threads+ mapped to the result of calling
#   +status+ on its entry
def status
  report = {}
  threads.each { |name, thread| report[name] = thread.status }
  report
end

#stop(*_) ⇒ Object



229
230
231
232
233
234
235
236
237
238
# File 'lib/errands/runner.rb', line 229

# Flags the selected threads for stopping and exits them in two passes.
#
# The selection is the names given in +_+ when at least one of them is
# truthy; otherwise it is whatever +stopped_threads+ returns.
#
# @param _ [Array] optional names restricting which threads are stopped
# @return [Object] the result of +empty?+ on the selection's alive entries
#   after both passes (presumably a Boolean)
def stop(*_)
  # Two passes: the first with all = false, the second with all = true.
  [false, true].each do |all|
    # Sliced again on each pass rather than reused from the previous one.
    list = threads.key_sliced(_.any? ? _ : stopped_threads)
    # Raise the :stop flag in the per-thread store of every alive entry.
    list.alive.each { |n, t| his(t)[:stop] = true }
    # First pass always hands true to exiting; the second pass hands it
    # t.stop? instead.
    list.stopping_order(all).alive.each { |n, t| exiting(n, !all || t.stop?) }
  end

  # Return value discarded: called here for what stopped? does besides
  # answering (it stores its answer under our[:stopped] and may log).
  stopped?
  threads.key_sliced(_.any? ? _ : stopped_threads).alive.empty?
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


253
254
255
256
257
# File 'lib/errands/runner.rb', line 253

# Checks whether none of the threads named by +stopped_threads+ is alive,
# stores the answer under :stopped in the shared store and returns it.
#
# Logs "All activities stopped" through +log_activity+ when the answer is
# truthy.
#
# @return [Boolean]
def stopped?
  our[:stopped] = begin
    halted = threads.key_sliced(stopped_threads).alive.empty?
    log_activity(Time.now, "All activities stopped") if halted
    halted
  end
end

#wait_for(key, meth = nil, result = true) ⇒ Object



244
245
246
247
248
249
250
251
# File 'lib/errands/runner.rb', line 244

# Blocks until the value stored under +key+ in +our+ matches +result+, or
# until @errands_wait_timeout seconds have elapsed when that variable is set.
#
# When +meth+ is given and the stored value responds to it (private methods
# included), the outcome of calling +meth+ on the value is compared with
# +result+; a StandardError raised by that call counts as "no match yet".
# Otherwise the truthiness of the stored value is compared with +result+.
#
# @param key [Object] key looked up in +our+
# @param meth [Symbol, String, nil] optional method to call on the value
# @param result [Object] expected outcome (defaults to true)
# @return [nil] in every case, whether the condition was met or the wait
#   timed out
def wait_for(key, meth = nil, result = true)
  # Monotonic clock: wall-clock adjustments must neither shorten nor extend
  # the timeout.
  began = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  loop do
    if @errands_wait_timeout
      waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - began
      break if waited > @errands_wait_timeout
    end

    # Read once so the respond_to? check and the call see the same object.
    value = our[key]
    matched =
      if meth && value.respond_to?(meth, true)
        begin
          value.send(meth) == result
        rescue StandardError
          nil
        end
      else
        !!value == result
      end
    break if matched

    # Let other threads run between polls instead of spinning flat out.
    Thread.pass
  end
end

#working(*_) ⇒ Object



212
213
214
215
216
217
218
219
220
221
222
# File 'lib/errands/runner.rb', line 212

# Sets up a data-acquisition worker through +running+.
#
# +working_jargon+ supplies three names from the given arguments: the
# "work done" flag, the processing name handed to +ready_receptor!+, and
# the data-acquisition method to call.
#
# @param _ [Array] forwarded to +working_jargon+; the first element is also
#   passed to +running+ as its first argument (presumably the worker name)
# @return [Object] the result of +running+
def working(*_)
  work_done, processing, data_acquisition = working_jargon *_
  # Reset the "work done" flag in the shared store before starting.
  our[work_done] = false

  running _.first, loop: true, type: :data_acquisition do
    # When my[:stop] is already truthy nothing else is evaluated. Otherwise
    # the "<work_done>?" check goes through checked_send and its result is
    # stored both under our[work_done] and under my[:stop]; the body below
    # runs only when that result is falsy.
    unless my[:stop] ||= our[work_done] = checked_send("#{work_done}?")
      r = ready_receptor! processing
      # Append the acquired data to the receptor; sleep frequency.to_i
      # seconds when the append yields a falsy value or my[:latency] is set.
      ((r << send(data_acquisition)) && !my[:latency]) || sleep(frequency.to_i)
    end
  end
end