Class: RedshiftConnector::Exporter
- Inherits: Object
- Inheritance chain: Object → RedshiftConnector::Exporter
- Defined in:
- lib/redshift-connector/exporter.rb
Instance Attribute Summary
-
#bundle ⇒ Object
readonly
Returns the value of attribute bundle.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#query ⇒ Object
readonly
Returns the value of attribute query.
Class Method Summary
- .default_data_source ⇒ Object
- .default_data_source=(ds) ⇒ Object
- .for_query(ds: default_data_source, schema:, table:, bucket: nil, query:, txn_id: "#{Time.now.strftime('%Y%m%d_%H%M%S')}_#{$$}", filter: nil, logger: RedshiftConnector.logger, quiet: false) ⇒ Object
- .for_table(ds: default_data_source, schema:, table:, columns:, bundle:, logger: RedshiftConnector.logger) ⇒ Object
- .for_table_delta(ds: default_data_source, schema:, table:, condition:, columns:, bundle:, logger: RedshiftConnector.logger) ⇒ Object
- .foreach(**params, &block) ⇒ Object
Instance Method Summary
- #batch_job_label ⇒ Object
- #completed? ⇒ Boolean
- #create_flag_object ⇒ Object
- #execute ⇒ Object
- #flag_object_key ⇒ Object
-
#initialize(ds: self.class.default_data_source, query:, bundle:, logger: RedshiftConnector.logger) ⇒ Exporter
constructor
A new instance of Exporter.
Constructor Details
#initialize(ds: self.class.default_data_source, query:, bundle:, logger: RedshiftConnector.logger) ⇒ Exporter
Returns a new instance of Exporter.
66 67 68 69 70 71 |
# File 'lib/redshift-connector/exporter.rb', line 66
# Creates an exporter that will run +query+ against +ds+ and write into +bundle+.
#
# @param ds [Object] data source whose +connection_pool+ is used by #execute;
#   defaults to the class-level default data source (raises if unset)
# @param query [Object] object responding to +description+ and +to_sql+
# @param bundle [Object] destination bundle (uses +clear+, +url+, +bucket+, +prefix+)
# @param logger [Object] logger receiving +info+ messages
def initialize(ds: self.class.default_data_source, query:, bundle:, logger: RedshiftConnector.logger)
  @logger = logger
  @bundle = bundle
  @query  = query
  @ds     = ds
end
Instance Attribute Details
#bundle ⇒ Object (readonly)
Returns the value of attribute bundle.
74 75 76 |
# File 'lib/redshift-connector/exporter.rb', line 74
# The destination bundle given to the constructor as +bundle:+ (read-only).
def bundle
  @bundle
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
75 76 77 |
# File 'lib/redshift-connector/exporter.rb', line 75
# The logger given to the constructor as +logger:+ (read-only).
def logger
  @logger
end
#query ⇒ Object (readonly)
Returns the value of attribute query.
73 74 75 |
# File 'lib/redshift-connector/exporter.rb', line 73
# The (UNLOAD-wrapped) query given to the constructor as +query:+ (read-only).
def query
  @query
end
Class Method Details
.default_data_source ⇒ Object
10 11 12 |
# File 'lib/redshift-connector/exporter.rb', line 10
# Returns the data source set via .default_data_source=.
#
# NOTE(review): the value is a class-level instance variable of the receiver,
# so calling this on a subclass reads the subclass's own (possibly unset)
# variable rather than Exporter's.
#
# @raise [ArgumentError] if no default data source has been set
def Exporter.default_data_source
  return @default_data_source if @default_data_source
  raise ArgumentError, "RedshiftConnector::Exporter.default_data_source was not set"
end
.default_data_source=(ds) ⇒ Object
6 7 8 |
# File 'lib/redshift-connector/exporter.rb', line 6
# Sets the data source used when +ds:+ is omitted from the constructor and
# the factory methods.
#
# @param ds [Object] data source exposing +connection_pool+
def Exporter.default_data_source=(ds)
  # Stored as a class-level instance variable; read back by .default_data_source.
  @default_data_source = ds
end
.for_query(ds: default_data_source, schema:, table:, bucket: nil, query:, txn_id: "#{Time.now.strftime('%Y%m%d_%H%M%S')}_#{$$}", filter: nil, logger: RedshiftConnector.logger, quiet: false) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/redshift-connector/exporter.rb', line 36
# Builds an Exporter that UNLOADs an arbitrary +query+ into a new
# S3DataFileBundle for +schema+ / +table+.
#
# @param ds [Object] data source (defaults to .default_data_source)
# @param schema [String] schema name used to build the bundle
# @param table [String] table name used to build the bundle
# @param bucket [Object, nil] bucket name for S3Bucket.get; falsy means S3Bucket.default
# @param query [Object] query wrapped via UnloadQuery.wrap
# @param txn_id [String] transaction id; defaults to "YYYYmmdd_HHMMSS_<pid>"
# @param filter [Object, nil] passed through to S3DataFileBundle.for_table
# @param logger [Object] logger for both the bundle and the exporter
# @param quiet [Boolean] when truthy, a NullLogger replaces +logger+ everywhere
# @return [Exporter]
def Exporter.for_query(
    ds: default_data_source,
    schema:,
    table:,
    bucket: nil,
    query:,
    txn_id: "#{Time.now.strftime('%Y%m%d_%H%M%S')}_#{$$}",
    filter: nil,
    logger: RedshiftConnector.logger,
    quiet: false
)
  target_bucket =
    if bucket
      S3Bucket.get(bucket)
    else
      S3Bucket.default
    end
  effective_logger = quiet ? NullLogger.new : logger

  data_bundle = S3DataFileBundle.for_table(
    bucket: target_bucket,
    schema: schema,
    table: table,
    txn_id: txn_id,
    filter: filter,
    logger: effective_logger
  )

  Exporter.new(
    ds: ds,
    query: UnloadQuery.wrap(query: query, bundle: data_bundle),
    bundle: data_bundle,
    logger: effective_logger
  )
end
.for_table(ds: default_data_source, schema:, table:, columns:, bundle:, logger: RedshiftConnector.logger) ⇒ Object
20 21 22 23 24 |
# File 'lib/redshift-connector/exporter.rb', line 20
# Builds an exporter whose UNLOAD wraps a SelectAllQuery over
# +schema+.+table+ restricted to +columns+, writing into +bundle+.
#
# @return [Exporter] instance of the receiver class
def Exporter.for_table(ds: default_data_source, schema:, table:, columns:, bundle:, logger: RedshiftConnector.logger)
  select_all = SelectAllQuery.new(schema: schema, table: table, columns: columns)
  new(
    ds: ds,
    query: UnloadQuery.new(query: select_all, bundle: bundle),
    bundle: bundle,
    logger: logger
  )
end
.for_table_delta(ds: default_data_source, schema:, table:, condition:, columns:, bundle:, logger: RedshiftConnector.logger) ⇒ Object
14 15 16 17 18 |
# File 'lib/redshift-connector/exporter.rb', line 14
# Builds an exporter whose UNLOAD wraps a DeltaQuery over +schema+.+table+
# built from +columns+ and +condition+, writing into +bundle+.
#
# @return [Exporter] instance of the receiver class
def Exporter.for_table_delta(ds: default_data_source, schema:, table:, condition:, columns:, bundle:, logger: RedshiftConnector.logger)
  unload_query = UnloadQuery.new(
    query: DeltaQuery.new(schema: schema, table: table, columns: columns, condition: condition),
    bundle: bundle
  )
  new(ds: ds, query: unload_query, bundle: bundle, logger: logger)
end
Instance Method Details
#batch_job_label ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/redshift-connector/exporter.rb', line 101
# Builds (once, then memoizes) an SQL block-comment prefix that identifies the
# job issuing the statement, e.g. "/* Job: myapp:/path/to/batch.rb */ ".
#
# The app name is the basename of the working directory, or of its parent
# when the basename is "current" (Capistrano release layout). The path is
# taken from the first caller frame not inside redshift-connector or
# ActiveRecord, or '?' if there is none.
#
# @return [String] label with a trailing space, ready to prepend to SQL
def batch_job_label
  @batch_job_label ||= begin
    components = Dir.getwd.split('/')
    app = if components.last == 'current'
      # is Capistrano environment
      components[-2]
    else
      components[-1]
    end
    # Location#path yields the file path directly; splitting the caller string
    # on ':' truncated paths containing a colon (e.g. "C:/app/job.rb" -> "C").
    batch_frame = caller_locations.find {|loc| /redshift-connector|active_record/ !~ loc.to_s }
    path = batch_frame ? batch_frame.path : '?'
    # The label is spliced into SQL inside /* ... */; drop '*' so a file or
    # directory name cannot close (or open a nested) comment and corrupt
    # the statement.
    job = "#{app}:#{path}".delete('*')
    "/* Job: #{job} */ "
  end
end
#completed? ⇒ Boolean
77 78 79 |
# File 'lib/redshift-connector/exporter.rb', line 77
# Reports whether the completion flag object (see #flag_object_key) exists
# in the bundle's bucket.
#
# @return [Boolean]
def completed?
  flag = @bundle.bucket.object(flag_object_key)
  flag.exists?
end
#create_flag_object ⇒ Object
81 82 83 84 |
# File 'lib/redshift-connector/exporter.rb', line 81
# Logs a TOUCH line and writes an object with body "OK" at #flag_object_key
# in the bundle's bucket, marking the export as complete.
#
# @return the result of the bucket object's +put+ call
def create_flag_object
  key = flag_object_key
  @logger.info "TOUCH #{key}"
  @bundle.bucket.object(key).put(body: "OK")
end
#execute ⇒ Object
90 91 92 93 94 95 96 97 98 99 |
# File 'lib/redshift-connector/exporter.rb', line 90
# Runs the export:
# 1. clears the destination bundle,
# 2. executes the query's SQL, prefixed with #batch_job_label, on a
#    connection checked out from the data source's pool,
# 3. writes the completion flag via #create_flag_object.
# It does not consult #completed? first. The flag is written only if the
# statement returns without raising.
#
# @return the result of #create_flag_object
def execute
  @bundle.clear
  @logger.info "EXPORT #{@query.description} -> #{@bundle.url}*"
  @ds.connection_pool.with_connection do |connection|
    sql = @query.to_sql
    # Resolved inside the block so the label's caller lookup sees the same
    # frames as before; the value is memoized by #batch_job_label.
    label = batch_job_label
    @logger.info "[SQL/Redshift] #{label}#{sql.strip}"
    connection.execute(label + sql)
  end
  create_flag_object
end
#flag_object_key ⇒ Object
86 87 88 |
# File 'lib/redshift-connector/exporter.rb', line 86
# Key of the completion flag object: the directory part of the bundle's
# prefix (per File.dirname) followed by "/00completed".
#
# @return [String]
def flag_object_key
  directory = File.dirname(@bundle.prefix)
  [directory, '00completed'].join('/')
end