Class: DataDrain::FileIngestor
- Inherits: Object
- Defined in: lib/data_drain/file_ingestor.rb
Overview
Class responsible for ingesting local files (CSV, JSON, Parquet) generated by other services (e.g. Netflow) and uploading them to the Data Lake, applying ZSTD compression and Hive partitioning.
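Since the class supports CSV, JSON, and Parquet sources, it has to pick the matching DuckDB reader function per file. The private helper that does this is not shown on this page; the following is a hypothetical, standalone sketch of such a mapping (the method name `determine_reader_for` and the exact reader calls are illustrative, not the library's API):

```ruby
# Hypothetical sketch: map a file extension to a DuckDB table-reading
# function call. The real private helper in FileIngestor is not documented here.
def determine_reader_for(path)
  case File.extname(path).downcase
  when ".csv"     then "read_csv_auto('#{path}')"
  when ".json"    then "read_json_auto('#{path}')"
  when ".parquet" then "read_parquet('#{path}')"
  else raise ArgumentError, "Unsupported file type: #{path}"
  end
end

puts determine_reader_for("/tmp/netflow.csv")
# => read_csv_auto('/tmp/netflow.csv')
```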
Instance Method Summary
- #call ⇒ Boolean
Executes the ingestion flow.
- #initialize(options) ⇒ FileIngestor
constructor
A new instance of FileIngestor.
Constructor Details
#initialize(options) ⇒ FileIngestor
Returns a new instance of FileIngestor.
# File 'lib/data_drain/file_ingestor.rb', line 14

def initialize(options)
  @source_path         = options.fetch(:source_path)
  @folder_name         = options.fetch(:folder_name)
  @partition_keys      = options.fetch(:partition_keys, [])
  @select_sql          = options.fetch(:select_sql, "*")
  @delete_after_upload = options.fetch(:delete_after_upload, true)
  @bucket              = options[:bucket]

  @config  = DataDrain.configuration
  @logger  = @config.logger
  @adapter = DataDrain::Storage.adapter

  database = DuckDB::Database.open(":memory:")
  @duckdb  = database.connect
end
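The constructor distinguishes required keys (`Hash#fetch` with no default raises `KeyError` if the key is missing) from optional ones (a default value, or plain `[]` access returning `nil`). A minimal, gem-free illustration of that resolution, using assumed option values:

```ruby
# Illustrative options hash; only :source_path, :folder_name (and usually
# :bucket) are supplied, so the optional keys fall back to their defaults.
options = { source_path: "/tmp/netflow.parquet", folder_name: "netflow", bucket: "lake" }

partition_keys = options.fetch(:partition_keys, [])        # default: no partitioning
select_sql     = options.fetch(:select_sql, "*")           # default: select everything
delete_after   = options.fetch(:delete_after_upload, true) # default: clean up source
bucket         = options[:bucket]                          # nil if absent

puts [partition_keys.inspect, select_sql, delete_after, bucket].join(" | ")
# => [] | * | true | lake
```

Omitting a required key such as `:source_path` would raise `KeyError` immediately, which surfaces misconfiguration before any DuckDB work starts.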
Instance Method Details
#call ⇒ Boolean
Executes the ingestion flow.
# File 'lib/data_drain/file_ingestor.rb', line 32

def call
  @logger.info "[DataDrain FileIngestor] 🚀 Starting ingestion of '#{@source_path}'..."

  unless File.exist?(@source_path)
    @logger.error "[DataDrain FileIngestor] ❌ Source file does not exist: #{@source_path}"
    return false
  end

  @adapter.setup_duckdb(@duckdb)

  # Pick the DuckDB reader function based on the file extension
  reader_function = determine_reader

  # 1. Safety count
  source_count = @duckdb.query("SELECT COUNT(*) FROM #{reader_function}").first.first
  @logger.info "[DataDrain FileIngestor] 📊 Found #{source_count} records to process."

  if source_count.zero?
    cleanup_local_file
    return true
  end

  # 2. Export / upload
  @adapter.prepare_export_path(@bucket, @folder_name)

  dest_path = @config.storage_mode.to_sym == :s3 ? "s3://#{@bucket}/#{@folder_name}/" : File.join(@bucket, @folder_name, "")
  partition_clause = @partition_keys.any? ? "PARTITION_BY (#{@partition_keys.join(', ')})," : ""

  query = <<~SQL
    COPY (
      SELECT #{@select_sql} FROM #{reader_function}
    ) TO '#{dest_path}' (
      FORMAT PARQUET,
      #{partition_clause}
      COMPRESSION 'ZSTD',
      OVERWRITE_OR_IGNORE 1
    );
  SQL

  @logger.info "[DataDrain FileIngestor] ☁️ Writing to the Data Lake..."
  @duckdb.query(query)
  @logger.info "[DataDrain FileIngestor] ✅ File ingested and compressed successfully."

  cleanup_local_file
  true
rescue DuckDB::Error => e
  @logger.error "[DataDrain FileIngestor] ❌ DuckDB error during ingestion: #{e.message}"
  false
ensure
  @duckdb&.close
end
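The heart of `#call` is string-building a single DuckDB `COPY ... TO` statement. The fragment below reproduces that construction standalone (no DuckDB connection needed) with assumed values for the instance variables, so you can see the SQL the method would issue in S3 mode with two partition keys:

```ruby
# Assumed example values standing in for the ingestor's instance variables.
folder_name    = "netflow"
bucket         = "lake"
partition_keys = ["year", "month"]
select_sql     = "*"
reader         = "read_parquet('/tmp/netflow.parquet')" # value of reader_function

# Same construction as in #call: S3-style destination, optional partition clause.
dest_path        = "s3://#{bucket}/#{folder_name}/"
partition_clause = partition_keys.any? ? "PARTITION_BY (#{partition_keys.join(', ')})," : ""

query = <<~SQL
  COPY (
    SELECT #{select_sql} FROM #{reader}
  ) TO '#{dest_path}' (
    FORMAT PARQUET,
    #{partition_clause}
    COMPRESSION 'ZSTD',
    OVERWRITE_OR_IGNORE 1
  );
SQL

puts query
```

With empty `partition_keys` the `PARTITION_BY` clause is omitted entirely, producing a single (non-Hive-partitioned) Parquet output instead of a `year=.../month=...` directory tree.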