Class: DataDrain::Engine
- Inherits:
-
Object
- Object
- DataDrain::Engine
- Defined in:
- lib/data_drain/engine.rb
Overview
Motor principal de extracción y purga de datos (DataDrain).
Orquesta el flujo ETL desde PostgreSQL hacia un Data Lake analítico delegando la interacción del almacenamiento al adaptador configurado.
Instance Method Summary collapse
-
#call ⇒ Boolean
Ejecuta el flujo completo del motor: Setup, Conteo, Exportación, Verificación y Purga.
-
#initialize(options) ⇒ Engine
constructor
Inicializa una nueva instancia del motor de extracción.
Constructor Details
#initialize(options) ⇒ Engine
Inicializa una nueva instancia del motor de extracción.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/data_drain/engine.rb', line 23 def initialize() @start_date = .fetch(:start_date).beginning_of_day @end_date = .fetch(:end_date).end_of_day @table_name = .fetch(:table_name) @folder_name = .fetch(:folder_name, @table_name) @select_sql = .fetch(:select_sql, "*") @partition_keys = .fetch(:partition_keys) @primary_key = .fetch(:primary_key, "id") @where_clause = [:where_clause] @bucket = [:bucket] @config = DataDrain.configuration @logger = @config.logger @adapter = DataDrain::Storage.adapter database = DuckDB::Database.open(":memory:") @duckdb = database.connect end |
Instance Method Details
#call ⇒ Boolean
Ejecuta el flujo completo del motor: Setup, Conteo, Exportación, Verificación y Purga.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/data_drain/engine.rb', line 45 def call @logger.info "[DataDrain Engine] 🚀 Preparando '#{@table_name}' (#{@start_date.to_date} a #{@end_date.to_date})..." setup_duckdb @pg_count = get_postgres_count if @pg_count.zero? @logger.info "[DataDrain Engine] ⏭️ No hay registros que cumplan las condiciones." return true end @logger.info "[DataDrain Engine] 📦 Exportando #{@pg_count} registros a Parquet..." export_to_parquet if verify_integrity purge_from_postgres @logger.info "[DataDrain Engine] ✅ Proceso completado exitosamente para '#{@table_name}'." true else @logger.error "[DataDrain Engine] ❌ ERROR de integridad en '#{@table_name}'. Abortando purga." false end end |