Class: Etna::Clients::Magma::MaterializeDataWorkflow
- Inherits:
-
Struct
- Object
- Struct
- Etna::Clients::Magma::MaterializeDataWorkflow
- Defined in:
- lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb
Instance Attribute Summary collapse
-
#concurrency ⇒ Object
Returns the value of attribute concurrency.
-
#filesystem ⇒ Object
Returns the value of attribute filesystem.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#magma_client ⇒ Object
Returns the value of attribute magma_client.
-
#metis_client ⇒ Object
Returns the value of attribute metis_client.
-
#model_attributes_mask ⇒ Object
Returns the value of attribute model_attributes_mask.
-
#model_filters ⇒ Object
Returns the value of attribute model_filters.
-
#model_name ⇒ Object
Returns the value of attribute model_name.
-
#page_size ⇒ Object
Returns the value of attribute page_size.
-
#project_name ⇒ Object
Returns the value of attribute project_name.
-
#record_names ⇒ Object
Returns the value of attribute record_names.
-
#stub_files ⇒ Object
Returns the value of attribute stub_files.
Instance Method Summary collapse
- #each_file(template, record, &block) ⇒ Object
-
#initialize(**kwds) ⇒ MaterializeDataWorkflow
constructor
A new instance of MaterializeDataWorkflow.
- #magma_crud ⇒ Object
- #materialize_all(dest) ⇒ Object
- #materialize_record(dest_dir, template, record) ⇒ Object
- #metadata_file_name(record_name:, record_model_name:, ext:) ⇒ Object
- #model_walker ⇒ Object
- #sync_metis_data_workflow ⇒ Object
Constructor Details
#initialize(**kwds) ⇒ MaterializeDataWorkflow
Returns a new instance of MaterializeDataWorkflow.
14 15 16 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 14 def initialize(**kwds) super(**({filesystem: Etna::Filesystem.new, page_size: 20, concurrency: 10, record_names: "all"}.update(kwds))) end |
Instance Attribute Details
#concurrency ⇒ Object
Returns the value of attribute concurrency
8 9 10 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 8 def concurrency @concurrency end |
#filesystem ⇒ Object
Returns the value of attribute filesystem
8 9 10 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 8 def filesystem @filesystem end |
#logger ⇒ Object
Returns the value of attribute logger
8 9 10 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 8 def logger @logger end |
#magma_client ⇒ Object
Returns the value of attribute magma_client
8 9 10 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 8 def magma_client @magma_client end |
#metis_client ⇒ Object
Returns the value of attribute metis_client
8 9 10 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 8 def metis_client @metis_client end |
#model_attributes_mask ⇒ Object
Returns the value of attribute model_attributes_mask
8 9 10 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 8 def model_attributes_mask @model_attributes_mask end |
#model_filters ⇒ Object
Returns the value of attribute model_filters
8 9 10 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 8 def model_filters @model_filters end |
#model_name ⇒ Object
Returns the value of attribute model_name
8 9 10 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 8 def model_name @model_name end |
#page_size ⇒ Object
Returns the value of attribute page_size
8 9 10 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 8 def page_size @page_size end |
#project_name ⇒ Object
Returns the value of attribute project_name
8 9 10 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 8 def project_name @project_name end |
#record_names ⇒ Object
Returns the value of attribute record_names
8 9 10 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 8 def record_names @record_names end |
#stub_files ⇒ Object
Returns the value of attribute stub_files
8 9 10 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 8 def stub_files @stub_files end |
Instance Method Details
#each_file(template, record, &block) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 74 def each_file(template, record, &block) results = [] template.attributes.all.each do |attribute| if attribute.attribute_type == AttributeType::FILE_COLLECTION record[attribute.name]&.each_with_index do |file, i| results << [attribute, file, i] end elsif attribute.attribute_type == AttributeType::FILE results << [attribute, record[attribute.name], 0] end end results.each do |attr, file, idx| next if file.nil? next unless file.is_a?(Hash) next unless file['url'] yield attr.name, file['url'], (file['original_filename'] || File.basename(file['path'])), idx end end |
#magma_crud ⇒ Object
18 19 20 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 18 def magma_crud @magma_crud ||= Etna::Clients::Magma::MagmaCrudWorkflow.new(magma_client: magma_client, project_name: project_name) end |
#materialize_all(dest) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 26 def materialize_all(dest) templates = {} semaphore = Concurrent::Semaphore.new(concurrency) errors = Queue.new model_walker.walk_from( model_name, record_names, model_attributes_mask: model_attributes_mask, model_filters: model_filters, page_size: page_size, ) do |template, document| logger&.info("Materializing #{template.name}##{document[template.identifier]}") templates[template.name] = template begin if (error = errors.pop(true)) logger&.error(error) raise error end rescue ThreadError end semaphore.acquire Thread.new do begin materialize_record(dest, template, document) rescue => e logger&.error(e) errors << e ensure semaphore.release end end end semaphore.acquire(concurrency) begin if (error = errors.pop(true)) logger&.error(error) raise error end rescue ThreadError end end |
#materialize_record(dest_dir, template, record) ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 102 def materialize_record(dest_dir, template, record) record_to_serialize = record.dup each_file(template, record) do |attr_name, url, filename, idx| if idx == 0 record_to_serialize[attr_name] = [] end dest_file = File.join(dest_dir, metadata_file_name(record_name: record[template.identifier], record_model_name: template.name, ext: "_#{attr_name}_#{idx}#{File.extname(filename)}")) filesystem.mkdir_p(File.dirname(dest_file)) sync_metis_data_workflow.copy_file(dest: dest_file, url: url, stub: stub_files) record_to_serialize[attr_name] << {file: dest_file, original_filename: filename} end dest_file = File.join(dest_dir, metadata_file_name(record_name: record[template.identifier], record_model_name: template.name, ext: '.json')) filesystem.mkdir_p(File.dirname(dest_file)) json = record_to_serialize.to_json filesystem.with_writeable(dest_file, "w", size_hint: json.bytes.length) do |io| io.write(json) end end |
#metadata_file_name(record_name:, record_model_name:, ext:) ⇒ Object
125 126 127 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 125 def metadata_file_name(record_name:, record_model_name:, ext:) "#{record_model_name}/#{record_name.gsub(/\s/, '_')}#{ext}" end |
#model_walker ⇒ Object
22 23 24 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 22 def model_walker @model_walker ||= WalkModelTreeWorkflow.new(magma_crud: magma_crud, logger: logger) end |
#sync_metis_data_workflow ⇒ Object
95 96 97 98 99 100 |
# File 'lib/etna/clients/magma/workflows/materialize_magma_record_files_workflow.rb', line 95 def sync_metis_data_workflow @sync_metis_data_workflow ||= Etna::Clients::Metis::SyncMetisDataWorkflow.new( metis_client: metis_client, logger: logger, filesystem: filesystem) end |