Class: DataDrain::FileIngestor

Inherits:
Object
  • Object
show all
Defined in:
lib/data_drain/file_ingestor.rb

Overview

Clase encargada de ingerir archivos locales (CSV, JSON, Parquet) generados por otros servicios (ej. Netflow) y subirlos al Data Lake aplicando compresión ZSTD y particionamiento Hive.

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ FileIngestor

Returns a new instance of FileIngestor.

Parameters:

  • options (Hash)

    Opciones de ingestión.

Options Hash (options):

  • :source_path (String)

    Ruta absoluta al archivo local.

  • :folder_name (String)

    Nombre de la carpeta destino en el Data Lake.

  • :partition_keys (Array<String, Symbol>) — default: Opcional

    Columnas para particionar.

  • :select_sql (String) — default: Opcional

    Sentencia SELECT para transformar datos al vuelo.

  • :delete_after_upload (Boolean) — default: Opcional

    Borra el archivo local al terminar. Por defecto true.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/data_drain/file_ingestor.rb', line 14

def initialize(options)
  @source_path         = options.fetch(:source_path)
  @folder_name         = options.fetch(:folder_name)
  @partition_keys      = options.fetch(:partition_keys, [])
  @select_sql          = options.fetch(:select_sql, "*")
  @delete_after_upload = options.fetch(:delete_after_upload, true)
  @bucket              = options[:bucket]

  @config  = DataDrain.configuration
  @logger  = @config.logger
  @adapter = DataDrain::Storage.adapter

  database = DuckDB::Database.open(":memory:")
  @duckdb  = database.connect
end

Instance Method Details

#callBoolean

Ejecuta el flujo de ingestión.

Returns:

  • (Boolean)

    true si el proceso fue exitoso.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/data_drain/file_ingestor.rb', line 32

def call
  @logger.info "[DataDrain FileIngestor] 🚀 Iniciando ingestión de '#{@source_path}'..."

  unless File.exist?(@source_path)
    @logger.error "[DataDrain FileIngestor] ❌ El archivo origen no existe: #{@source_path}"
    return false
  end

  @adapter.setup_duckdb(@duckdb)

  # Determinamos la función lectora de DuckDB según la extensión del archivo
  reader_function = determine_reader

  # 1. Conteo de seguridad
  source_count = @duckdb.query("SELECT COUNT(*) FROM #{reader_function}").first.first
  @logger.info "[DataDrain FileIngestor] 📊 Encontrados #{source_count} registros para procesar."

  if source_count.zero?
    cleanup_local_file
    return true
  end

  # 2. Exportación / Subida
  @adapter.prepare_export_path(@bucket, @folder_name)
  dest_path = @config.storage_mode.to_sym == :s3 ? "s3://#{@bucket}/#{@folder_name}/" : File.join(@bucket, @folder_name, "")

  partition_clause = @partition_keys.any? ? "PARTITION_BY (#{@partition_keys.join(', ')})," : ""

  query = <<~SQL
    COPY (
      SELECT #{@select_sql}
      FROM #{reader_function}
    ) TO '#{dest_path}'
    (
      FORMAT PARQUET,
      #{partition_clause}
      COMPRESSION 'ZSTD',
      OVERWRITE_OR_IGNORE 1
    );
  SQL

  @logger.info "[DataDrain FileIngestor] ☁️ Escribiendo en el Data Lake..."
  @duckdb.query(query)

  @logger.info "[DataDrain FileIngestor] ✅ Archivo ingerido y comprimido exitosamente."

  cleanup_local_file
  true
rescue DuckDB::Error => e
  @logger.error "[DataDrain FileIngestor] ❌ Error de DuckDB durante la ingestión: #{e.message}"
  false
ensure
  @duckdb&.close
end