Class: RailsRedshiftReplicator::Exporters::Base
- Inherits:
-
Object
- Object
- RailsRedshiftReplicator::Exporters::Base
- Extended by:
- Forwardable
- Defined in:
- lib/rails_redshift_replicator/exporters/base.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#errors ⇒ Object
Returns the value of attribute errors.
-
#file_names ⇒ Object
Returns the value of attribute file_names.
-
#replicable ⇒ Object
readonly
Returns the value of attribute replicable.
-
#replication ⇒ Object
Returns the value of attribute replication.
Instance Method Summary collapse
-
#ar_client ⇒ Object
Returns the ActiveRecord connection adapter for the current database.
-
#build_query_sql(from_record = nil, option = {}) ⇒ Object
Builds the SQL string based on replicable and exporter parameters.
-
#check_index ⇒ Object
Reports missing indexes.
-
#check_target_table ⇒ true, false
Checks if target table exists on Redshift.
-
#connection_adapter ⇒ Object
Returns an instance of an export connection adapter based on ActiveRecord::Base.connection. These adapters are required to perform query execution and record retrieval, taking advantage of each db driver.
-
#export(options = {}) ⇒ String
Exports results to CSV.
-
#export_and_upload(options = {}) ⇒ Object
Exports and uploads selected records from the source_table.
-
#fields_to_sync ⇒ Array<String>
Lists fields on redshift table.
- #file_manager ⇒ Object
-
#from_record ⇒ Integer?
Returns the value of last_record from the most recent complete replication record.
-
#has_index? ⇒ true, false
Verifies if the table has the recommended indexes to increase export performance.
- #init_replication_attrs(file_name, format, slices) ⇒ Object
-
#initialize(replicable, current_replication = nil) ⇒ Base
constructor
A new instance of Base.
-
#initialize_replication(file_name, format, slices) ⇒ RailsRedshiftReplicator::Replication
Initialize replication record without saving.
-
#last_record ⇒ Integer
Returns the last record to export using the replication_field criteria.
-
#last_replication ⇒ RailsRedshiftReplicator::Replication
Returns the last replication from a given table.
-
#query_command(sql) ⇒ Object
Performs the query to retrieve records to export.
-
#records(from_record = nil, option = {}) ⇒ Object
Returns records to export.
-
#redshift_schema ⇒ Hash
Schema for the export table on redshift.
-
#table_indexes ⇒ Array<String>
Lists indexes from source table.
-
#upload(files = file_names) ⇒ Object
Uploads file to s3.
Constructor Details
#initialize(replicable, current_replication = nil) ⇒ Base
Returns a new instance of Base.
15 16 17 18 19 20 21 22 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 15
# Stores the replicable and the (optional) replication already in progress,
# clears the export state, then checks the target table and the source indexes.
#
# @param replicable the object describing what is replicated
# @param current_replication an existing replication to continue, or nil
def initialize(replicable, current_replication = nil)
  @replicable  = replicable
  @replication = current_replication
  @file_names  = @errors = nil
  # Order matters: check_target_table may record an error before the index check runs.
  check_target_table
  check_index
end
Instance Attribute Details
#errors ⇒ Object
Returns the value of attribute errors.
13 14 15 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 13
# Reader for the errors attribute.
#
# @return the current value of @errors
def errors
  @errors
end
#file_names ⇒ Object
Returns the value of attribute file_names.
13 14 15 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 13
# Reader for the file_names attribute.
#
# @return the current value of @file_names
def file_names
  @file_names
end
#replicable ⇒ Object (readonly)
Returns the value of attribute replicable.
12 13 14 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 12
# Read-only accessor for the replicable given to the constructor.
#
# @return the current value of @replicable
def replicable
  @replicable
end
#replication ⇒ Object
Returns the value of attribute replication.
13 14 15 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 13
# Reader for the replication attribute.
#
# @return the current value of @replication
def replication
  @replication
end
Instance Method Details
#ar_client ⇒ Object
Returns the ActiveRecord connection adapter for the current database
111 112 113 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 111
# Memoized handle on ActiveRecord::Base.connection.
#
# @return the cached connection object
def ar_client
  return @ar_client if @ar_client

  @ar_client = ActiveRecord::Base.connection
end
#build_query_sql(from_record = nil, option = {}) ⇒ Object
Builds the SQL string based on replicable and exporter parameters
80 81 82 83 84 85 86 87 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 80
# Assembles the SELECT statement used to fetch the rows to export.
#
# @param from_record lower bound (exclusive) on replication_field; skipped when nil
# @param option [Hash] pass counts_only: true to select a row count instead of the columns
# @return [String] the SQL text
def build_query_sql(from_record = nil, option = {})
  projection = option[:counts_only] ? "count(1) as records_count" : fields_to_sync.join(",")
  fragments = ["SELECT #{projection}", " FROM #{source_table} WHERE 1=1"]
  fragments << " AND #{replication_field} > '#{from_record}'" if from_record
  # NOTE(review): the OR is not parenthesised, so SQL precedence makes the
  # "IS NULL" branch apply regardless of the lower bound - confirm this is intended.
  fragments << " AND #{replication_field} <= '#{last_record}' OR #{replication_field} IS NULL" if last_record
  fragments.join
end
#check_index ⇒ Object
Reports missing indexes
46 47 48 49 50 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 46
# Logs a warning when a replication field is configured but has_index? is false.
# Does nothing otherwise.
def check_index
  return if has_index? || replication_field.blank?

  warning = I18n.t(:missing_indexes,
                   replication_field: replication_field,
                   table_name: source_table,
                   scope: :rails_redshift_replicator)
  RailsRedshiftReplicator.logger.warn warning
end
#check_target_table ⇒ true, false
Checks if target table exists on Redshift
54 55 56 57 58 59 60 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 54
# Records and logs an error when no fields could be listed for the target table
# (fields_to_sync is nil/false).
#
# NOTE(review): the local variable name was lost in this listing; `message` is a
# reconstruction - confirm against the original source file.
#
# @return the error message when the table is missing, nil otherwise
def check_target_table
  unless fields_to_sync
    message = I18n.t(:missing_table, table_name: target_table, scope: :rails_redshift_replicator)
    RailsRedshiftReplicator.logger.error(message)
    # A non-blank @errors makes export and upload return early.
    @errors = message
  end
end
#connection_adapter ⇒ Object
Returns an instance of an export connection adapter based on ActiveRecord::Base.connection. These adapters are required to perform query execution and record retrieval, taking advantage of each db driver.
92 93 94 95 96 97 98 99 100 101 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 92
# Memoized export adapter wrapping ar_client. Mysql2, PostgreSQL and SQLite get
# the adapter class named after them; every other adapter name uses Generic.
#
# @return the adapter instance built around ar_client
def connection_adapter
  @connection_adapter ||= begin
    driver_name = ar_client.adapter_name
    klass =
      if %w(Mysql2 PostgreSQL SQLite).include?(driver_name)
        "RailsRedshiftReplicator::Adapters::#{driver_name}".constantize
      else
        RailsRedshiftReplicator::Adapters::Generic
      end
    klass.new(ar_client)
  end
end
#export(options = {}) ⇒ String
Exports results to CSV
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 125
# Writes the records to export into a CSV file, splits it, and marks the
# replication as exported.
#
# Returns early (nil) when errors are present. When nothing was written, the
# replication is reset to nil and nil is returned.
#
# @param options [Hash] :slices and :format override the RailsRedshiftReplicator defaults
# @return [Array<String>, nil] paths matching the exported file name prefix (from Dir.glob)
def export(options = {})
  return if errors.present?

  slices = options[:slices] || RailsRedshiftReplicator.redshift_slices.to_i
  format = options[:format] || RailsRedshiftReplicator.preferred_format
  file_name = file_manager.temp_file_name
  initialize_replication(file_name, format, slices)
  # presumably returns the export start time, since it is subtracted from Time.now below - confirm
  export_start = replication.exporting
  counts = file_manager.write_csv file_name, records(from_record)
  unless counts > 0
    RailsRedshiftReplicator.logger.info I18n.t(:no_new_records, table_name: source_table, scope: :rails_redshift_replicator)
    self.replication = nil
    return
  end
  RailsRedshiftReplicator.logger.info I18n.t(:exporting_results, counts: counts, scope: :rails_redshift_replicator)
  file_manager.split_file file_name, counts
  replication.exported! export_duration: (Time.now - export_start).ceil, record_count: counts
  @file_names = Dir.glob "#{file_manager.local_file(file_name)}*"
end
#export_and_upload(options = {}) ⇒ Object
Exports and uploads selected records from the source_table
25 26 27 28 29 30 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 25
# Runs delete propagation for the replicable, exports the records, uploads the
# resulting files and returns the replication.
#
# @param options [Hash] forwarded to #export (which reads :slices and :format)
# @return the value of #replication after export and upload
def export_and_upload(options = {})
  RailsRedshiftReplicator::Deleter.new(replicable).handle_delete_propagation
  # Forward the caller's options so :slices / :format overrides reach #export.
  files = export(options)
  upload files
  replication
end
#fields_to_sync ⇒ Array<String>
Lists fields on redshift table
207 208 209 210 211 212 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 207
# Column names listed for target_table in redshift_schema.
#
# @return [Array<String>, nil] the 'column' value of each definition, or nil
#   when the schema has no entry for the target table
def fields_to_sync
  @fields_to_sync ||= begin
    definitions = redshift_schema[target_table]
    definitions.map { |definition| definition['column'] } if definitions.present?
  end
end
#file_manager ⇒ Object
154 155 156 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 154
# Memoized RailsRedshiftReplicator::FileManager built around this exporter.
def file_manager
  return @file_manager if @file_manager

  @file_manager = RailsRedshiftReplicator::FileManager.new(self)
end
#from_record ⇒ Integer?
Some replication strategies may not use a replication_field (e.g. FullExporter), so #from_record will be nil.
Returns the value of last_record from the most recent complete replication record.
118 119 120 121 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 118
# The last_record stored on last_replication.
#
# @return the stored value, or nil when replication_field is blank or there is
#   no last replication
def from_record
  last_replication.try(:last_record) if replication_field.present?
end
#has_index? ⇒ true, false
Verifies if the table has the recommended indexes to increase export performance.
40 41 42 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 40
# Tells whether replication_field appears among table_indexes.
#
# @return [true, false]
def has_index?
  field = replication_field
  table_indexes.include?(field)
end
#init_replication_attrs(file_name, format, slices) ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 165
# Builds the attribute hash for a replication record in the 'exporting' state.
#
# @param file_name name passed to file_manager.file_key_in_format
# @param format export format, stored as :export_format
# @param slices stored as :slices
# @return [Hash] attributes for the replication record
def init_replication_attrs(file_name, format, slices)
  file_key   = file_manager.file_key_in_format(file_name, format)
  upper_mark = last_record.to_s
  {
    key: file_key,
    last_record: upper_mark,
    state: 'exporting',
    replication_type: replication_type,
    source_table: source_table,
    target_table: target_table,
    slices: slices,
    first_record: from_record,
    export_format: format
  }
end
#initialize_replication(file_name, format, slices) ⇒ RailsRedshiftReplicator::Replication
Initialize replication record without saving
160 161 162 163 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 160
# Prepares the replication record for a new export without saving it.
#
# When a replication is already present its attributes are overwritten in
# place; otherwise a new RailsRedshiftReplicator::Replication is built.
#
# @param file_name passed to init_replication_attrs
# @param format passed to init_replication_attrs
# @param slices passed to init_replication_attrs
# @return the replication record now held in @replication
def initialize_replication(file_name, format, slices)
  attrs = init_replication_attrs(file_name, format, slices)
  @replication =
    if replication.present?
      # assign_attributes does not return the record, so tap keeps the record
      # itself (not the call's return value) in @replication.
      replication.tap { |record| record.assign_attributes(attrs) }
    else
      RailsRedshiftReplicator::Replication.new(attrs)
    end
end
#last_record ⇒ Integer
last_record is an Integer and is computed based on the replication field.
Returns the last record to export using the replication_field criteria.
188 189 190 191 192 193 194 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 188
# Highest replication_field value currently in source_table, memoized once a
# truthy result has been obtained.
#
# @return the value produced by the connection adapter, or nil when
#   replication_field is blank
def last_record
  return if replication_field.blank?
  return @last_record if @last_record

  max_sql = "SELECT max(#{replication_field}) as _last_record from #{source_table}"
  @last_record = connection_adapter.last_record_query_command(max_sql)
end
#last_replication ⇒ RailsRedshiftReplicator::Replication
Returns the last replication from a given table
181 182 183 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 181
# Memoized last entry of Replication.from_table(source_table).
#
# @return the last replication for the source table, if any
def last_replication
  return @last_replication if @last_replication

  @last_replication = RailsRedshiftReplicator::Replication.from_table(source_table).last
end
#query_command(sql) ⇒ Object
Performs the query to retrieve records to export
105 106 107 108 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 105
# Logs the statement at debug level, then runs it through the connection adapter.
#
# @param sql [String] statement to execute
# @return whatever connection_adapter.query_command returns
def query_command(sql)
  debug_line = I18n.t(:executing_query,
                      scope: :rails_redshift_replicator,
                      sql: sql,
                      adapter: connection_adapter.class.name)
  RailsRedshiftReplicator.logger.debug debug_line
  connection_adapter.query_command(sql)
end
#records(from_record = nil, option = {}) ⇒ Object
Query cache is disabled to decrease memory usage.
Returns records to export
70 71 72 73 74 75 76 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 70
# Runs the export query inside ActiveRecord::Base.uncached and memoizes the result.
#
# NOTE(review): because the result is cached in @records, the arguments of any
# later call are ignored once a truthy result exists.
#
# @param from_record forwarded to build_query_sql
# @param option [Hash] forwarded to build_query_sql
# @return the result of query_command
def records(from_record = nil, option = {})
  @records ||= ActiveRecord::Base.uncached do
    statement = build_query_sql(from_record, option)
    query_command(statement)
  end
end
#redshift_schema ⇒ Hash
Schema for the export table on redshift
198 199 200 201 202 203 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 198
# Column definitions for target_table read from pg_table_def, memoized.
#
# @return [Hash] rows (tablename, column, type) grouped by their "tablename" value
def redshift_schema
  @schema ||= begin
    lookup_sql = "select tablename, \"column\", type from pg_table_def where tablename = '#{target_table}'"
    rows = ::RailsRedshiftReplicator.connection.exec(lookup_sql).to_a
    rows.group_by { |row| row["tablename"] }
  end
end
#table_indexes ⇒ Array<String>
Lists indexes from source table
34 35 36 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 34
# Columns covered by any index on source_table, with "id" always included.
#
# @return [Array] de-duplicated column names
def table_indexes
  index_defs = ActiveRecord::Base.connection.indexes(source_table)
  indexed_columns = index_defs.map(&:columns).flatten
  indexed_columns | ["id"]
end
#upload(files = file_names) ⇒ Object
Uploads file to s3
147 148 149 150 151 152 |
# File 'lib/rails_redshift_replicator/exporters/base.rb', line 147
# Sends the exported files through the file manager and marks the replication
# as uploaded. Does nothing when errors are present or there are no files.
#
# @param files files to upload; defaults to file_names
# @return the result of replication.uploaded!, or nil when skipped
def upload(files = file_names)
  return if errors.present? || files.blank?

  # presumably returns the upload start time, since it is subtracted from Time.now below - confirm
  upload_start = replication.uploading!
  if replication.gzip?
    file_manager.upload_gzip(files)
  else
    file_manager.upload_csv(files)
  end
  elapsed = (Time.now-upload_start).ceil
  replication.uploaded! upload_duration: elapsed
end