Class: Fasten::Runner
Overview
rubocop:disable Metrics/ClassLength
Constant Summary
Support::Stats::FLOAT_FORMATTER
Instance Attribute Summary collapse
#dif, #error, #fin, #ini, #runner, #state
#log_file, #logger
Instance Method Summary
collapse
#load_yaml, #save_yaml
#run_ui, #ui
#hformat, #initialize_stats, #load_stats, #save_stats, #split_time, #stats_add_entry, #stats_create_entry, #stats_data, #stats_entries, #stats_last, #stats_run_history, #stats_summary, #stats_summary_data, #update_stats
#deps, #deps_str, #deps_str_runner, #deps_str_task, #idle?, #last_avg, #last_err, #last_stat, #paused?, #pausing?, #quitting?, #running?
#close_logger, #initialize_logger, #log_fin, #log_ini
Constructor Details
#initialize(**options) ⇒ Runner
Returns a new instance of Runner.
23
24
25
26
27
28
29
30
31
32
|
# File 'lib/fasten/runner.rb', line 23
def initialize(**options)
i[name stats summary jobs worker_class fasten_dir use_threads ui_mode developer priority].each do |key|
options[key] = Fasten.send "default_#{key}" unless options.key? key
end
@tasks = TaskManager.new(targets: options[:targets] || [], runner: self)
@workers = []
reconfigure(**options)
end
|
Instance Attribute Details
#developer ⇒ Object
Returns the value of attribute developer.
21
22
23
|
# File 'lib/fasten/runner.rb', line 21
def developer
@developer
end
|
#fasten_dir ⇒ Object
Returns the value of attribute fasten_dir.
21
22
23
|
# File 'lib/fasten/runner.rb', line 21
def fasten_dir
@fasten_dir
end
|
#jobs ⇒ Object
Returns the value of attribute jobs.
21
22
23
|
# File 'lib/fasten/runner.rb', line 21
def jobs
@jobs
end
|
#name ⇒ Object
Returns the value of attribute name.
21
22
23
|
# File 'lib/fasten/runner.rb', line 21
def name
@name
end
|
#priority ⇒ Object
Returns the value of attribute priority.
21
22
23
|
# File 'lib/fasten/runner.rb', line 21
def priority
@priority
end
|
#queue ⇒ Object
Returns the value of attribute queue.
21
22
23
|
# File 'lib/fasten/runner.rb', line 21
def queue
@queue
end
|
#stats ⇒ Object
Returns the value of attribute stats.
21
22
23
|
# File 'lib/fasten/runner.rb', line 21
def stats
@stats
end
|
#summary ⇒ Object
Returns the value of attribute summary.
21
22
23
|
# File 'lib/fasten/runner.rb', line 21
def summary
@summary
end
|
#tasks ⇒ Object
Returns the value of attribute tasks.
21
22
23
|
# File 'lib/fasten/runner.rb', line 21
def tasks
@tasks
end
|
#ui_mode ⇒ Object
Returns the value of attribute ui_mode.
21
22
23
|
# File 'lib/fasten/runner.rb', line 21
def ui_mode
@ui_mode
end
|
#use_threads ⇒ Object
Returns the value of attribute use_threads.
21
22
23
|
# File 'lib/fasten/runner.rb', line 21
def use_threads
@use_threads
end
|
#worker_class ⇒ Object
Returns the value of attribute worker_class.
21
22
23
|
# File 'lib/fasten/runner.rb', line 21
def worker_class
@worker_class
end
|
#workers ⇒ Object
Returns the value of attribute workers.
21
22
23
|
# File 'lib/fasten/runner.rb', line 21
def workers
@workers
end
|
Instance Method Details
#check_state ⇒ Object
122
123
124
125
126
127
128
129
130
131
|
# File 'lib/fasten/runner.rb', line 122
def check_state
if state == :PAUSING && tasks.no_running?
self.state = :PAUSED
ui.message = nil
ui.force_clear
elsif state == :QUITTING && tasks.no_running?
self.state = :QUIT
ui.force_clear
end
end
|
#dispatch_pending_tasks ⇒ Object
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
|
# File 'lib/fasten/runner.rb', line 252
def dispatch_pending_tasks
while tasks.waiting? && tasks.running.map(&:weight).sum < jobs
task = tasks.next
task_worker_class = task.worker_class || worker_class
task_worker_class = Object.const_get(task_worker_class) if task_worker_class.is_a? String
worker = find_or_create_worker worker_class: task_worker_class
log_ini task, "on worker #{worker}"
worker.send_request_to_child(task)
tasks.running << task
ui.force_clear
end
end
|
#done_counters ⇒ Object
97
98
99
|
# File 'lib/fasten/runner.rb', line 97
def done_counters
"#{tasks.done.count}/#{tasks.count}"
end
|
#find_or_create_worker(worker_class:) ⇒ Object
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
|
# File 'lib/fasten/runner.rb', line 235
def find_or_create_worker(worker_class:)
worker = workers.find { |item| item.instance_of?(worker_class) && item.running_task.nil? }
unless worker
@worker_id = (@worker_id || 0) + 1
worker = worker_class.new runner: self, name: "#{worker_class.to_s.gsub('::', '-')}-#{format '%02X', @worker_id}", use_threads: use_threads
worker.start
workers << worker
log_info "Worker created: #{worker}"
ui.force_clear
end
worker
end
|
#kind ⇒ Object
276
277
278
|
# File 'lib/fasten/runner.rb', line 276
def kind
'runner'
end
|
#map(list, &block) ⇒ Object
87
88
89
90
91
92
93
94
95
|
# File 'lib/fasten/runner.rb', line 87
def map(list, &block)
list.each do |item|
task item.to_s, request: item, &block
end
perform
tasks.map(&:response)
end
|
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
# File 'lib/fasten/runner.rb', line 54
def perform
initialize_logger
StdThreadProxy.install if use_threads
self.state = :RUNNING
log_ini self, running_counters
load_stats
touch_task_logs
run_ui do
perform_loop
end
self.state = tasks.map(&:state).all?(:DONE) ? :DONE : :FAIL
log_fin self, running_counters
stats_add_entry(state, self)
stats_summary if summary
ensure
StdThreadProxy.uninstall if use_threads
close_logger
save_stats
end
|
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# File 'lib/fasten/runner.rb', line 105
def perform_loop
loop do
wait_for_running_tasks
raise_error_in_failure
remove_workers_as_needed
if i[PAUSING PAUSED QUITTING].include?(state)
check_state
else
dispatch_pending_tasks
end
break if (tasks.no_running? && tasks.no_waiting?) || state == :QUIT
end
remove_all_workers
end
|
#raise_error_in_failure ⇒ Object
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
|
# File 'lib/fasten/runner.rb', line 201
def raise_error_in_failure
return unless tasks.failed?
show_error_tasks
message = "Stopping because the following tasks failed: #{tasks.failed.map(&:to_s).join(', ')}"
if developer
ui.cleanup
puts message
puts 'Entering development console'
Kernel.binding.pry
else
remove_all_workers
raise message
end
end
|
#receive_jobs_tasks_fork(reads) ⇒ Object
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
|
# File 'lib/fasten/runner.rb', line 179
def receive_jobs_tasks_fork(reads)
reads&.each do |read|
next unless (worker = workers.find { |item| item.parent_read == read })
task = worker.receive_response_from_child
tasks.running.delete task
tasks.update task
stats_add_entry(task.state, task)
log_fin task, done_counters
ui.force_clear
end
end
|
#receive_jobs_tasks_thread(items) ⇒ Object
153
154
155
156
157
158
159
160
161
162
163
164
165
|
# File 'lib/fasten/runner.rb', line 153
def receive_jobs_tasks_thread(items)
items&.each do |task|
tasks.running.delete task
task.worker.running_task = task.worker.state = nil
tasks.update task
stats_add_entry(task.state, task)
log_fin task, done_counters
ui.force_clear
end
end
|
34
35
36
37
38
39
40
|
# File 'lib/fasten/runner.rb', line 34
def reconfigure(**options)
i[name stats summary jobs worker_class fasten_dir use_threads ui_mode developer priority].each do |key|
send "#{key}=", options[key] if options.key? key
end
initialize_stats
end
|
#register(&block) ⇒ Object
50
51
52
|
# File 'lib/fasten/runner.rb', line 50
def register(&block)
instance_eval(&block)
end
|
#remove_all_workers ⇒ Object
269
270
271
272
273
274
|
# File 'lib/fasten/runner.rb', line 269
def remove_all_workers
workers.each(&:kill)
workers.clear
ui.force_clear
end
|
#remove_workers_as_needed ⇒ Object
222
223
224
225
226
227
228
229
230
231
232
233
|
# File 'lib/fasten/runner.rb', line 222
def remove_workers_as_needed
workers.group_by(&:class).each do |_clazz, worker_list|
while worker_list.count > jobs
break unless (worker = workers.find { |item| item.running_task.nil? })
worker.kill
workers.delete worker
ui.force_clear
end
end
end
|
#running_counters ⇒ Object
101
102
103
|
# File 'lib/fasten/runner.rb', line 101
def running_counters
"#{tasks.done.count + tasks.running.count}/#{tasks.count}"
end
|
#should_wait_for_running_tasks? ⇒ Boolean
133
134
135
|
# File 'lib/fasten/runner.rb', line 133
def should_wait_for_running_tasks?
(tasks.running? && (tasks.no_waiting? || tasks.failed? || i[PAUSING QUITTING].include?(state))) || tasks.running.map(&:weight).sum >= jobs
end
|
#show_error_tasks ⇒ Object
195
196
197
198
199
|
# File 'lib/fasten/runner.rb', line 195
def show_error_tasks
tasks.failed.each do |task|
log_info "task: #{task} error:#{task.error}\n#{task.error&.backtrace&.join("\n")}"
end
end
|
#task(name, **opts, &block) ⇒ Object
42
43
44
45
46
47
48
|
# File 'lib/fasten/runner.rb', line 42
def task(name, **opts, &block)
opts[:name] = name
opts[:block] = block
tasks << task = Task.new(**opts)
task
end
|
#to_s ⇒ Object
280
281
282
|
# File 'lib/fasten/runner.rb', line 280
def to_s
name
end
|
#touch_task_logs ⇒ Object
78
79
80
81
82
83
84
85
|
# File 'lib/fasten/runner.rb', line 78
def touch_task_logs
FileUtils.mkdir_p "#{fasten_dir}/log/task/"
tasks.each do |task|
path = "#{fasten_dir}/log/task/#{task.name}.log"
puts "Fasten: creating log #{path}"
FileUtils.touch path
end
end
|
#wait_for_running_tasks ⇒ Object
137
138
139
|
# File 'lib/fasten/runner.rb', line 137
def wait_for_running_tasks
use_threads ? wait_for_running_tasks_thread : wait_for_running_tasks_fork
end
|
#wait_for_running_tasks_fork ⇒ Object
167
168
169
170
171
172
173
174
175
176
177
|
# File 'lib/fasten/runner.rb', line 167
def wait_for_running_tasks_fork
while should_wait_for_running_tasks?
ui.update
reads = workers.map(&:parent_read)
reads, _writes, _errors = IO.select(reads, [], [], 0.5)
receive_jobs_tasks_fork(reads)
end
ui.update
end
|
#wait_for_running_tasks_thread ⇒ Object
141
142
143
144
145
146
147
148
149
150
151
|
# File 'lib/fasten/runner.rb', line 141
def wait_for_running_tasks_thread
self.queue ||= TimeoutQueue.new
while should_wait_for_running_tasks?
ui.update
receive_jobs_tasks_thread queue.receive_with_timeout(0.5)
end
ui.update
end
|