Class: TaxGenerator::Processor

Inherits:
Object
  • Object
show all
Includes:
Concurrent::Async, ApplicationHelper
Defined in:
lib/tax_generator/classes/processor.rb

Overview

Class used to process XML files and create HTML files.

Constant Summary collapse

QUEUES =

:nodoc:

Concurrent::Map.new do |pools, name| #:nodoc:
  # Build and store one executor for a queue name the first time that name is looked up.
  pools.compute_if_absent(name) do
    Concurrent::ThreadPoolExecutor.new(
      min_threads: 10, # threads started eagerly
      max_threads: 50, # hard ceiling on the pool size
      max_queue: 0 # 0 means the backlog of waiting work is unbounded
    )
  end
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ApplicationHelper

create_directories, elements_with_content, erb_template, execute_with_rescue, format_error, log_error, log_message, nokogiri_xml, rescue_interrupt, root

Constructor Details

#initialize(options = {}) ⇒ void

Receives a hash of options that determine the input files and the input and output folders.

Parameters:

  • options (Hash) (defaults to: {})

    the options that can determine the input and output files and folders

Options Hash (options):

  • :input_dir (String)

    The input directory

  • :output_dir (String)

    The output directory

  • :taxonomy_file_name (String)

    The taxonomy file name

  • :destinations_file_name (String)

    The destinations file name

See Also:

  • #work



51
52
53
54
55
56
57
58
59
# File 'lib/tax_generator/classes/processor.rb', line 51

# Stores the (symbol-keyed) options and prepares empty, mutex-guarded job registries.
def initialize(options = {})
  # anything that is not a Hash is ignored rather than rejected
  @options = if options.is_a?(Hash)
               options.symbolize_keys
             else
               {}
             end
  # one lock per shared registry: jobs, job -> worker, worker -> job
  @jobs_mutex, @job_to_worker_mutex, @worker_to_job_mutex = Array.new(3) { Mutex.new }
  @jobs, @job_to_worker, @worker_to_job = {}, {}, {}
end

Instance Attribute Details

#job_to_worker ⇒ Hash

Returns a hash in which each key is a job id and each value is the worker that will handle that job.

Returns:

  • (Hash)

    each key is a job id and each value is the worker that will handle that job



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Concurrent::Async
  include TaxGenerator::ApplicationHelper

  # Thread pools keyed by queue name. The default block creates and stores a pool
  # the first time a queue name is looked up.
  QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc:
    hash.compute_if_absent(queue_name) {
      Concurrent::ThreadPoolExecutor.new(
        min_threads: 10, # create 10 threads at startup
        max_threads: 50, # create at most 50 threads
        max_queue: 0, # unbounded queue of work waiting for an available thread
      )
    }
  end

  attr_reader :options, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  #trap_exit :worker_died

  # Receives a hash of options that determine the input files and the input and output folders.
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders;
  #   a non-Hash value is ignored and treated as an empty hash
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name
  #   (the legacy key +:taxonomy_filename+ is also accepted)
  # @option options [String] :destinations_file_name The destinations file name
  #   (the legacy key +:destinations_filename+ is also accepted)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @jobs_mutex = Mutex.new
    @job_to_worker_mutex = Mutex.new
    @worker_to_job_mutex = Mutex.new
    @jobs = {}
    @job_to_worker = {}
    @worker_to_job = {}
  end

  # Posts a job onto the thread pool registered under +queue+. The pooled task spawns a
  # TaxGenerator::FileCreator named "worker_<atlas_id>" and passes it the job and this processor.
  #
  # @param [Hash] job_data the job to hand over (read with the string key 'atlas_id')
  # @param [String] queue the name of the pool in QUEUES to post to
  #
  # @return [Object] whatever the executor's #post returns
  def enqueue(job_data, queue: 'default') #:nodoc:
    QUEUES[queue].post(job_data) { |job|
      TaxGenerator::FileCreator.spawn(name: "worker_#{job['atlas_id']}", args: [job, self])
    }
  end

  # Returns the input folder from the options list, otherwise the default path.
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  # Returns the taxonomy file name from the options list, otherwise the default file name.
  # The documented key +:taxonomy_file_name+ takes precedence; the legacy key
  # +:taxonomy_filename+ is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_file_name) { @options.fetch(:taxonomy_filename, 'taxonomy.xml') }
  end

  # Returns the destinations file name from the options list, otherwise the default file name.
  # The documented key +:destinations_file_name+ takes precedence; the legacy key
  # +:destinations_filename+ is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_file_name) { @options.fetch(:destinations_filename, 'destinations.xml') }
  end

  # Returns the output folder path from the options list, otherwise the default path.
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  # Returns the full path to the taxonomy file (input folder + taxonomy file name).
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  # Returns the full path to the destinations file (input folder + destinations file name).
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  # Returns the full path to the static folder inside the output folder.
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # Deletes everything matched by "<output_folder>/**/*", re-creates the output and static
  # folders, and copies the contents of "<root>/templates/static" into the static folder.
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    FileUtils.rm_rf Dir["#{output_folder}/**/*"]
    create_directories(output_folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  # Checks whether every registered job has its 'status' set to 'finished'.
  # Returns true when no jobs are registered.
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs_mutex.synchronize do
      @jobs.all? { |_job_id, job| job['status'] == 'finished' }
    end
  end

  # Registers the jobs (keys converted to strings) under their 'atlas_id', so the manager
  # can reach them at any time. A later job with the same atlas_id replaces an earlier one.
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [void]
  #
  # @api public
  def register_jobs(*jobs)
    @jobs_mutex.synchronize do
      jobs.each do |job|
        normalized = job.stringify_keys
        @jobs[normalized['atlas_id']] = normalized
      end
    end
  end

  # Registers all the jobs, and then enqueues every registered job for a worker.
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # jobs need to be added into the manager before starting any task, so no key is added while iterating
    register_jobs(*jobs)
    # snapshot under the lock, then enqueue without holding it, so #all_workers_finished?
    # is not blocked while work is being posted
    pending = @jobs_mutex.synchronize { @jobs.values }
    pending.each { |job| enqueue(job) }
  end

  # Builds the list of file jobs: one per taxonomy node (without a destination) and one per
  # <destination> element of the destinations XML document. Destinations that lack an
  # atlas_id attribute are skipped and logged, because the job could not be keyed.
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    jobs = []
    @taxonomy.taxonomies.each do |taxonomy_node|
      jobs << file_job(taxonomy_node.name, nil)
    end
    nokogiri_xml(destinations_file_path).xpath('//destination').each do |destination|
      atlas_id = destination.attributes['atlas_id']
      if atlas_id.nil?
        log_message('skipping destination without an atlas_id attribute')
        next
      end
      jobs << file_job(atlas_id.value, destination)
    end
    jobs
  end

  # Fetches the jobs for file generation, delegates them to workers and waits until the workers finish.
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  # Blocks the calling thread, polling every 0.1 seconds, until #all_workers_finished? is true.
  # NOTE(review): if a job's status never becomes 'finished' (e.g. its worker crashed),
  # this never returns.
  # @see #all_workers_finished?
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination
    sleep(0.1) until all_workers_finished?
  end

  # Registers a worker in the job_to_worker storage under the job's 'atlas_id'.
  # @param  [Hash]  job the job that will be used to register the worker
  # @param  [TaxGenerator::FileCreator]  worker the worker that needs to be registered
  #
  # @return [void]
  #
  # @api public
  def register_job_to_worker(job, worker)
    @job_to_worker_mutex.synchronize do
      @job_to_worker[job['atlas_id']] = worker
    end
  end

  # Registers a job in the worker_to_job storage under the worker's name.
  # @param  [Hash]  job the job that will be registered to a worker
  # @param  [TaxGenerator::FileCreator]  worker the worker whose name is used as the key
  #
  # @return [void]
  #
  # @api public
  def register_worker_to_job(job, worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name] = job
    end
  end

  # Records the worker in both lookup tables, logs the registration and tells the worker to start.
  # @see TaxGenerator::FileCreator#start_work
  # @see #register_job_to_worker
  # @see #register_worker_to_job
  #
  # @param  [Hash]  job the job that the worker will work on
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    register_job_to_worker(job, worker)
    register_worker_to_job(job, worker)
    log_message("worker #{worker.job_id} registed into manager")
    worker.tell(:start_work)
  end

  # Cleans the output folders first. If the input folder and both input files exist, it then
  # builds and prints the taxonomy tree and generates the files; otherwise it logs a fatal message.
  # @see #prepare_output_dirs
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    prepare_output_dirs
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # Fetches the job registered for a worker (looked up by the worker's name).
  # @param  [TaxGenerator::FileCreator]  worker the worker whose job is wanted
  #
  # @return [Hash, nil] the registered job, or nil when none is registered
  #
  # @api public
  def get_job_from_worker(worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name]
    end
  end

  # Deletes the worker from the worker_to_job storage.
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  #
  # @return [Object] the worker's name (the key that was removed)
  #
  # @api public
  def delete_from_worker_to_job(worker)
    name = worker.name
    @worker_to_job_mutex.synchronize do
      @worker_to_job.delete(name)
    end
    name
  end

  # Forgets a dead worker. When both a reason and a registered job are present, it logs a
  # fatal message naming the job's atlas_id. It looks intended as the handler named in the
  # commented-out trap_exit line above.
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    job = get_job_from_worker(worker)
    worker_name = delete_from_worker_to_job(worker)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with name #{worker_name.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end

  private

  # Builds the job hash handed to a file worker.
  #
  # @param [String] atlas_id the id used as the job key and in the worker name
  # @param [Object, nil] destination the <destination> XML node, or nil for taxonomy-node jobs
  #
  # @return [Hash]
  def file_job(atlas_id, destination)
    { atlas_id: atlas_id, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
  end
end

#jobs ⇒ Hash

Returns a hash in which each key is a job id and each value is the job itself.

Returns:

  • (Hash)

    each key is a job id and each value is the job itself



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Concurrent::Async
  include TaxGenerator::ApplicationHelper

  # Thread pools keyed by queue name. The default block creates and stores a pool
  # the first time a queue name is looked up.
  QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc:
    hash.compute_if_absent(queue_name) {
      Concurrent::ThreadPoolExecutor.new(
        min_threads: 10, # create 10 threads at startup
        max_threads: 50, # create at most 50 threads
        max_queue: 0, # unbounded queue of work waiting for an available thread
      )
    }
  end

  attr_reader :options, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  #trap_exit :worker_died

  # Receives a hash of options that determine the input files and the input and output folders.
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders;
  #   a non-Hash value is ignored and treated as an empty hash
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name
  #   (the legacy key +:taxonomy_filename+ is also accepted)
  # @option options [String] :destinations_file_name The destinations file name
  #   (the legacy key +:destinations_filename+ is also accepted)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @jobs_mutex = Mutex.new
    @job_to_worker_mutex = Mutex.new
    @worker_to_job_mutex = Mutex.new
    @jobs = {}
    @job_to_worker = {}
    @worker_to_job = {}
  end

  # Posts a job onto the thread pool registered under +queue+. The pooled task spawns a
  # TaxGenerator::FileCreator named "worker_<atlas_id>" and passes it the job and this processor.
  #
  # @param [Hash] job_data the job to hand over (read with the string key 'atlas_id')
  # @param [String] queue the name of the pool in QUEUES to post to
  #
  # @return [Object] whatever the executor's #post returns
  def enqueue(job_data, queue: 'default') #:nodoc:
    QUEUES[queue].post(job_data) { |job|
      TaxGenerator::FileCreator.spawn(name: "worker_#{job['atlas_id']}", args: [job, self])
    }
  end

  # Returns the input folder from the options list, otherwise the default path.
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  # Returns the taxonomy file name from the options list, otherwise the default file name.
  # The documented key +:taxonomy_file_name+ takes precedence; the legacy key
  # +:taxonomy_filename+ is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_file_name) { @options.fetch(:taxonomy_filename, 'taxonomy.xml') }
  end

  # Returns the destinations file name from the options list, otherwise the default file name.
  # The documented key +:destinations_file_name+ takes precedence; the legacy key
  # +:destinations_filename+ is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_file_name) { @options.fetch(:destinations_filename, 'destinations.xml') }
  end

  # Returns the output folder path from the options list, otherwise the default path.
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  # Returns the full path to the taxonomy file (input folder + taxonomy file name).
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  # Returns the full path to the destinations file (input folder + destinations file name).
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  # Returns the full path to the static folder inside the output folder.
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # Deletes everything matched by "<output_folder>/**/*", re-creates the output and static
  # folders, and copies the contents of "<root>/templates/static" into the static folder.
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    FileUtils.rm_rf Dir["#{output_folder}/**/*"]
    create_directories(output_folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  # Checks whether every registered job has its 'status' set to 'finished'.
  # Returns true when no jobs are registered.
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs_mutex.synchronize do
      @jobs.all? { |_job_id, job| job['status'] == 'finished' }
    end
  end

  # Registers the jobs (keys converted to strings) under their 'atlas_id', so the manager
  # can reach them at any time. A later job with the same atlas_id replaces an earlier one.
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [void]
  #
  # @api public
  def register_jobs(*jobs)
    @jobs_mutex.synchronize do
      jobs.each do |job|
        normalized = job.stringify_keys
        @jobs[normalized['atlas_id']] = normalized
      end
    end
  end

  # Registers all the jobs, and then enqueues every registered job for a worker.
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # jobs need to be added into the manager before starting any task, so no key is added while iterating
    register_jobs(*jobs)
    # snapshot under the lock, then enqueue without holding it, so #all_workers_finished?
    # is not blocked while work is being posted
    pending = @jobs_mutex.synchronize { @jobs.values }
    pending.each { |job| enqueue(job) }
  end

  # Builds the list of file jobs: one per taxonomy node (without a destination) and one per
  # <destination> element of the destinations XML document. Destinations that lack an
  # atlas_id attribute are skipped and logged, because the job could not be keyed.
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    jobs = []
    @taxonomy.taxonomies.each do |taxonomy_node|
      jobs << file_job(taxonomy_node.name, nil)
    end
    nokogiri_xml(destinations_file_path).xpath('//destination').each do |destination|
      atlas_id = destination.attributes['atlas_id']
      if atlas_id.nil?
        log_message('skipping destination without an atlas_id attribute')
        next
      end
      jobs << file_job(atlas_id.value, destination)
    end
    jobs
  end

  # Fetches the jobs for file generation, delegates them to workers and waits until the workers finish.
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  # Blocks the calling thread, polling every 0.1 seconds, until #all_workers_finished? is true.
  # NOTE(review): if a job's status never becomes 'finished' (e.g. its worker crashed),
  # this never returns.
  # @see #all_workers_finished?
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination
    sleep(0.1) until all_workers_finished?
  end

  # Registers a worker in the job_to_worker storage under the job's 'atlas_id'.
  # @param  [Hash]  job the job that will be used to register the worker
  # @param  [TaxGenerator::FileCreator]  worker the worker that needs to be registered
  #
  # @return [void]
  #
  # @api public
  def register_job_to_worker(job, worker)
    @job_to_worker_mutex.synchronize do
      @job_to_worker[job['atlas_id']] = worker
    end
  end

  # Registers a job in the worker_to_job storage under the worker's name.
  # @param  [Hash]  job the job that will be registered to a worker
  # @param  [TaxGenerator::FileCreator]  worker the worker whose name is used as the key
  #
  # @return [void]
  #
  # @api public
  def register_worker_to_job(job, worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name] = job
    end
  end

  # Records the worker in both lookup tables, logs the registration and tells the worker to start.
  # @see TaxGenerator::FileCreator#start_work
  # @see #register_job_to_worker
  # @see #register_worker_to_job
  #
  # @param  [Hash]  job the job that the worker will work on
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    register_job_to_worker(job, worker)
    register_worker_to_job(job, worker)
    log_message("worker #{worker.job_id} registed into manager")
    worker.tell(:start_work)
  end

  # Cleans the output folders first. If the input folder and both input files exist, it then
  # builds and prints the taxonomy tree and generates the files; otherwise it logs a fatal message.
  # @see #prepare_output_dirs
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    prepare_output_dirs
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # Fetches the job registered for a worker (looked up by the worker's name).
  # @param  [TaxGenerator::FileCreator]  worker the worker whose job is wanted
  #
  # @return [Hash, nil] the registered job, or nil when none is registered
  #
  # @api public
  def get_job_from_worker(worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name]
    end
  end

  # Deletes the worker from the worker_to_job storage.
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  #
  # @return [Object] the worker's name (the key that was removed)
  #
  # @api public
  def delete_from_worker_to_job(worker)
    name = worker.name
    @worker_to_job_mutex.synchronize do
      @worker_to_job.delete(name)
    end
    name
  end

  # Forgets a dead worker. When both a reason and a registered job are present, it logs a
  # fatal message naming the job's atlas_id. It looks intended as the handler named in the
  # commented-out trap_exit line above.
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    job = get_job_from_worker(worker)
    worker_name = delete_from_worker_to_job(worker)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with name #{worker_name.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end

  private

  # Builds the job hash handed to a file worker.
  #
  # @param [String] atlas_id the id used as the job key and in the worker name
  # @param [Object, nil] destination the <destination> XML node, or nil for taxonomy-node jobs
  #
  # @return [Hash]
  def file_job(atlas_id, destination)
    { atlas_id: atlas_id, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
  end
end

#options ⇒ Hash

Returns the options that can determine the input and output files and folders.

Returns:

  • (Hash)

    the options that can determine the input and output files and folders



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Concurrent::Async
  include TaxGenerator::ApplicationHelper

  # Thread pools keyed by queue name. The default block creates and stores a pool
  # the first time a queue name is looked up.
  QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc:
    hash.compute_if_absent(queue_name) {
      Concurrent::ThreadPoolExecutor.new(
        min_threads: 10, # create 10 threads at startup
        max_threads: 50, # create at most 50 threads
        max_queue: 0, # unbounded queue of work waiting for an available thread
      )
    }
  end

  attr_reader :options, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  #trap_exit :worker_died

  # Receives a hash of options that determine the input files and the input and output folders.
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders;
  #   a non-Hash value is ignored and treated as an empty hash
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name
  #   (the legacy key +:taxonomy_filename+ is also accepted)
  # @option options [String] :destinations_file_name The destinations file name
  #   (the legacy key +:destinations_filename+ is also accepted)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @jobs_mutex = Mutex.new
    @job_to_worker_mutex = Mutex.new
    @worker_to_job_mutex = Mutex.new
    @jobs = {}
    @job_to_worker = {}
    @worker_to_job = {}
  end

  # Posts a job onto the thread pool registered under +queue+. The pooled task spawns a
  # TaxGenerator::FileCreator named "worker_<atlas_id>" and passes it the job and this processor.
  #
  # @param [Hash] job_data the job to hand over (read with the string key 'atlas_id')
  # @param [String] queue the name of the pool in QUEUES to post to
  #
  # @return [Object] whatever the executor's #post returns
  def enqueue(job_data, queue: 'default') #:nodoc:
    QUEUES[queue].post(job_data) { |job|
      TaxGenerator::FileCreator.spawn(name: "worker_#{job['atlas_id']}", args: [job, self])
    }
  end

  # Returns the input folder from the options list, otherwise the default path.
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  # Returns the taxonomy file name from the options list, otherwise the default file name.
  # The documented key +:taxonomy_file_name+ takes precedence; the legacy key
  # +:taxonomy_filename+ is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_file_name) { @options.fetch(:taxonomy_filename, 'taxonomy.xml') }
  end

  # Returns the destinations file name from the options list, otherwise the default file name.
  # The documented key +:destinations_file_name+ takes precedence; the legacy key
  # +:destinations_filename+ is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_file_name) { @options.fetch(:destinations_filename, 'destinations.xml') }
  end

  # Returns the output folder path from the options list, otherwise the default path.
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  # Returns the full path to the taxonomy file (input folder + taxonomy file name).
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  # Returns the full path to the destinations file (input folder + destinations file name).
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  # Returns the full path to the static folder inside the output folder.
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # Deletes everything matched by "<output_folder>/**/*", re-creates the output and static
  # folders, and copies the contents of "<root>/templates/static" into the static folder.
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    FileUtils.rm_rf Dir["#{output_folder}/**/*"]
    create_directories(output_folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  # Checks whether every registered job has its 'status' set to 'finished'.
  # Returns true when no jobs are registered.
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs_mutex.synchronize do
      @jobs.all? { |_job_id, job| job['status'] == 'finished' }
    end
  end

  # Registers the jobs (keys converted to strings) under their 'atlas_id', so the manager
  # can reach them at any time. A later job with the same atlas_id replaces an earlier one.
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [void]
  #
  # @api public
  def register_jobs(*jobs)
    @jobs_mutex.synchronize do
      jobs.each do |job|
        normalized = job.stringify_keys
        @jobs[normalized['atlas_id']] = normalized
      end
    end
  end

  # Registers all the jobs, and then enqueues every registered job for a worker.
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # jobs need to be added into the manager before starting any task, so no key is added while iterating
    register_jobs(*jobs)
    # snapshot under the lock, then enqueue without holding it, so #all_workers_finished?
    # is not blocked while work is being posted
    pending = @jobs_mutex.synchronize { @jobs.values }
    pending.each { |job| enqueue(job) }
  end

  # Builds the list of file jobs: one per taxonomy node (without a destination) and one per
  # <destination> element of the destinations XML document. Destinations that lack an
  # atlas_id attribute are skipped and logged, because the job could not be keyed.
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    jobs = []
    @taxonomy.taxonomies.each do |taxonomy_node|
      jobs << file_job(taxonomy_node.name, nil)
    end
    nokogiri_xml(destinations_file_path).xpath('//destination').each do |destination|
      atlas_id = destination.attributes['atlas_id']
      if atlas_id.nil?
        log_message('skipping destination without an atlas_id attribute')
        next
      end
      jobs << file_job(atlas_id.value, destination)
    end
    jobs
  end

  # Fetches the jobs for file generation, delegates them to workers and waits until the workers finish.
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  # Blocks the calling thread, polling every 0.1 seconds, until #all_workers_finished? is true.
  # NOTE(review): if a job's status never becomes 'finished' (e.g. its worker crashed),
  # this never returns.
  # @see #all_workers_finished?
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination
    sleep(0.1) until all_workers_finished?
  end

  # Registers a worker in the job_to_worker storage under the job's 'atlas_id'.
  # @param  [Hash]  job the job that will be used to register the worker
  # @param  [TaxGenerator::FileCreator]  worker the worker that needs to be registered
  #
  # @return [void]
  #
  # @api public
  def register_job_to_worker(job, worker)
    @job_to_worker_mutex.synchronize do
      @job_to_worker[job['atlas_id']] = worker
    end
  end

  # Registers a job in the worker_to_job storage under the worker's name.
  # @param  [Hash]  job the job that will be registered to a worker
  # @param  [TaxGenerator::FileCreator]  worker the worker whose name is used as the key
  #
  # @return [void]
  #
  # @api public
  def register_worker_to_job(job, worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name] = job
    end
  end

  # Records the worker in both lookup tables, logs the registration and tells the worker to start.
  # @see TaxGenerator::FileCreator#start_work
  # @see #register_job_to_worker
  # @see #register_worker_to_job
  #
  # @param  [Hash]  job the job that the worker will work on
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    register_job_to_worker(job, worker)
    register_worker_to_job(job, worker)
    log_message("worker #{worker.job_id} registed into manager")
    worker.tell(:start_work)
  end

  # Cleans the output folders first. If the input folder and both input files exist, it then
  # builds and prints the taxonomy tree and generates the files; otherwise it logs a fatal message.
  # @see #prepare_output_dirs
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    prepare_output_dirs
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # Fetches the job registered for a worker (looked up by the worker's name).
  # @param  [TaxGenerator::FileCreator]  worker the worker whose job is wanted
  #
  # @return [Hash, nil] the registered job, or nil when none is registered
  #
  # @api public
  def get_job_from_worker(worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name]
    end
  end

  # Deletes the worker from the worker_to_job storage.
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  #
  # @return [Object] the worker's name (the key that was removed)
  #
  # @api public
  def delete_from_worker_to_job(worker)
    name = worker.name
    @worker_to_job_mutex.synchronize do
      @worker_to_job.delete(name)
    end
    name
  end

  # Forgets a dead worker. When both a reason and a registered job are present, it logs a
  # fatal message naming the job's atlas_id. It looks intended as the handler named in the
  # commented-out trap_exit line above.
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    job = get_job_from_worker(worker)
    worker_name = delete_from_worker_to_job(worker)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with name #{worker_name.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end

  private

  # Builds the job hash handed to a file worker.
  #
  # @param [String] atlas_id the id used as the job key and in the worker name
  # @param [Object, nil] destination the <destination> XML node, or nil for taxonomy-node jobs
  #
  # @return [Hash]
  def file_job(atlas_id, destination)
    { atlas_id: atlas_id, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
  end
end

#taxonomy ⇒ TaxGenerator::TaxonomyTree

Returns the taxonomy tree that holds the nodes from the taxonomy xml document.

Returns:



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Concurrent::Async
  include TaxGenerator::ApplicationHelper

  # Thread pools keyed by queue name, looked up by #enqueue via QUEUES[queue].
  # The map's block builds the pool for a name that is not yet present;
  # compute_if_absent keeps a single pool per name.
  QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc:
    hash.compute_if_absent(queue_name) do
      Concurrent::ThreadPoolExecutor.new(
        min_threads: 10, # create 10 threads at startup
        max_threads: 50, # create at most 50 threads
        max_queue: 0 # unbounded queue of work waiting for an available thread
      )
    end
  end

  # options: constructor options (symbolized keys); taxonomy: tree built by #work;
  # jobs: atlas_id => job hash; job_to_worker: atlas_id => worker; worker_to_job: worker name => job
  attr_reader :options, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  # trap_exit :worker_died
  # NOTE(review): the hook above is disabled; nothing in this class calls #worker_died.

  # Receives a list of options that determine the input and output folders and files.
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders;
  #   anything that is not a Hash is ignored and treated as no options
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name (:taxonomy_filename is also accepted)
  # @option options [String] :destinations_file_name The destinations file name (:destinations_filename is also accepted)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @jobs_mutex = Mutex.new
    @job_to_worker_mutex = Mutex.new
    @worker_to_job_mutex = Mutex.new
    @jobs = {}
    @job_to_worker = {}
    @worker_to_job = {}
  end

  # Posts a job to the named thread pool. The pool thread spawns a TaxGenerator::FileCreator
  # actor named "worker_<atlas_id>", passing it the job and this processor.
  def enqueue(job_data, queue: 'default') #:nodoc:
    QUEUES[queue].post(job_data) do |job|
      TaxGenerator::FileCreator.spawn(name: "worker_#{job['atlas_id']}", args: [job, self])
    end
  end

  # Returns the input folder from the options list (:input_dir),
  # otherwise the default path "<root>/data/input".
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  # Returns the taxonomy file name from the options list, otherwise 'taxonomy.xml'.
  # Both :taxonomy_filename and :taxonomy_file_name are accepted; :taxonomy_filename
  # wins when both are given (it is the key this method has always read).
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_filename) { @options.fetch(:taxonomy_file_name, 'taxonomy.xml') }
  end

  # Returns the destinations file name from the options list, otherwise 'destinations.xml'.
  # Both :destinations_filename and :destinations_file_name are accepted; :destinations_filename
  # wins when both are given (it is the key this method has always read).
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_filename) { @options.fetch(:destinations_file_name, 'destinations.xml') }
  end

  # Returns the output folder path from the options list (:output_dir),
  # otherwise the default path "<root>/data/output".
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  # Returns the full path to the taxonomy file inside the input folder.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  # Returns the full path to the destinations file inside the input folder.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  # Returns the full path to the static folder inside the output folder.
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # Cleans the output folder, re-creates it together with the static folder, and copies
  # the contents of <root>/templates/static into the static folder.
  # The `**/*` glob does not match dot-files, so hidden entries are not removed.
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    FileUtils.rm_rf(Dir["#{output_folder}/**/*"])
    create_directories(output_folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  # Checks whether every registered job has the status 'finished'
  # (true when no jobs are registered).
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs_mutex.synchronize do
      @jobs.all? { |_job_id, job| job['status'] == 'finished' }
    end
  end

  # Registers the jobs (with string keys) under their 'atlas_id', so that the manager
  # can access them at any time. A later job with the same atlas_id replaces an earlier one.
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [void]
  #
  # @api public
  def register_jobs(*jobs)
    jobs.each do |job|
      job = job.stringify_keys
      @jobs_mutex.synchronize do
        @jobs[job['atlas_id']] = job
      end
    end
  end

  # Registers all the jobs, then hands every registered job to a worker via #enqueue.
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # all jobs are registered before any task starts, so the job list is complete
    # before workers can report on it
    register_jobs(*jobs)
    # snapshot under the lock, enqueue outside it: @jobs_mutex is not reentrant, so holding it
    # while handing work to an executor could deadlock if the work ran inline and called back
    # into a method that takes the same lock (e.g. #all_workers_finished?)
    pending_jobs = @jobs_mutex.synchronize { @jobs.values }
    pending_jobs.each { |job| enqueue(job) }
  end

  # Builds the job list. It contains one job per taxonomy node (without a destination),
  # then one job per <destination> element of the destinations xml document.
  # NOTE: a <destination> without an atlas_id attribute raises NoMethodError here.
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    jobs = []
    @taxonomy.taxonomies.each do |taxonomy_node|
      jobs << { atlas_id: taxonomy_node.name, taxonomy: @taxonomy, destination: nil, output_folder: output_folder }
    end
    nokogiri_xml(destinations_file_path).xpath('//destination').each do |destination|
      atlas_id = destination.attributes['atlas_id']
      jobs << { atlas_id: atlas_id.value, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
    end
    jobs
  end

  # Fetches the jobs for file generation, delegates them to workers and waits until the workers finish.
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  # Blocks, polling every 0.1 seconds, until every registered job has the status 'finished'.
  # There is no timeout: if a job never reaches 'finished', this never returns.
  # @see #all_workers_finished?
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination
    sleep(0.1) until all_workers_finished?
  end

  # Registers a worker in the job_to_worker storage under the job's 'atlas_id'.
  # @param  [Hash]  job the job that will be used to register the worker
  # @param  [TaxGenerator::FileCreator]  worker the worker that needs to be registered
  #
  # @return [void]
  #
  # @api public
  def register_job_to_worker(job, worker)
    @job_to_worker_mutex.synchronize do
      @job_to_worker[job['atlas_id']] = worker
    end
  end

  # Registers a job in the worker_to_job storage under the worker's name (its mailbox address).
  # @param  [Hash]  job the job that will be registered to the worker
  # @param  [TaxGenerator::FileCreator]  worker the worker used as the key
  #
  # @return [void]
  #
  # @api public
  def register_worker_to_job(job, worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name] = job
    end
  end

  # Registers the worker in both lookup tables, so the manager can reach it at any time,
  # then tells it to start working.
  # @see TaxGenerator::FileCreator#start_work
  # @see #register_job_to_worker
  # @see #register_worker_to_job
  #
  # @param  [Hash]  job the job that the worker will work on
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    register_job_to_worker(job, worker)
    register_worker_to_job(job, worker)
    log_message("worker #{worker.job_id} registed into manager")
    worker.tell(:start_work)
  end

  # Prepares the output directories, then builds the taxonomy tree, prints it and generates
  # the files. When the input folder or either input file is missing, a fatal message is logged.
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    prepare_output_dirs
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # Looks up the job currently assigned to a worker.
  # @param  [TaxGenerator::FileCreator]  worker the worker whose job is wanted
  #
  # @return [Hash, nil] the job stored under the worker's name, or nil when none is registered
  #
  # @api public
  def get_job_from_worker(worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name]
    end
  end

  # Removes a worker's entry from the worker_to_job storage.
  # @param  [TaxGenerator::FileCreator]  worker the worker to forget
  #
  # @return [Object] the worker's name, i.e. the key that was removed
  #
  # @api public
  def delete_from_worker_to_job(worker)
    name = worker.name
    @worker_to_job_mutex.synchronize do
      @worker_to_job.delete(name)
    end
    name
  end

  # Cleans up after a worker that died and, when both a registered job and a non-blank
  # reason are known, logs a fatal message describing the failure.
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    job = get_job_from_worker(worker)
    worker_name = delete_from_worker_to_job(worker)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with name #{worker_name.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end
end

#worker_supervisor ⇒ Celluloid::SupervisionGroup

Returns the supervision group that supervises workers.

Returns:

  • (Celluloid::SupervisionGroup)

    the supervision group that supervises workers



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Concurrent::Async
  include TaxGenerator::ApplicationHelper

  # Thread pools keyed by queue name, looked up by #enqueue via QUEUES[queue].
  # The map's block builds the pool for a name that is not yet present;
  # compute_if_absent keeps a single pool per name.
  QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc:
    hash.compute_if_absent(queue_name) do
      Concurrent::ThreadPoolExecutor.new(
        min_threads: 10, # create 10 threads at startup
        max_threads: 50, # create at most 50 threads
        max_queue: 0 # unbounded queue of work waiting for an available thread
      )
    end
  end

  # options: constructor options (symbolized keys); taxonomy: tree built by #work;
  # jobs: atlas_id => job hash; job_to_worker: atlas_id => worker; worker_to_job: worker name => job
  attr_reader :options, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  # trap_exit :worker_died
  # NOTE(review): the hook above is disabled; nothing in this class calls #worker_died.

  # Receives a list of options that determine the input and output folders and files.
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders;
  #   anything that is not a Hash is ignored and treated as no options
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name (:taxonomy_filename is also accepted)
  # @option options [String] :destinations_file_name The destinations file name (:destinations_filename is also accepted)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @jobs_mutex = Mutex.new
    @job_to_worker_mutex = Mutex.new
    @worker_to_job_mutex = Mutex.new
    @jobs = {}
    @job_to_worker = {}
    @worker_to_job = {}
  end

  # Posts a job to the named thread pool. The pool thread spawns a TaxGenerator::FileCreator
  # actor named "worker_<atlas_id>", passing it the job and this processor.
  def enqueue(job_data, queue: 'default') #:nodoc:
    QUEUES[queue].post(job_data) do |job|
      TaxGenerator::FileCreator.spawn(name: "worker_#{job['atlas_id']}", args: [job, self])
    end
  end

  # Returns the input folder from the options list (:input_dir),
  # otherwise the default path "<root>/data/input".
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  # Returns the taxonomy file name from the options list, otherwise 'taxonomy.xml'.
  # Both :taxonomy_filename and :taxonomy_file_name are accepted; :taxonomy_filename
  # wins when both are given (it is the key this method has always read).
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_filename) { @options.fetch(:taxonomy_file_name, 'taxonomy.xml') }
  end

  # Returns the destinations file name from the options list, otherwise 'destinations.xml'.
  # Both :destinations_filename and :destinations_file_name are accepted; :destinations_filename
  # wins when both are given (it is the key this method has always read).
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_filename) { @options.fetch(:destinations_file_name, 'destinations.xml') }
  end

  # Returns the output folder path from the options list (:output_dir),
  # otherwise the default path "<root>/data/output".
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  # Returns the full path to the taxonomy file inside the input folder.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  # Returns the full path to the destinations file inside the input folder.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  # Returns the full path to the static folder inside the output folder.
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # Cleans the output folder, re-creates it together with the static folder, and copies
  # the contents of <root>/templates/static into the static folder.
  # The `**/*` glob does not match dot-files, so hidden entries are not removed.
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    FileUtils.rm_rf(Dir["#{output_folder}/**/*"])
    create_directories(output_folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  # Checks whether every registered job has the status 'finished'
  # (true when no jobs are registered).
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs_mutex.synchronize do
      @jobs.all? { |_job_id, job| job['status'] == 'finished' }
    end
  end

  # Registers the jobs (with string keys) under their 'atlas_id', so that the manager
  # can access them at any time. A later job with the same atlas_id replaces an earlier one.
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [void]
  #
  # @api public
  def register_jobs(*jobs)
    jobs.each do |job|
      job = job.stringify_keys
      @jobs_mutex.synchronize do
        @jobs[job['atlas_id']] = job
      end
    end
  end

  # Registers all the jobs, then hands every registered job to a worker via #enqueue.
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # all jobs are registered before any task starts, so the job list is complete
    # before workers can report on it
    register_jobs(*jobs)
    # snapshot under the lock, enqueue outside it: @jobs_mutex is not reentrant, so holding it
    # while handing work to an executor could deadlock if the work ran inline and called back
    # into a method that takes the same lock (e.g. #all_workers_finished?)
    pending_jobs = @jobs_mutex.synchronize { @jobs.values }
    pending_jobs.each { |job| enqueue(job) }
  end

  # Builds the job list. It contains one job per taxonomy node (without a destination),
  # then one job per <destination> element of the destinations xml document.
  # NOTE: a <destination> without an atlas_id attribute raises NoMethodError here.
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    jobs = []
    @taxonomy.taxonomies.each do |taxonomy_node|
      jobs << { atlas_id: taxonomy_node.name, taxonomy: @taxonomy, destination: nil, output_folder: output_folder }
    end
    nokogiri_xml(destinations_file_path).xpath('//destination').each do |destination|
      atlas_id = destination.attributes['atlas_id']
      jobs << { atlas_id: atlas_id.value, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
    end
    jobs
  end

  # Fetches the jobs for file generation, delegates them to workers and waits until the workers finish.
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  # Blocks, polling every 0.1 seconds, until every registered job has the status 'finished'.
  # There is no timeout: if a job never reaches 'finished', this never returns.
  # @see #all_workers_finished?
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination
    sleep(0.1) until all_workers_finished?
  end

  # Registers a worker in the job_to_worker storage under the job's 'atlas_id'.
  # @param  [Hash]  job the job that will be used to register the worker
  # @param  [TaxGenerator::FileCreator]  worker the worker that needs to be registered
  #
  # @return [void]
  #
  # @api public
  def register_job_to_worker(job, worker)
    @job_to_worker_mutex.synchronize do
      @job_to_worker[job['atlas_id']] = worker
    end
  end

  # Registers a job in the worker_to_job storage under the worker's name (its mailbox address).
  # @param  [Hash]  job the job that will be registered to the worker
  # @param  [TaxGenerator::FileCreator]  worker the worker used as the key
  #
  # @return [void]
  #
  # @api public
  def register_worker_to_job(job, worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name] = job
    end
  end

  # Registers the worker in both lookup tables, so the manager can reach it at any time,
  # then tells it to start working.
  # @see TaxGenerator::FileCreator#start_work
  # @see #register_job_to_worker
  # @see #register_worker_to_job
  #
  # @param  [Hash]  job the job that the worker will work on
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    register_job_to_worker(job, worker)
    register_worker_to_job(job, worker)
    log_message("worker #{worker.job_id} registed into manager")
    worker.tell(:start_work)
  end

  # Prepares the output directories, then builds the taxonomy tree, prints it and generates
  # the files. When the input folder or either input file is missing, a fatal message is logged.
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    prepare_output_dirs
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # Looks up the job currently assigned to a worker.
  # @param  [TaxGenerator::FileCreator]  worker the worker whose job is wanted
  #
  # @return [Hash, nil] the job stored under the worker's name, or nil when none is registered
  #
  # @api public
  def get_job_from_worker(worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name]
    end
  end

  # Removes a worker's entry from the worker_to_job storage.
  # @param  [TaxGenerator::FileCreator]  worker the worker to forget
  #
  # @return [Object] the worker's name, i.e. the key that was removed
  #
  # @api public
  def delete_from_worker_to_job(worker)
    name = worker.name
    @worker_to_job_mutex.synchronize do
      @worker_to_job.delete(name)
    end
    name
  end

  # Cleans up after a worker that died and, when both a registered job and a non-blank
  # reason are known, logs a fatal message describing the failure.
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    job = get_job_from_worker(worker)
    worker_name = delete_from_worker_to_job(worker)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with name #{worker_name.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end
end

#worker_to_job ⇒ Hash

Returns a hash in which each key is a worker's mailbox address (its name) and each value is the job being handled by that worker.

Returns:

  • (Hash)

    each key is a worker's mailbox address (its name), and each value is the job being handled by that worker



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Concurrent::Async
  include TaxGenerator::ApplicationHelper

  # Thread pools keyed by queue name, looked up by #enqueue via QUEUES[queue].
  # The map's block builds the pool for a name that is not yet present;
  # compute_if_absent keeps a single pool per name.
  QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc:
    hash.compute_if_absent(queue_name) do
      Concurrent::ThreadPoolExecutor.new(
        min_threads: 10, # create 10 threads at startup
        max_threads: 50, # create at most 50 threads
        max_queue: 0 # unbounded queue of work waiting for an available thread
      )
    end
  end

  # options: constructor options (symbolized keys); taxonomy: tree built by #work;
  # jobs: atlas_id => job hash; job_to_worker: atlas_id => worker; worker_to_job: worker name => job
  attr_reader :options, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  # trap_exit :worker_died
  # NOTE(review): the hook above is disabled; nothing in this class calls #worker_died.

  # Receives a list of options that determine the input and output folders and files.
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders;
  #   anything that is not a Hash is ignored and treated as no options
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name (:taxonomy_filename is also accepted)
  # @option options [String] :destinations_file_name The destinations file name (:destinations_filename is also accepted)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @jobs_mutex = Mutex.new
    @job_to_worker_mutex = Mutex.new
    @worker_to_job_mutex = Mutex.new
    @jobs = {}
    @job_to_worker = {}
    @worker_to_job = {}
  end

  # Posts a job to the named thread pool. The pool thread spawns a TaxGenerator::FileCreator
  # actor named "worker_<atlas_id>", passing it the job and this processor.
  def enqueue(job_data, queue: 'default') #:nodoc:
    QUEUES[queue].post(job_data) do |job|
      TaxGenerator::FileCreator.spawn(name: "worker_#{job['atlas_id']}", args: [job, self])
    end
  end

  # Returns the input folder from the options list (:input_dir),
  # otherwise the default path "<root>/data/input".
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  # Returns the taxonomy file name from the options list, otherwise 'taxonomy.xml'.
  # Both :taxonomy_filename and :taxonomy_file_name are accepted; :taxonomy_filename
  # wins when both are given (it is the key this method has always read).
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_filename) { @options.fetch(:taxonomy_file_name, 'taxonomy.xml') }
  end

  # Returns the destinations file name from the options list, otherwise 'destinations.xml'.
  # Both :destinations_filename and :destinations_file_name are accepted; :destinations_filename
  # wins when both are given (it is the key this method has always read).
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_filename) { @options.fetch(:destinations_file_name, 'destinations.xml') }
  end

  # Returns the output folder path from the options list (:output_dir),
  # otherwise the default path "<root>/data/output".
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  # Returns the full path to the taxonomy file inside the input folder.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  # Returns the full path to the destinations file inside the input folder.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  # Returns the full path to the static folder inside the output folder.
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # Cleans the output folder, re-creates it together with the static folder, and copies
  # the contents of <root>/templates/static into the static folder.
  # The `**/*` glob does not match dot-files, so hidden entries are not removed.
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    FileUtils.rm_rf(Dir["#{output_folder}/**/*"])
    create_directories(output_folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  # Checks whether every registered job has the status 'finished'
  # (true when no jobs are registered).
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs_mutex.synchronize do
      @jobs.all? { |_job_id, job| job['status'] == 'finished' }
    end
  end

  # Registers the jobs (with string keys) under their 'atlas_id', so that the manager
  # can access them at any time. A later job with the same atlas_id replaces an earlier one.
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [void]
  #
  # @api public
  def register_jobs(*jobs)
    jobs.each do |job|
      job = job.stringify_keys
      @jobs_mutex.synchronize do
        @jobs[job['atlas_id']] = job
      end
    end
  end

  # Registers all the jobs, then hands every registered job to a worker via #enqueue.
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # all jobs are registered before any task starts, so the job list is complete
    # before workers can report on it
    register_jobs(*jobs)
    # snapshot under the lock, enqueue outside it: @jobs_mutex is not reentrant, so holding it
    # while handing work to an executor could deadlock if the work ran inline and called back
    # into a method that takes the same lock (e.g. #all_workers_finished?)
    pending_jobs = @jobs_mutex.synchronize { @jobs.values }
    pending_jobs.each { |job| enqueue(job) }
  end

  # Builds the job list. It contains one job per taxonomy node (without a destination),
  # then one job per <destination> element of the destinations xml document.
  # NOTE: a <destination> without an atlas_id attribute raises NoMethodError here.
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    jobs = []
    @taxonomy.taxonomies.each do |taxonomy_node|
      jobs << { atlas_id: taxonomy_node.name, taxonomy: @taxonomy, destination: nil, output_folder: output_folder }
    end
    nokogiri_xml(destinations_file_path).xpath('//destination').each do |destination|
      atlas_id = destination.attributes['atlas_id']
      jobs << { atlas_id: atlas_id.value, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
    end
    jobs
  end

  # Fetches the jobs for file generation, delegates them to workers and waits until the workers finish.
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  # Blocks, polling every 0.1 seconds, until every registered job has the status 'finished'.
  # There is no timeout: if a job never reaches 'finished', this never returns.
  # @see #all_workers_finished?
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination
    sleep(0.1) until all_workers_finished?
  end

  # Registers a worker in the job_to_worker storage under the job's 'atlas_id'.
  # @param  [Hash]  job the job that will be used to register the worker
  # @param  [TaxGenerator::FileCreator]  worker the worker that needs to be registered
  #
  # @return [void]
  #
  # @api public
  def register_job_to_worker(job, worker)
    @job_to_worker_mutex.synchronize do
      @job_to_worker[job['atlas_id']] = worker
    end
  end

  # Registers a job in the worker_to_job storage under the worker's name (its mailbox address).
  # @param  [Hash]  job the job that will be registered to the worker
  # @param  [TaxGenerator::FileCreator]  worker the worker used as the key
  #
  # @return [void]
  #
  # @api public
  def register_worker_to_job(job, worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name] = job
    end
  end

  # Registers the worker in both lookup tables, so the manager can reach it at any time,
  # then tells it to start working.
  # @see TaxGenerator::FileCreator#start_work
  # @see #register_job_to_worker
  # @see #register_worker_to_job
  #
  # @param  [Hash]  job the job that the worker will work on
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    register_job_to_worker(job, worker)
    register_worker_to_job(job, worker)
    log_message("worker #{worker.job_id} registed into manager")
    worker.tell(:start_work)
  end

  # Prepares the output directories, then builds the taxonomy tree, prints it and generates
  # the files. When the input folder or either input file is missing, a fatal message is logged.
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    prepare_output_dirs
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # Looks up the job currently assigned to a worker.
  # @param  [TaxGenerator::FileCreator]  worker the worker whose job is wanted
  #
  # @return [Hash, nil] the job stored under the worker's name, or nil when none is registered
  #
  # @api public
  def get_job_from_worker(worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name]
    end
  end

  # Removes a worker's entry from the worker_to_job storage.
  # @param  [TaxGenerator::FileCreator]  worker the worker to forget
  #
  # @return [Object] the worker's name, i.e. the key that was removed
  #
  # @api public
  def delete_from_worker_to_job(worker)
    name = worker.name
    @worker_to_job_mutex.synchronize do
      @worker_to_job.delete(name)
    end
    name
  end

  # Cleans up after a worker that died and, when both a registered job and a non-blank
  # reason are known, logs a fatal message describing the failure.
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    job = get_job_from_worker(worker)
    worker_name = delete_from_worker_to_job(worker)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with name #{worker_name.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end
end

#workers ⇒ Celluloid::Actor

Returns the actors that will work on the jobs.

Returns:

  • (Celluloid::Actor)

    the actors that will work on the jobs



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Concurrent::Async
  include TaxGenerator::ApplicationHelper

  QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc:
    hash.compute_if_absent(queue_name) {
      Concurrent::ThreadPoolExecutor.new(
        min_threads: 10, # create 10 threads at startup
        max_threads: 50, # create at most 50 threads
        max_queue: 0, # unbounded queue of work waiting for an available thread
      )
    }
  end

  attr_reader :options, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  #trap_exit :worker_died

  #  receives a list of options that are used to determine the input files and output and input folders
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name
  #   (the legacy key :taxonomy_filename is also accepted and takes precedence)
  # @option options [String] :destinations_file_name The destinations file name
  #   (the legacy key :destinations_filename is also accepted and takes precedence)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @jobs_mutex = Mutex.new
    @job_to_worker_mutex = Mutex.new
    @worker_to_job_mutex = Mutex.new
    @jobs = {}
    @job_to_worker = {}
    @worker_to_job = {}
  end

  # posts a job onto the named thread pool from QUEUES; the pool thread spawns a
  # TaxGenerator::FileCreator named "worker_<atlas_id>" with the job and this processor as arguments
  #
  # @param  [Hash]  job_data the job to hand to a worker
  # @param  [String]  queue the name of the thread pool to use
  #
  # @return [Object] whatever the executor's #post returns
  def enqueue(job_data, queue: 'default') #:nodoc:
    QUEUES[queue].post(job_data) { |job|
      TaxGenerator::FileCreator.spawn(name: "worker_#{job['atlas_id']}", args: [job, self])
    }
  end

  #  returns the input folder from the option list
  # otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  #  returns the taxonomy filename from the option list
  # otherwise the default filename.
  # The legacy key :taxonomy_filename is checked first so existing callers keep their
  # behaviour; the documented key :taxonomy_file_name is used as a fallback.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_filename) { @options.fetch(:taxonomy_file_name, 'taxonomy.xml') }
  end

  #  returns the destinations filename from the option list
  # otherwise the default filename.
  # The legacy key :destinations_filename is checked first so existing callers keep their
  # behaviour; the documented key :destinations_file_name is used as a fallback.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_filename) { @options.fetch(:destinations_file_name, 'destinations.xml') }
  end

  #  returns the output folder path from the option list
  # otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  #  returns the full path to the taxonomy file
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  #  returns the full path to the destinations file
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  #  returns the full path to the static folder
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # cleans the output folder and re-creates it and the static folder,
  # then copies the files from templates/static into the static folder
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    FileUtils.rm_rf Dir["#{output_folder}/**/*"]
    create_directories(output_folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  #  checks if all workers finished and returns true or false
  # (an empty job list counts as finished)
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs_mutex.synchronize do
      @jobs.all? { |_job_id, job| job['status'] == 'finished' }
    end
  end

  #  registers all the jobs so that the managers can have access to them at any time
  # (keys are stringified and jobs are stored by their 'atlas_id')
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [void]
  #
  # @api public
  def register_jobs(*jobs)
    jobs.each do |job|
      job = job.stringify_keys
      @jobs_mutex.synchronize do
        @jobs[job['atlas_id']] = job
      end
    end
  end

  #  registers all the jobs, and then delegates every registered job to workers
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # jobs need to be added into the manager before starting any task
    register_jobs(*jobs)
    # take a snapshot under the lock and enqueue outside of it, so the lock is not
    # held while work is posted and other threads iterating/reading @jobs are not blocked
    pending = @jobs_mutex.synchronize { @jobs.values }
    pending.each { |job| enqueue(job) }
  end

  #  creates a job for each taxonomy node, then parses the destinations xml document
  # and adds a job for each destination; returns the job list
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    jobs = []
    @taxonomy.taxonomies.each do |taxonomy_node|
      jobs << { atlas_id: taxonomy_node.name, taxonomy: @taxonomy, destination: nil, output_folder: output_folder }
    end
    nokogiri_xml(destinations_file_path).xpath('//destination').each do |destination|
      atlas_id = destination.attributes['atlas_id']
      jobs << { atlas_id: atlas_id.value, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
    end
    jobs
  end

  #  fetches the jobs for file generation, then delegates the jobs to workers and waits until workers finish
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  #  blocks until every registered job has the 'finished' status, re-checking every +interval+ seconds
  # NOTE(review): there is no timeout; a job that never reaches 'finished' keeps this loop running
  # @see #all_workers_finished?
  #
  # @param  [Numeric]  interval seconds to sleep between checks (defaults to 0.1)
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination(interval = 0.1)
    sleep(interval) until all_workers_finished?
  end

  # registers a worker inside the job_to_worker storage using the 'atlas_id' key from the job hash
  # @param  [Hash]  job the job that will be used to register the worker
  # @param  [TaxGenerator::FileCreator]  worker the worker that needs to be registered
  #
  # @return [void]
  #
  # @api public
  def register_job_to_worker(job, worker)
    @job_to_worker_mutex.synchronize do
      @job_to_worker[job['atlas_id']] = worker
    end
  end

  # registers a job to a worker, keyed by the worker's name
  # @param  [Hash]  job the job that will be registered to a worker
  # @param  [TaxGenerator::FileCreator]  worker the worker that will be used to register the job
  #
  # @return [void]
  #
  # @api public
  def register_worker_to_job(job, worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name] = job
    end
  end

  #  registers the worker so that the current actor has access to it at any given time and starts the worker
  # @see TaxGenerator::FileCreator#start_work
  # @see #register_job_to_worker
  # @see #register_worker_to_job
  #
  # @param  [Hash]  job the job that the worker will work
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    register_job_to_worker(job, worker)
    register_worker_to_job(job, worker)
    log_message("worker #{worker.job_id} registered into manager")
    worker.tell(:start_work)
  end

  # prepares the output folders, then (if the input folder and both input files exist)
  # builds the taxonomy tree, prints it and generates the files; otherwise logs a fatal message
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    prepare_output_dirs
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # fetches the job registered for a worker
  # @param  [TaxGenerator::FileCreator]  worker the worker whose job is looked up
  #
  # @return [Hash, nil] the registered job, or nil if the worker has none
  #
  # @api public
  def get_job_from_worker(worker)
    @worker_to_job_mutex.synchronize do
      @worker_to_job[worker.name]
    end
  end

  # deletes the worker from the worker_to_job storage
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  #
  # @return [Object] the worker's name (the key that was removed)
  #
  # @api public
  def delete_from_worker_to_job(worker)
    name = worker.name
    @worker_to_job_mutex.synchronize do
      @worker_to_job.delete(name)
    end
    name
  end

  # logs a message about the worker being dead if a worker crashes
  # (nothing is logged when the reason is blank or the worker has no registered job)
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    job = get_job_from_worker(worker)
    worker_name = delete_from_worker_to_job(worker)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with name #{worker_name.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end
end

Instance Method Details

#all_workers_finished? ⇒ Boolean

checks if all workers finished and returns true or false

Returns:

  • (Boolean)


150
151
152
153
154
# File 'lib/tax_generator/classes/processor.rb', line 150

# Reports whether every registered job carries the 'finished' status.
# An empty job list counts as finished.
#
# @return [Boolean]
def all_workers_finished?
  @jobs_mutex.synchronize do
    @jobs.each_value.all? { |entry| entry['status'] == 'finished' }
  end
end

#delegate_job(*jobs) ⇒ void

This method returns an undefined value.

registers all the jobs, and then delegates them to workers

Parameters:

  • jobs (Array)

    the jobs that will be delegated to the workers

See Also:



181
182
183
184
185
186
187
188
189
# File 'lib/tax_generator/classes/processor.rb', line 181

# Registers the given jobs, then enqueues every registered job for a worker.
#
# @param  [Array] jobs the jobs that will be delegated to the workers
#
# @return [void]
def delegate_job(*jobs)
  # jobs need to be added into the manager before starting any task
  register_jobs(*jobs)
  # snapshot under the lock, then enqueue outside of it so the lock is not held
  # while work is posted (other threads reading @jobs are not blocked meanwhile)
  pending = @jobs_mutex.synchronize { @jobs.values }
  pending.each { |job| enqueue(job) }
end

#delete_from_worker_to_job(worker) ⇒ void

Returns the name of the removed worker.

deletes the worker from the worker_to_job storage

Parameters:



315
316
317
318
319
320
321
# File 'lib/tax_generator/classes/processor.rb', line 315

# Removes the worker's entry from the worker_to_job storage.
#
# @param  [TaxGenerator::FileCreator]  worker the worker to forget
#
# @return [Object] the worker's name (the key that was removed)
def delete_from_worker_to_job(worker)
  worker.name.tap do |key|
    @worker_to_job_mutex.synchronize { @worker_to_job.delete(key) }
  end
end

#destinations_file_name ⇒ String

returns the destinations filename from the option list otherwise the default filename

Returns:

  • (String)


93
94
95
# File 'lib/tax_generator/classes/processor.rb', line 93

# Returns the destinations file name from the options, otherwise 'destinations.xml'.
# The legacy key :destinations_filename is checked first so existing callers keep
# their behaviour; the documented key :destinations_file_name is used as a fallback.
#
# @return [String]
def destinations_file_name
  @options.fetch(:destinations_filename) { @options.fetch(:destinations_file_name, 'destinations.xml') }
end

#destinations_file_path ⇒ String

returns the full path to the destinations file

Returns:

  • (String)


121
122
123
# File 'lib/tax_generator/classes/processor.rb', line 121

# Builds the full path of the destinations file by joining the input folder
# with the configured destinations file name.
#
# @return [String]
def destinations_file_path
  folder = input_folder
  file_name = destinations_file_name
  File.join(folder, file_name)
end

#enqueue(job_data, queue: 'default') ⇒ Object

:nodoc:



61
62
63
64
65
# File 'lib/tax_generator/classes/processor.rb', line 61

# Posts the job onto the named thread pool from QUEUES; inside the pool the job is
# handed to a freshly spawned TaxGenerator::FileCreator named "worker_<atlas_id>",
# together with this processor.
#
# @param  [Hash]  job_data the job to hand to a worker
# @param  [String]  queue the name of the thread pool to use
def enqueue(job_data, queue: 'default') #:nodoc:
  pool = QUEUES[queue]
  pool.post(job_data) do |queued_job|
    TaxGenerator::FileCreator.spawn(name: "worker_#{queued_job['atlas_id']}", args: [queued_job, self])
  end
end

#fetch_file_jobs ⇒ Array<Hash>

creates a job for each taxonomy node, then parses the destinations XML document and adds a job for each destination; returns the resulting job list

Returns:

  • (Array<Hash>)

See Also:

  • ApplicationHelper#nokogiri_xml


198
199
200
201
202
203
204
205
206
207
208
# File 'lib/tax_generator/classes/processor.rb', line 198

# Builds the job list: one job per taxonomy node (without a destination), followed
# by one job per <destination> element found in the destinations XML document.
#
# @return [Array<Hash>]
def fetch_file_jobs
  build_job = lambda do |atlas_id, destination|
    { atlas_id: atlas_id, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
  end
  [].tap do |jobs|
    @taxonomy.taxonomies.each { |node| jobs << build_job.call(node.name, nil) }
    nokogiri_xml(destinations_file_path).xpath('//destination').each do |node|
      jobs << build_job.call(node.attributes['atlas_id'].value, node)
    end
  end
end

#generate_files ⇒ void

This method returns an undefined value.

fetches the jobs for file generation, then delegates the jobs to workers and waits until the workers finish



218
219
220
221
222
# File 'lib/tax_generator/classes/processor.rb', line 218

# Collects the file jobs, hands them all to workers and then blocks until
# every worker is done.
#
# @return [void]
def generate_files
  delegate_job(*fetch_file_jobs)
  wait_jobs_termination
end

#get_job_from_worker(worker) ⇒ void

Returns the job registered for the worker, or nil if none is registered.

fetches the job from a worker

Parameters:



303
304
305
306
307
# File 'lib/tax_generator/classes/processor.rb', line 303

# Looks up the job registered for the given worker.
#
# @param  [TaxGenerator::FileCreator]  worker the worker whose job is wanted
#
# @return [Hash, nil] the registered job, or nil if the worker has none
def get_job_from_worker(worker)
  key = worker.name
  @worker_to_job_mutex.synchronize { @worker_to_job[key] }
end

#input_folder ⇒ String

returns the input folder from the option list otherwise the default path

Returns:

  • (String)


73
74
75
# File 'lib/tax_generator/classes/processor.rb', line 73

# Returns the :input_dir option, falling back to "<root>/data/input".
#
# @return [String]
def input_folder
  default_dir = "#{root}/data/input"
  @options.fetch(:input_dir, default_dir)
end

#output_folder ⇒ String

returns the output folder path from the option list otherwise the default path

Returns:

  • (String)


103
104
105
# File 'lib/tax_generator/classes/processor.rb', line 103

# Returns the :output_dir option, falling back to "<root>/data/output".
#
# @return [String]
def output_folder
  default_dir = "#{root}/data/output"
  @options.fetch(:output_dir, default_dir)
end

#prepare_output_dirs ⇒ void

This method returns an undefined value.

cleans the output folder and re-creates it and the static folder



139
140
141
142
143
# File 'lib/tax_generator/classes/processor.rb', line 139

# Wipes everything matched by "<output_folder>/**/*", recreates the output and
# static folders, and copies the contents of <root>/templates/static into the
# static folder.
#
# @return [void]
def prepare_output_dirs
  stale_entries = Dir["#{output_folder}/**/*"]
  FileUtils.rm_rf(stale_entries)
  create_directories(output_folder, static_output_dir)
  static_sources = Dir["#{File.join(root, 'templates', 'static')}/*"]
  FileUtils.cp_r(static_sources, static_output_dir)
end

#register_job_to_worker(job, worker) ⇒ void

This method returns an undefined value.

registers a worker inside the job_to_worker storage using the ‘atlas_id’ key from the job hash

Parameters:

  • worker (TaxGenerator::FileCreator)

    the worker that needs to be registered

  • job (Hash)

    the job that will be used to register the worker



241
242
243
244
245
# File 'lib/tax_generator/classes/processor.rb', line 241

# Stores the worker in the job_to_worker table under the job's 'atlas_id'.
#
# @param  [Hash]  job the job whose 'atlas_id' is used as key
# @param  [TaxGenerator::FileCreator]  worker the worker to store
#
# @return [void]
def register_job_to_worker(job, worker)
  key = job['atlas_id']
  @job_to_worker_mutex.synchronize { @job_to_worker[key] = worker }
end

#register_jobs(*jobs) ⇒ void

This method returns an undefined value.

registers all the jobs so that the managers can have access to them at any time

Parameters:

  • jobs (Array)

    the jobs that will be registered



163
164
165
166
167
168
169
170
# File 'lib/tax_generator/classes/processor.rb', line 163

# Records each job (with stringified keys) in the jobs table under its 'atlas_id',
# so the manager can reach any job later.
#
# @param  [Array] jobs the jobs that will be registered
#
# @return [void]
def register_jobs(*jobs)
  jobs.each do |raw_job|
    entry = raw_job.stringify_keys
    key = entry['atlas_id']
    @jobs_mutex.synchronize { @jobs[key] = entry }
  end
end

#register_worker_for_job(job, worker) ⇒ void

This method returns an undefined value.

registers the worker so that the current actor has access to it at any given time and starts the worker

Parameters:

  • job (Hash)

    the job that the worker will work

  • worker (TaxGenerator::FileCreator)

    the worker that will create the file

See Also:



271
272
273
274
275
276
# File 'lib/tax_generator/classes/processor.rb', line 271

# Records the job/worker pairing in both lookup tables, logs the registration
# and tells the worker to start working.
# @see TaxGenerator::FileCreator#start_work
# @see #register_job_to_worker
# @see #register_worker_to_job
#
# @param  [Hash]  job the job that the worker will work
# @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
#
# @return [void]
def register_worker_for_job(job, worker)
  register_job_to_worker(job, worker)
  register_worker_to_job(job, worker)
  log_message("worker #{worker.job_id} registered into manager")
  worker.tell(:start_work)
end

#register_worker_to_job(job, worker) ⇒ void

This method returns an undefined value.

registers a job to a worker, using the mailbox address of the worker (which is unique)

Parameters:

  • worker (TaxGenerator::FileCreator)

    the worker that will be used to register the job

  • job (Hash)

    the job that will be registered to a worker



254
255
256
257
258
# File 'lib/tax_generator/classes/processor.rb', line 254

# Stores the job in the worker_to_job table, keyed by the worker's name.
#
# @param  [Hash]  job the job to store
# @param  [TaxGenerator::FileCreator]  worker the worker whose name is used as key
#
# @return [void]
def register_worker_to_job(job, worker)
  slot = worker.name
  @worker_to_job_mutex.synchronize { @worker_to_job[slot] = job }
end

#static_output_dir ⇒ String

returns the full path to the static folder

Returns:

  • (String)


130
131
132
# File 'lib/tax_generator/classes/processor.rb', line 130

# Path of the 'static' sub-folder inside the output folder.
#
# @return [String]
def static_output_dir
  base = output_folder
  File.join(base, 'static')
end

#taxonomy_file_name ⇒ String

returns the taxonomy filename from the option list otherwise the default filename

Returns:

  • (String)


83
84
85
# File 'lib/tax_generator/classes/processor.rb', line 83

# Returns the taxonomy file name from the options, otherwise 'taxonomy.xml'.
# The legacy key :taxonomy_filename is checked first so existing callers keep
# their behaviour; the documented key :taxonomy_file_name is used as a fallback.
#
# @return [String]
def taxonomy_file_name
  @options.fetch(:taxonomy_filename) { @options.fetch(:taxonomy_file_name, 'taxonomy.xml') }
end

#taxonomy_file_path ⇒ String

returns the full path to the taxonomy file

Returns:

  • (String)


112
113
114
# File 'lib/tax_generator/classes/processor.rb', line 112

# Builds the full path of the taxonomy file by joining the input folder
# with the configured taxonomy file name.
#
# @return [String]
def taxonomy_file_path
  folder = input_folder
  file_name = taxonomy_file_name
  File.join(folder, file_name)
end

#wait_jobs_termination ⇒ void

This method returns an undefined value.

polls every 0.1 seconds and returns once all registered jobs have the ‘finished’ status

See Also:

  • #all_workers_finished?


230
231
232
# File 'lib/tax_generator/classes/processor.rb', line 230

# Blocks the caller until every registered job reports the 'finished' status,
# re-checking every +interval+ seconds.
# NOTE(review): there is no timeout; a job that never reaches 'finished'
# keeps this loop running forever.
# @see #all_workers_finished?
#
# @param  [Numeric]  interval seconds to sleep between checks (defaults to 0.1)
#
# @return [void]
def wait_jobs_termination(interval = 0.1)
  sleep(interval) until all_workers_finished?
end

#work ⇒ void

This method returns an undefined value.

generates the taxonomy tree, prints it and generates the files

See Also:



286
287
288
289
290
291
292
293
294
295
# File 'lib/tax_generator/classes/processor.rb', line 286

# Prepares the output folders, then (when the input folder and both input files
# exist) builds and prints the taxonomy tree and generates the files. Otherwise
# a fatal 'Please provide valid options' message is logged.
#
# @return [void]
def work
  prepare_output_dirs
  inputs_ready = File.directory?(input_folder) &&
                 File.file?(taxonomy_file_path) &&
                 File.file?(destinations_file_path)
  return log_message('Please provide valid options', log_method: 'fatal') unless inputs_ready

  @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
  @taxonomy.print_tree
  generate_files
end

#worker_died(worker, reason) ⇒ void

This method returns an undefined value.

logs a message about the worker being dead if a worker crashes

Parameters:



330
331
332
333
334
335
# File 'lib/tax_generator/classes/processor.rb', line 330

# Handles a crashed worker: drops its entry from the worker_to_job storage and,
# when both a reason and a registered job are known, logs a fatal message.
#
# @param  [TaxGenerator::FileCreator]  worker the worker that died
# @param  [String]  reason the reason for which the worker died
#
# @return [void]
def worker_died(worker, reason)
  dead_job = get_job_from_worker(worker)
  dead_name = delete_from_worker_to_job(worker)
  # nothing useful to report without both pieces of information
  return if reason.blank? || dead_job.blank?

  message = "worker job #{dead_job['atlas_id']} with name #{dead_name.inspect} died  for reason:  #{log_error(reason)}"
  log_message(message, log_method: 'fatal')
end