Class: RailsRedshiftReplicator::Exporters::Base

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/rails_redshift_replicator/exporters/base.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(replicable, current_replication = nil) ⇒ Base

Returns a new instance of Base.



15
16
17
18
19
20
21
22
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 15

# Builds an exporter for +replicable+, optionally reusing an existing
# replication record, and validates the setup right away: a missing Redshift
# target table is recorded in #errors and a missing index on the replication
# field is logged as a warning.
#
# @param replicable the replicable object this exporter works on
# @param current_replication [RailsRedshiftReplicator::Replication, nil]
#   replication record to reuse; when nil a new one is built on export
def initialize(replicable, current_replication = nil)
  @replicable, @replication = replicable, current_replication
  @file_names = @errors = nil
  # Validation is eager so configuration problems surface at construction.
  check_target_table
  check_index
end

Instance Attribute Details

#errors ⇒ Object

Returns the value of attribute errors.



13
14
15
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 13

# Error message set by #check_target_table when the Redshift target table is
# missing; nil otherwise. #export and #upload do nothing while it is present.
def errors; @errors; end

#file_names ⇒ Object

Returns the value of attribute file_names.



13
14
15
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 13

# Paths of the files written by the last successful #export; nil until an
# export produces files.
def file_names; @file_names; end

#replicable ⇒ Object (readonly)

Returns the value of attribute replicable.



12
13
14
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 12

# The replicable object this exporter was created for (set in the constructor).
def replicable; @replicable; end

#replication ⇒ Object

Returns the value of attribute replication.



13
14
15
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 13

# Replication record for the current export: the one given to the
# constructor, the one built by #initialize_replication, or nil after #export
# finds no new records.
def replication; @replication; end

Instance Method Details

#ar_client ⇒ Object

Returns the ActiveRecord connection adapter for the current database



111
112
113
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 111

# Memoized ActiveRecord connection to the source database.
def ar_client
  return @ar_client if @ar_client
  @ar_client = ActiveRecord::Base.connection
end

#build_query_sql(from_record = nil, option = {}) ⇒ Object

Builds the SQL string based on replicable and exporter parameters



80
81
82
83
84
85
86
87
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 80

# Builds the SQL used to read the records to export from +source_table+.
#
# @param from_record [Object, nil] exclusive lower bound on the replication
#   field (normally the last_record of the previous replication); nil means
#   no lower bound
# @param option [Hash] pass counts_only: true to select only
#   `count(1) as records_count` instead of the fields to sync
# @return [String] the SQL statement
def build_query_sql(from_record = nil, option = {})
  projection = option[:counts_only] ? "count(1) as records_count" : fields_to_sync.join(",")
  conditions = ["1=1"]
  conditions << "#{replication_field} > '#{from_record}'" if from_record
  upper_bound = last_record
  if upper_bound
    # Parenthesized on purpose: without the grouping, `OR field IS NULL`
    # binds looser than the preceding ANDs and drops the lower bound for NULL
    # rows, so they would be re-exported on every incremental run.
    conditions << "(#{replication_field} <= '#{upper_bound}' OR #{replication_field} IS NULL)"
  end
  "SELECT #{projection} FROM #{source_table} WHERE #{conditions.join(' AND ')}"
end

#check_index ⇒ Object

Reports missing indexes

See Also:



46
47
48
49
50
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 46

# Logs a warning when a replication field is configured but the source table
# has no index covering it (see #has_index?).
#
# @return [Object, nil] the logger's return value when a warning is logged,
#   nil otherwise
def check_index
  return if has_index? || replication_field.blank?
  RailsRedshiftReplicator.logger.warn I18n.t(:missing_indexes, replication_field: replication_field, table_name: source_table, scope: :rails_redshift_replicator)
end

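#check_target_table ⇒ String?

Checks if the target table exists on Redshift. If it does not, logs an error and stores the message in #errors.

Returns:

  • (String, nil)

    the error message if the table is missing, nil if it exists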



54
55
56
57
58
59
60
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 54

# Verifies that the Redshift target table exists, i.e. that #fields_to_sync
# found columns for it. When it does not, logs an error and stores the
# message in #errors.
#
# @return [String, nil] the error message when the table is missing, nil otherwise
def check_target_table
  return if fields_to_sync
  message = I18n.t(:missing_table, table_name: target_table, scope: :rails_redshift_replicator)
  RailsRedshiftReplicator.logger.error(message)
  @errors = message
end

#connection_adapter ⇒ Object

Returns an instance of an export connection adapter based on ActiveRecord::Base.connection. These adapters perform query execution and record retrieval, taking advantage of each database driver.



92
93
94
95
96
97
98
99
100
101
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 92

# Memoized export adapter wrapping #ar_client. Mysql2, PostgreSQL and SQLite
# connections get their dedicated class
# (RailsRedshiftReplicator::Adapters::<adapter_name>); any other database
# falls back to RailsRedshiftReplicator::Adapters::Generic.
def connection_adapter
  return @connection_adapter if @connection_adapter
  adapter_name = ar_client.adapter_name
  adapter_class =
    case adapter_name
    when "Mysql2", "PostgreSQL", "SQLite"
      "RailsRedshiftReplicator::Adapters::#{adapter_name}".constantize
    else
      RailsRedshiftReplicator::Adapters::Generic
    end
  @connection_adapter = adapter_class.new(ar_client)
end

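#export(options = {}) ⇒ Array<String>?

Exports results to CSV

Returns:

  • (Array<String>, nil)

    paths of the exported files, or nil when the exporter has errors or there are no new records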



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 125

# Exports the pending records of +source_table+ to local CSV file(s).
#
# Prepares the replication record, writes the records selected by
# #records(from_record) to a temporary file via the file manager, splits it
# and marks the replication as exported.
#
# @param options [Hash]
# @option options [Integer] :slices number of slices (defaults to
#   RailsRedshiftReplicator.redshift_slices.to_i)
# @option options [String] :format export format (defaults to
#   RailsRedshiftReplicator.preferred_format)
# @return [Array<String>, nil] paths of the generated files; nil when the
#   exporter has errors or there are no new records
def export(options = {})
  return if errors.present?
  slices = options[:slices] || RailsRedshiftReplicator.redshift_slices.to_i
  format = options[:format] || RailsRedshiftReplicator.preferred_format
  temp_name = file_manager.temp_file_name
  initialize_replication(temp_name, format, slices)
  started_at = replication.exporting
  row_count = file_manager.write_csv(temp_name, records(from_record))
  unless row_count > 0
    RailsRedshiftReplicator.logger.info I18n.t(:no_new_records, table_name: source_table, scope: :rails_redshift_replicator)
    # Nothing to replicate: drop the replication record so callers get nil.
    self.replication = nil
    return
  end
  RailsRedshiftReplicator.logger.info I18n.t(:exporting_results, counts: row_count, scope: :rails_redshift_replicator)
  file_manager.split_file(temp_name, row_count)
  replication.exported!(export_duration: (Time.now - started_at).ceil, record_count: row_count)
  @file_names = Dir.glob("#{file_manager.local_file(temp_name)}*")
end

#export_and_upload(options = {}) ⇒ Object

Exports and uploads selected records from the source_table



25
26
27
28
29
30
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 25

# Runs the Deleter's delete propagation for the replicable, exports the
# pending records and uploads the resulting files.
#
# @param options [Hash] forwarded to #export
# @return [RailsRedshiftReplicator::Replication, nil] the current replication
#   record (nil when the export found no new records)
def export_and_upload(options = {})
  deleter = RailsRedshiftReplicator::Deleter.new(replicable)
  deleter.handle_delete_propagation
  upload(export(options))
  replication
end

#fields_to_sync ⇒ Array<String>?

Lists the columns of the Redshift target table

Returns:

  • (Array<String>, nil)

    columns of the Redshift target table, or nil if no columns were found for it



207
208
209
210
211
212
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 207

# Column names of the Redshift target table, in the order listed by
# #redshift_schema.
#
# @return [Array<String>, nil] nil when #redshift_schema has no columns for
#   the target table (#check_target_table treats that as a missing table);
#   a nil result is not memoized
def fields_to_sync
  return @fields_to_sync if @fields_to_sync
  definitions = redshift_schema[target_table]
  return nil if definitions.blank?
  @fields_to_sync = definitions.map { |definition| definition['column'] }
end

#file_manager ⇒ Object



154
155
156
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 154

# Memoized FileManager bound to this exporter; it handles the temporary
# export files and their upload.
def file_manager
  return @file_manager if @file_manager
  @file_manager = RailsRedshiftReplicator::FileManager.new(self)
end

#from_record ⇒ Integer?

Note:

Some replication strategies (e.g. FullExporter) may not use a replication_field, in which case #from_record is nil.

Returns the value of last_record from the most recent complete replication record.

Returns:

  • (Integer, nil)

    last_record



118
119
120
121
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 118

# Lower bound for the next export: the last_record stored on the most recent
# replication of the source table.
#
# @return [Object, nil] nil when there is no replication field (e.g. full
#   exports) or no previous replication
def from_record
  replication_field.blank? ? nil : last_replication.try(:last_record)
end

#has_index? ⇒ true, false

Verifies if the table has the recommended indexes to increase export performance.

Returns:

  • (true, false)

    if table has recommended indexes



40
41
42
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 40

# True when the replication field is among the indexed columns of the source
# table ("id" is always included, see #table_indexes).
#
# @return [Boolean]
def has_index?
  field = replication_field
  table_indexes.include?(field)
end

#init_replication_attrs(file_name, format, slices) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 165

# Attributes for a replication record in the 'exporting' state.
#
# @param file_name [String] temporary export file name
# @param format [String] export format
# @param slices [Integer] number of slices
# @return [Hash] attributes for RailsRedshiftReplicator::Replication
def init_replication_attrs(file_name, format, slices)
  attrs = { key: file_manager.file_key_in_format(file_name, format) }
  # last_record is persisted as a string
  attrs[:last_record] = last_record.to_s
  attrs[:state] = 'exporting'
  attrs[:replication_type] = replication_type
  attrs[:source_table] = source_table
  attrs[:target_table] = target_table
  attrs[:slices] = slices
  attrs[:first_record] = from_record
  attrs[:export_format] = format
  attrs
end

#initialize_replication(file_name, format, slices) ⇒ RailsRedshiftReplicator::Replication

Initializes the replication record without saving it



160
161
162
163
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 160

# Prepares the replication record for a new export without saving it.
#
# Reuses the replication given to the constructor (updating its attributes
# in place) or builds a new RailsRedshiftReplicator::Replication.
#
# @param file_name [String] temporary export file name
# @param format [String] export format
# @param slices [Integer] number of slices
# @return [RailsRedshiftReplicator::Replication] the replication record
def initialize_replication(file_name, format, slices)
  attrs = init_replication_attrs(file_name, format, slices)
  if replication.present?
    # ActiveRecord's assign_attributes does not return the record, so its
    # result must not be stored in @replication.
    replication.assign_attributes(attrs)
  else
    @replication = RailsRedshiftReplicator::Replication.new(attrs)
  end
  replication
end

#last_record ⇒ Integer?

Note:

last_record is an Integer and is computed based on the replication field.

Returns the last record to export using the replication_field criteria.

Returns:

  • (Integer, nil)

    maximum value of the replication_field in the source table, or nil if there is no replication_field



188
189
190
191
192
193
194
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 188

# Upper bound for this export: the maximum value of the replication field in
# the source table, as returned by the connection adapter.
#
# The result is memoized even when it is nil (e.g. an empty table). This
# keeps the bound identical for every reader during one export
# (#init_replication_attrs and #build_query_sql both read it) and avoids
# re-running the query.
#
# @return [Object, nil] nil when there is no replication field or the table
#   has no values for it
def last_record
  return if replication_field.blank?
  return @last_record if defined?(@last_record)
  sql = "SELECT max(#{replication_field}) as _last_record from #{source_table}"
  @last_record = connection_adapter.last_record_query_command(sql)
end

#last_replication ⇒ RailsRedshiftReplicator::Replication

Returns the last replication from a given table

Returns:



181
182
183
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 181

# Memoized most recent replication record of the source table (the last
# element of RailsRedshiftReplicator::Replication.from_table).
#
# @return [RailsRedshiftReplicator::Replication, nil]
def last_replication
  return @last_replication if @last_replication
  @last_replication = RailsRedshiftReplicator::Replication.from_table(source_table).last
end

#query_command(sql) ⇒ Object

Performs the query to retrieve records to export

Parameters:

  • sql (String)

    sql to execute



105
106
107
108
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 105

# Logs +sql+ at debug level (with the adapter class name) and runs it
# through the connection adapter.
#
# @param sql [String] SQL to execute
# @return whatever the adapter's query_command returns
def query_command(sql)
  logger = RailsRedshiftReplicator.logger
  adapter = connection_adapter
  logger.debug I18n.t(:executing_query, scope: :rails_redshift_replicator, sql: sql, adapter: adapter.class.name)
  adapter.query_command sql
end

#records(from_record = nil, option = {}) ⇒ Object

Note:

Query cache is disabled to decrease memory usage.

Returns records to export

Parameters:

  • from_record (Integer) (defaults to: nil)

    initial record. When the exporter type is identity, the record is an id or equivalent; when the exporter type is timed, the record is the timestamp converted to epoch (date.to_i).

  • option (Hash) (defaults to: {})

    a customizable set of options; :counts_only makes the query return only the record count



70
71
72
73
74
75
76
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 70

# Records to export, queried with the ActiveRecord query cache disabled.
#
# NOTE(review): the result is memoized on the first call and later calls
# return it regardless of +from_record+/+option+; confirm no caller relies
# on different arguments per instance.
#
# @param from_record [Object, nil] lower bound passed to #build_query_sql
# @param option [Hash] options passed to #build_query_sql
# @return whatever #query_command returns for the built SQL
def records(from_record = nil, option = {})
  return @records if @records
  @records = ActiveRecord::Base.uncached do
    query_command(build_query_sql(from_record, option))
  end
end

#redshift_schema ⇒ Hash

Schema for the export table on Redshift

Returns:

  • (Hash)

    column definitions (tablename, column, type) grouped by table name



198
199
200
201
202
203
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 198

# Memoized column definitions of the target table as listed by Redshift's
# pg_table_def, grouped by table name.
#
# @return [Hash{String => Array<Hash>}] rows (tablename, column, type) per table
def redshift_schema
  return @schema if @schema
  connection = ::RailsRedshiftReplicator.connection
  rows = connection.exec("select tablename, \"column\", type from pg_table_def where tablename = '#{target_table}'").to_a
  @schema = rows.group_by { |row| row["tablename"] }
end

#table_indexes ⇒ Array<String>

Lists indexes from source table

Returns:

  • (Array<String>)

    indexes from source table



34
35
36
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 34

# Columns covered by any index of the source table, plus "id", which is
# always treated as indexed.
#
# @return [Array<String>] column names without duplicates
def table_indexes
  indexed = ActiveRecord::Base.connection.indexes(source_table).map(&:columns).flatten
  indexed | ["id"]
end

#upload(files = file_names) ⇒ Object

Uploads file to s3

Parameters:

  • files (Array<String>) (defaults to: file_names)

    local file names to upload



147
148
149
150
151
152
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 147

# Uploads the exported files through the file manager (gzip or CSV upload
# depending on the replication) and records the upload duration.
#
# @param files [Array<String>] local file names (defaults to #file_names)
# @return [Object, nil] result of replication.uploaded!, or nil when there
#   are errors or nothing to upload
def upload(files = file_names)
  return if errors.present? || files.blank?
  started_at = replication.uploading!
  if replication.gzip?
    file_manager.upload_gzip(files)
  else
    file_manager.upload_csv(files)
  end
  replication.uploaded! upload_duration: (Time.now - started_at).ceil
end