Class: DataDrain::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/data_drain/engine.rb

Overview

Motor principal de extracción y purga de datos (DataDrain).

Orquesta el flujo ETL desde PostgreSQL hacia un Data Lake analítico delegando la interacción del almacenamiento al adaptador configurado.

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Engine

Inicializa una nueva instancia del motor de extracción.

Parameters:

  • options (Hash)

    Diccionario de configuración para la extracción.

Options Hash (options):

  • :start_date (Time, DateTime, Date)

    Fecha y hora de inicio.

  • :end_date (Time, DateTime, Date)

    Fecha y hora de fin.

  • :table_name (String)

    Nombre de la tabla en PostgreSQL.

  • :folder_name (String) — default: Opcional

    Nombre de la carpeta destino.

  • :select_sql (String) — default: Opcional

    Sentencia SELECT personalizada.

  • :partition_keys (Array<String, Symbol>)

    Columnas para particionar.

  • :primary_key (String) — default: Opcional

    Clave primaria para borrado. Por defecto ‘id’.

  • :where_clause (String) — default: Opcional

    Condición SQL extra.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/data_drain/engine.rb', line 23

def initialize(options)
  @start_date     = options.fetch(:start_date).beginning_of_day
  @end_date       = options.fetch(:end_date).end_of_day
  @table_name     = options.fetch(:table_name)
  @folder_name    = options.fetch(:folder_name, @table_name)
  @select_sql     = options.fetch(:select_sql, "*")
  @partition_keys = options.fetch(:partition_keys)
  @primary_key    = options.fetch(:primary_key, "id")
  @where_clause   = options[:where_clause]
  @bucket         = options[:bucket]

  @config  = DataDrain.configuration
  @logger  = @config.logger
  @adapter = DataDrain::Storage.adapter

  database = DuckDB::Database.open(":memory:")
  @duckdb  = database.connect
end

Instance Method Details

#callBoolean

Ejecuta el flujo completo del motor: Setup, Conteo, Exportación, Verificación y Purga.

Returns:

  • (Boolean)

    true si el proceso finalizó con éxito, false si falló la integridad.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/data_drain/engine.rb', line 45

def call
  @logger.info "[DataDrain Engine] 🚀 Preparando '#{@table_name}' (#{@start_date.to_date} a #{@end_date.to_date})..."

  setup_duckdb

  @pg_count = get_postgres_count

  if @pg_count.zero?
    @logger.info "[DataDrain Engine] ⏭️ No hay registros que cumplan las condiciones."
    return true
  end

  @logger.info "[DataDrain Engine] 📦 Exportando #{@pg_count} registros a Parquet..."
  export_to_parquet

  if verify_integrity
    purge_from_postgres
    @logger.info "[DataDrain Engine] ✅ Proceso completado exitosamente para '#{@table_name}'."
    true
  else
    @logger.error "[DataDrain Engine] ❌ ERROR de integridad en '#{@table_name}'. Abortando purga."
    false
  end
end