Class: Fasten::Runner

Inherits:
Object
  • Object
show all
Includes:
Support::Logger, Support::State, Support::Stats, Support::UI, Support::Yaml
Defined in:
lib/fasten/runner.rb

Overview

rubocop:disable Metrics/ClassLength

Constant Summary

Constants included from Support::Stats

Support::Stats::FLOAT_FORMATTER

Instance Attribute Summary collapse

Attributes included from Support::State

#dif, #error, #fin, #ini, #runner, #state

Attributes included from Support::Logger

#log_file, #logger

Instance Method Summary collapse

Methods included from Support::Yaml

#load_yaml, #save_yaml

Methods included from Support::UI

#run_ui, #ui

Methods included from Support::Stats

#hformat, #initialize_stats, #load_stats, #save_stats, #split_time, #stats_add_entry, #stats_create_entry, #stats_data, #stats_entries, #stats_last, #stats_run_history, #stats_summary, #stats_summary_data, #update_stats

Methods included from Support::State

#deps, #deps_str, #deps_str_runner, #deps_str_task, #idle?, #last_avg, #last_err, #last_stat, #paused?, #pausing?, #quitting?, #running?

Methods included from Support::Logger

#close_logger, #initialize_logger, #log_fin, #log_ini

Constructor Details

#initialize(**options) ⇒ Runner

Returns a new instance of Runner.



23
24
25
26
27
28
29
30
31
32
# File 'lib/fasten/runner.rb', line 23

# Builds a runner. Any of the listed options the caller did not supply is
# filled in from the matching `Fasten.default_<option>` method before the
# settings are applied through #reconfigure.
#
# @param options [Hash] runner settings. `:targets` (defaults to `[]`) is
#   passed to the TaskManager; the remaining keys are applied by #reconfigure.
def initialize(**options)
  # Options that fall back to a Fasten-level default when omitted.
  %i[name stats summary jobs worker_class fasten_dir use_threads ui_mode developer priority].each do |key|
    options[key] = Fasten.send "default_#{key}" unless options.key? key
  end

  @tasks = TaskManager.new(targets: options[:targets] || [], runner: self)
  @workers = []

  reconfigure(**options)
end

Instance Attribute Details

#developer ⇒ Object

Returns the value of attribute developer.



21
22
23
# File 'lib/fasten/runner.rb', line 21

# Reader for the developer setting. When it is truthy,
# #raise_error_in_failure opens a console instead of raising.
#
# @return [Object] the value stored in @developer
def developer
  @developer
end

#fasten_dir ⇒ Object

Returns the value of attribute fasten_dir.



21
22
23
# File 'lib/fasten/runner.rb', line 21

# Reader for the base directory setting; #touch_task_logs creates
# "<fasten_dir>/log/task/" beneath it.
#
# @return [Object] the value stored in @fasten_dir
def fasten_dir
  @fasten_dir
end

#jobs ⇒ Object

Returns the value of attribute jobs.



21
22
23
# File 'lib/fasten/runner.rb', line 21

# Reader for the jobs setting. #dispatch_pending_tasks compares it with the
# summed weight of running tasks, and #remove_workers_as_needed compares it
# with the number of workers per class.
#
# @return [Object] the value stored in @jobs
def jobs
  @jobs
end

#name ⇒ Object

Returns the value of attribute name.



21
22
23
# File 'lib/fasten/runner.rb', line 21

# Reader for the runner's name; #to_s returns this same value.
#
# @return [Object] the value stored in @name
def name
  @name
end

#priority ⇒ Object

Returns the value of attribute priority.



21
22
23
# File 'lib/fasten/runner.rb', line 21

# Reader for the priority setting.
# NOTE(review): nothing in the visible methods reads this value; presumably
# it is consumed by the task manager or a mixin - confirm.
#
# @return [Object] the value stored in @priority
def priority
  @priority
end

#queue ⇒ Object

Returns the value of attribute queue.



21
22
23
# File 'lib/fasten/runner.rb', line 21

# Reader for the queue polled in thread mode. It is created lazily as a
# TimeoutQueue by #wait_for_running_tasks_thread.
#
# @return [Object, nil] the value stored in @queue
def queue
  @queue
end

#stats ⇒ Object

Returns the value of attribute stats.



21
22
23
# File 'lib/fasten/runner.rb', line 21

# Reader for the stats setting.
# NOTE(review): presumably toggles the Support::Stats behaviour - confirm
# against that module.
#
# @return [Object] the value stored in @stats
def stats
  @stats
end

#summary ⇒ Object

Returns the value of attribute summary.



21
22
23
# File 'lib/fasten/runner.rb', line 21

# Reader for the summary setting; #perform calls #stats_summary at the end
# of a run only when this is truthy.
#
# @return [Object] the value stored in @summary
def summary
  @summary
end

#tasks ⇒ Object

Returns the value of attribute tasks.



21
22
23
# File 'lib/fasten/runner.rb', line 21

# Reader for the task collection, a TaskManager created in #initialize.
#
# @return [Object] the value stored in @tasks
def tasks
  @tasks
end

#ui_mode ⇒ Object

Returns the value of attribute ui_mode.



21
22
23
# File 'lib/fasten/runner.rb', line 21

# Reader for the ui_mode setting.
# NOTE(review): presumably selects the UI implementation in Support::UI -
# confirm against that module.
#
# @return [Object] the value stored in @ui_mode
def ui_mode
  @ui_mode
end

#use_threads ⇒ Object

Returns the value of attribute use_threads.



21
22
23
# File 'lib/fasten/runner.rb', line 21

# Reader for the use_threads setting. When truthy, #perform installs
# StdThreadProxy and #wait_for_running_tasks polls the thread queue instead
# of the workers' pipes.
#
# @return [Object] the value stored in @use_threads
def use_threads
  @use_threads
end

#worker_class ⇒ Object

Returns the value of attribute worker_class.



21
22
23
# File 'lib/fasten/runner.rb', line 21

# Reader for the default worker class, used by #dispatch_pending_tasks when
# a task does not name its own. May be a String, in which case it is
# resolved with Object.const_get at dispatch time.
#
# @return [Object] the value stored in @worker_class
def worker_class
  @worker_class
end

#workers ⇒ Object

Returns the value of attribute workers.



21
22
23
# File 'lib/fasten/runner.rb', line 21

# Reader for the list of live workers; initialised to an empty Array in
# #initialize and grown by #find_or_create_worker.
#
# @return [Object] the value stored in @workers
def workers
  @workers
end

Instance Method Details

#check_state ⇒ Object



122
123
124
125
126
127
128
129
130
131
# File 'lib/fasten/runner.rb', line 122

# Completes a pending pause or quit once no task is running any more:
# PAUSING becomes PAUSED (and the UI message is cleared), QUITTING becomes
# QUIT. Any other state, or tasks still running, leaves everything untouched.
def check_state
  case state
  when :PAUSING
    return unless tasks.no_running?

    self.state = :PAUSED
    ui.message = nil
    ui.force_clear
  when :QUITTING
    return unless tasks.no_running?

    self.state = :QUIT
    ui.force_clear
  end
end

#dispatch_pending_tasks ⇒ Object



252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/fasten/runner.rb', line 252

# Hands waiting tasks to workers for as long as tasks are waiting and the
# summed weight of the running tasks stays below `jobs`.
def dispatch_pending_tasks
  while tasks.waiting? && tasks.running.map(&:weight).sum < jobs
    pending = tasks.next

    # A task may name its own worker class; otherwise the runner's default
    # applies. Either may be given as a String holding a constant name.
    klass = pending.worker_class || worker_class
    klass = Object.const_get(klass) if klass.is_a?(String)

    assigned = find_or_create_worker(worker_class: klass)

    log_ini(pending, "on worker #{assigned}")
    assigned.send_request_to_child(pending)
    tasks.running << pending

    ui.force_clear
  end
end

#done_counters ⇒ Object



97
98
99
# File 'lib/fasten/runner.rb', line 97

# Progress label "<done>/<total>" built from the task manager's counts.
#
# @return [String]
def done_counters
  finished = tasks.done.count
  total = tasks.count

  "#{finished}/#{total}"
end

#find_or_create_worker(worker_class:) ⇒ Object



235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/fasten/runner.rb', line 235

# Returns an idle worker that is exactly of the requested class, starting
# and registering a new one when none is available.
#
# New workers are named "<Class-Name>-<NN>", where "::" in the class name
# becomes "-" and NN is a two-digit uppercase hex counter shared by all
# worker classes.
#
# @param worker_class [Class] class the worker must be an instance of
# @return [Object] the idle or freshly started worker
def find_or_create_worker(worker_class:)
  idle = workers.find { |candidate| candidate.instance_of?(worker_class) && candidate.running_task.nil? }
  return idle if idle

  @worker_id ||= 0
  @worker_id += 1

  label = worker_class.to_s.gsub('::', '-')
  serial = format('%02X', @worker_id)

  created = worker_class.new(runner: self, name: "#{label}-#{serial}", use_threads: use_threads)
  created.start
  workers << created

  log_info "Worker created: #{created}"

  ui.force_clear

  created
end

#kind ⇒ Object



276
277
278
# File 'lib/fasten/runner.rb', line 276

# Fixed label identifying this object as a runner.
# NOTE(review): presumably used by the logging/stats mixins to tell runners
# from tasks - confirm against the callers.
#
# @return [String] always 'runner'
def kind
  'runner'
end

#map(list, &block) ⇒ Object



87
88
89
90
91
92
93
94
95
# File 'lib/fasten/runner.rb', line 87

# Registers one task per element of +list+ (named after the element's
# +to_s+, with the element as the task's request and +block+ as its body),
# runs everything through #perform, and collects the tasks' responses.
#
# @param list [#each] items to process
# @return [Array] the +response+ of every task held by the runner
def map(list, &block)
  list.each { |entry| task(entry.to_s, request: entry, &block) }

  perform

  tasks.map(&:response)
end

#perform ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fasten/runner.rb', line 54

# Runs all registered tasks from start to finish: opens the logger, marks
# the runner RUNNING, loads stats, creates the task log files and drives
# #perform_loop inside the UI. Afterwards the runner is DONE when every
# task reports :DONE and FAIL otherwise, and the outcome is recorded in the
# stats.
#
# The thread proxy, the logger and the stats file are released/saved in the
# ensure section, so they are handled even when the loop raises.
def perform
  initialize_logger
  StdThreadProxy.install if use_threads
  self.state = :RUNNING
  log_ini self, running_counters
  load_stats
  touch_task_logs

  run_ui { perform_loop }

  everything_done = tasks.map(&:state).all?(:DONE)
  self.state = everything_done ? :DONE : :FAIL
  log_fin self, running_counters

  stats_add_entry(state, self)

  stats_summary if summary
ensure
  StdThreadProxy.uninstall if use_threads
  close_logger
  save_stats
end

#perform_loop ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/fasten/runner.rb', line 105

# Main scheduling loop. Each pass waits for running tasks, aborts on
# failures, trims surplus workers, and then either advances a pending
# pause/quit (#check_state) or dispatches more work.
#
# The loop ends when nothing is running or waiting, or once the runner has
# reached the QUIT state; all workers are removed afterwards.
def perform_loop
  loop do
    wait_for_running_tasks
    raise_error_in_failure
    remove_workers_as_needed
    # No new work is handed out while pausing, paused or quitting.
    if %i[PAUSING PAUSED QUITTING].include?(state)
      check_state
    else
      dispatch_pending_tasks
    end

    break if (tasks.no_running? && tasks.no_waiting?) || state == :QUIT
  end

  remove_all_workers
end

#raise_error_in_failure ⇒ Object



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/fasten/runner.rb', line 201

# Stops the run when any task has failed. Does nothing while no task has
# failed.
#
# The failed tasks are logged first. In normal mode all workers are removed
# and a RuntimeError naming the failed tasks is raised; in developer mode
# the UI is cleaned up, the message is printed and a pry console is opened
# instead.
#
# @raise [RuntimeError] when tasks failed and developer mode is off
def raise_error_in_failure
  return unless tasks.failed?

  show_error_tasks

  failed_names = tasks.failed.map(&:to_s).join(', ')
  message = "Stopping because the following tasks failed: #{failed_names}"

  unless developer
    remove_all_workers

    raise message
  end

  ui.cleanup
  puts message

  puts 'Entering development console'

  Kernel.binding.pry # rubocop:disable Lint/Debugger
end

#receive_jobs_tasks_fork(reads) ⇒ Object



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/fasten/runner.rb', line 179

# Collects finished tasks from the workers whose read end became readable
# (fork mode). Each returned task is removed from the running list, merged
# back through +tasks.update+, recorded in the stats and logged.
#
# @param reads [Array<IO>, nil] readable IOs as returned by IO.select; nil
#   (select timeout) is ignored. IOs that match no worker are skipped.
def receive_jobs_tasks_fork(reads)
  return if reads.nil?

  reads.each do |io|
    owner = workers.find { |candidate| candidate.parent_read == io }
    next unless owner

    finished = owner.receive_response_from_child

    tasks.running.delete finished

    tasks.update finished
    stats_add_entry(finished.state, finished)

    log_fin finished, done_counters
    ui.force_clear
  end
end

#receive_jobs_tasks_thread(items) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/fasten/runner.rb', line 153

# Processes tasks reported as finished through the queue (thread mode).
# Each task is removed from the running list, its worker is released (state
# and running task reset to nil), and the task is merged back through
# +tasks.update+, recorded in the stats and logged.
#
# @param items [Array, nil] finished tasks; nil (queue timeout) is ignored
def receive_jobs_tasks_thread(items)
  return if items.nil?

  items.each do |finished|
    tasks.running.delete finished

    # Release the worker so it can be picked up as idle again.
    owner = finished.worker
    owner.state = nil
    owner.running_task = nil

    tasks.update finished
    stats_add_entry(finished.state, finished)

    log_fin finished, done_counters
    ui.force_clear
  end
end

#reconfigure(**options) ⇒ Object



34
35
36
37
38
39
40
# File 'lib/fasten/runner.rb', line 34

# Applies the given settings to the runner through the matching
# "<key>=" writers, then re-initialises the stats.
#
# @param options [Hash] any subset of the settings listed below; keys that
#   are not in the list are ignored, and absent keys leave the current
#   value untouched.
def reconfigure(**options)
  # Settings that can be changed on an existing runner.
  %i[name stats summary jobs worker_class fasten_dir use_threads ui_mode developer priority].each do |key|
    send "#{key}=", options[key] if options.key? key
  end

  initialize_stats
end

#register(&block) ⇒ Object



50
51
52
# File 'lib/fasten/runner.rb', line 50

# Evaluates +block+ with this runner as +self+ (via instance_eval), so
# receiver-less calls inside the block, such as +task+, resolve to the
# runner's methods.
#
# @return [Object] the value of the block
def register(&block)
  instance_eval(&block)
end

#remove_all_workers ⇒ Object



269
270
271
272
273
274
# File 'lib/fasten/runner.rb', line 269

# Kills every worker, empties the worker list and forces a UI redraw.
def remove_all_workers
  pool = workers
  pool.each(&:kill)
  pool.clear

  ui.force_clear
end

#remove_workers_as_needed ⇒ Object



222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/fasten/runner.rb', line 222

# Trims surplus workers: for every worker class that has more than `jobs`
# workers, idle workers of that class are killed and unregistered until
# the class is back within the limit or none of its workers is idle.
#
# Busy workers are never killed, and workers of other classes are left
# alone.
def remove_workers_as_needed
  workers.group_by(&:class).each_value do |worker_list|
    while worker_list.count > jobs
      # Look only inside this class's group; stop when all of them are busy.
      break unless (worker = worker_list.find { |item| item.running_task.nil? })

      worker.kill
      # Shrink the group as well, otherwise the loop condition never changes.
      worker_list.delete worker
      workers.delete worker

      ui.force_clear
    end
  end
end

#running_counters ⇒ Object



101
102
103
# File 'lib/fasten/runner.rb', line 101

# Progress label "<done + running>/<total>" built from the task manager's
# counts.
#
# @return [String]
def running_counters
  started = tasks.done.count + tasks.running.count
  total = tasks.count

  "#{started}/#{total}"
end

#should_wait_for_running_tasks? ⇒ Boolean

Returns:

  • (Boolean)


133
134
135
# File 'lib/fasten/runner.rb', line 133

# Tells the wait loops whether to keep polling instead of going back to
# dispatching.
#
# Waiting continues when the summed weight of the running tasks has reached
# `jobs`, or when tasks are running and nothing new may be started: no task
# is waiting, a task has failed, or the runner is pausing or quitting.
#
# @return [Boolean]
def should_wait_for_running_tasks?
  nothing_to_start = tasks.no_waiting? || tasks.failed? || %i[PAUSING QUITTING].include?(state)

  (tasks.running? && nothing_to_start) || tasks.running.map(&:weight).sum >= jobs
end

#show_error_tasks ⇒ Object



195
196
197
198
199
# File 'lib/fasten/runner.rb', line 195

# Logs every failed task together with its error and, when the error
# carries one, its backtrace (one frame per line).
def show_error_tasks
  tasks.failed.each do |failed_task|
    failure = failed_task.error
    trace = failure&.backtrace&.join("\n")

    log_info "task: #{failed_task} error:#{failure}\n#{trace}"
  end
end

#task(name, **opts, &block) ⇒ Object



42
43
44
45
46
47
48
# File 'lib/fasten/runner.rb', line 42

# Creates a Task from the given options and registers it with the runner.
# The positional +name+ and the block always win over any :name / :block
# entries passed in +opts+.
#
# @param name [Object] task name
# @param opts [Hash] further Task options
# @return [Task] the newly registered task
def task(name, **opts, &block)
  settings = opts.merge(name: name, block: block)
  created = Task.new(**settings)
  tasks << created

  created
end

#to_s ⇒ Object



280
281
282
# File 'lib/fasten/runner.rb', line 280

# String form of the runner: the value of #name, returned as-is (it is not
# coerced with +to_s+, so a nil name yields nil).
#
# @return [Object] the runner's name
def to_s
  name
end

#touch_task_logs ⇒ Object



78
79
80
81
82
83
84
85
# File 'lib/fasten/runner.rb', line 78

# Makes sure "<fasten_dir>/log/task/" exists and touches one
# "<task name>.log" file per task inside it, announcing each path on
# stdout.
def touch_task_logs
  log_dir = "#{fasten_dir}/log/task/"
  FileUtils.mkdir_p log_dir

  tasks.each do |entry|
    path = "#{log_dir}#{entry.name}.log"
    puts "Fasten: creating log #{path}"
    FileUtils.touch path
  end
end

#wait_for_running_tasks ⇒ Object



137
138
139
# File 'lib/fasten/runner.rb', line 137

# Waits for running tasks with the strategy matching the execution mode:
# the thread queue when `use_threads` is truthy, the workers' pipes
# otherwise.
def wait_for_running_tasks
  if use_threads
    wait_for_running_tasks_thread
  else
    wait_for_running_tasks_fork
  end
end

#wait_for_running_tasks_fork ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
# File 'lib/fasten/runner.rb', line 167

# Fork-mode wait loop: while #should_wait_for_running_tasks? holds, the UI
# is refreshed and the workers' read ends are polled with IO.select (0.5 s
# timeout); whatever became readable is handed to
# #receive_jobs_tasks_fork. The UI is refreshed once more on exit.
def wait_for_running_tasks_fork
  while should_wait_for_running_tasks?
    ui.update

    # IO.select returns nil on timeout, so `ready` may be nil.
    ready = IO.select(workers.map(&:parent_read), [], [], 0.5)&.first

    receive_jobs_tasks_fork(ready)
  end

  ui.update
end

#wait_for_running_tasks_thread ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
# File 'lib/fasten/runner.rb', line 141

# Thread-mode wait loop: creates the TimeoutQueue on first use, then, while
# #should_wait_for_running_tasks? holds, refreshes the UI and passes
# whatever the queue delivers within 0.5 s to #receive_jobs_tasks_thread.
# The UI is refreshed once more on exit.
def wait_for_running_tasks_thread
  self.queue = TimeoutQueue.new unless queue

  while should_wait_for_running_tasks?
    ui.update

    finished = queue.receive_with_timeout(0.5)
    receive_jobs_tasks_thread(finished)
  end

  ui.update
end