Class: RedshiftConnector::S3DataFileBundle
- Inherits: Object
- Inheritance chain: Object → RedshiftConnector::S3DataFileBundle
- Defined in:
- lib/redshift-connector/s3_data_file_bundle.rb
Constant Summary
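- REPORT_SIZE =
100_000 (written `10_0000` in the source; same value). #each_batch logs a progress line each time the running row count reaches a new multiple of this value.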
Instance Attribute Summary
-
#bucket ⇒ Object
readonly
Returns the value of attribute bucket.
-
#prefix ⇒ Object
readonly
Returns the value of attribute prefix.
Class Method Summary
- .for_prefix(bucket: S3Bucket.default, prefix:, format:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ Object
- .for_table(bucket: S3Bucket.default, schema:, table:, txn_id:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ Object
Instance Method Summary
- #all_data_objects ⇒ Object
- #clear ⇒ Object
- #credential_string ⇒ Object
- #each_batch(report: true) ⇒ Object
- #each_object(&block) ⇒ Object
- #each_row(&block) ⇒ Object (also: #each)
-
#initialize(bucket, prefix, format: :csv, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ S3DataFileBundle
constructor
A new instance of S3DataFileBundle.
- #url ⇒ Object
Constructor Details
#initialize(bucket, prefix, format: :csv, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ S3DataFileBundle
19 20 21 22 23 24 25 26 27 |
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 19
# Builds a bundle over the S3 objects whose keys start with +prefix+.
#
# @param bucket bucket wrapper; the methods in this class call its
#   #objects, #delete_objects, #name and #credential_string
# @param prefix [String] full key prefix of the data files (.for_prefix and
#   .for_table prepend the bucket's own prefix before calling this)
# @param format [Symbol] reader format, resolved once through Reader.get
# @param filter [#call, nil] row filter; nil selects a lambda that returns
#   its arguments unchanged, as an array.
#   NOTE(review): no method in this listing reads @filter -- presumably it
#   is consumed by do_each_batch (not shown); confirm.
# @param batch_size [Integer] handed to do_each_batch by #each_batch
# @param logger receives #info progress messages
def initialize(bucket, prefix, format: :csv, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger)
  @bucket, @prefix = bucket, prefix
  @format = format
  @filter = filter || ->(*row) { row }
  @batch_size, @logger = batch_size, logger
  @reader_class = Reader.get(format)
end
Instance Attribute Details
#bucket ⇒ Object (readonly)
Returns the value of attribute bucket.
29 30 31 |
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 29
# The bucket wrapper given to the constructor (read-only).
def bucket
  @bucket
end
#prefix ⇒ Object (readonly)
Returns the value of attribute prefix.
30 31 32 |
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 30
# The full key prefix of this bundle's data files, exactly as given to the
# constructor (read-only).
def prefix
  @prefix
end
Class Method Details
.for_prefix(bucket: S3Bucket.default, prefix:, format:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ Object
9 10 11 12 |
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 9
# Builds a bundle for the files stored under +prefix+, relative to the
# bucket's own prefix (the two are joined with "/").
#
# @param bucket bucket wrapper; defaults to S3Bucket.default
# @param prefix [String] key prefix relative to bucket.prefix
# @param format [Symbol] reader format (required)
# +filter+, +batch_size+ and +logger+ are forwarded unchanged to #initialize.
# @return [S3DataFileBundle]
def self.for_prefix(bucket: S3Bucket.default, prefix:, format:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger)
  options = { format: format, filter: filter, batch_size: batch_size, logger: logger }
  new(bucket, "#{bucket.prefix}/#{prefix}", **options)
end
.for_table(bucket: S3Bucket.default, schema:, table:, txn_id:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ Object
14 15 16 17 |
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 14
# Builds a bundle for the CSV files of one table export, read with the
# :redshift_csv reader. The data-file keys share the prefix
#   <bucket.prefix>/<schema>_export/<table>/<txn_id>/<table>.csv.
#
# +filter+, +batch_size+ and +logger+ are forwarded unchanged to #initialize.
# @return [S3DataFileBundle]
def self.for_table(bucket: S3Bucket.default, schema:, table:, txn_id:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger)
  export_dir = "#{bucket.prefix}/#{schema}_export/#{table}/#{txn_id}"
  new(bucket, "#{export_dir}/#{table}.csv.", format: :redshift_csv, filter: filter, batch_size: batch_size, logger: logger)
end
Instance Method Details
#all_data_objects ⇒ Object
86 87 88 89 90 |
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 86
# Lists the objects under the bundle prefix, wraps each in an S3DataFile
# (using this bundle's reader class) and keeps only those whose
# data_object? is true.
#
# @return [Array<S3DataFile>]
def all_data_objects
  listed = @bucket.objects(prefix: @prefix)
  files = listed.map { |s3_object| S3DataFile.new(s3_object, reader_class: @reader_class) }
  files.select(&:data_object?)
end
#clear ⇒ Object
92 93 94 95 96 97 98 99 |
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 92
# Deletes every object in the "directory" that holds this bundle's files.
#
# For a file-name prefix such as ".../txn/table.csv." (as built by
# .for_table) that directory is ".../txn/". A prefix that already ends in
# "/" names a directory itself and is used as-is: File.dirname would strip
# the trailing slash and select the *parent* directory, deleting sibling
# exports as well.
#
# Logs "DELETE <dir>*" when anything is deleted; does nothing (and returns
# nil) when no object matches.
def clear
  pref = @prefix.end_with?('/') ? @prefix : File.dirname(@prefix) + '/'
  keys = @bucket.objects(prefix: pref).map(&:key)
  return if keys.empty?

  @logger.info "DELETE #{pref}*"
  # NOTE(review): S3 DeleteObjects accepts at most 1000 keys per request;
  # confirm S3Bucket#delete_objects splits larger key lists.
  @bucket.delete_objects(keys)
end
#credential_string ⇒ Object
36 37 38 |
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 36
# Credential string of this bundle's bucket; delegates to the bucket
# wrapper on every call (nothing is cached here).
def credential_string
  @bucket.credential_string
end
#each_batch(report: true) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 42
# Yields the bundle's rows in batches (arrays) produced by the helper
# do_each_batch (defined elsewhere in this class), which receives the
# constructor's batch_size. Logs the reader class up front and, when
# +report+ is true, a progress line whenever the running row count
# reaches a new multiple of REPORT_SIZE, plus a final total.
#
# @param report [Boolean] false suppresses the progress and total lines
#   (the "reader:" line is always logged)
# @yield [rows] one batch of rows
# @return [Enumerator] when called without a block
def each_batch(report: true)
  # Without a block, return an Enumerator instead of failing with
  # LocalJumpError only after the first batch has been read.
  return enum_for(:each_batch, report: report) unless block_given?

  @logger.info "reader: #{@reader_class}"
  n = 0
  reported = 0
  do_each_batch(@batch_size) do |rows|
    yield rows
    n += rows.size
    # Log at most once per batch, when the total enters a new REPORT_SIZE bucket.
    if n / REPORT_SIZE > reported
      @logger.info "#{n} rows processed" if report
      reported = n / REPORT_SIZE
    end
  end
  @logger.info "total #{n} rows processed" if report
end
#each_object(&block) ⇒ Object
79 80 81 82 83 84 |
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 79
# Iterates over the bundle's data files (see #all_data_objects), logging
# each object's key at info level before yielding it. The whole object
# list is fetched before the first yield.
#
# @yield [obj] each S3DataFile in the bundle
# @return [Array<S3DataFile>] the data files, when a block is given
# @return [Enumerator] when called without a block
def each_object
  # Without a block, return an Enumerator rather than raising
  # LocalJumpError after the S3 listing has already been fetched.
  return enum_for(:each_object) unless block_given?

  all_data_objects.each do |obj|
    @logger.info "processing s3 object: #{obj.key}"
    yield obj
  end
end
#each_row(&block) ⇒ Object Also known as: each
71 72 73 74 75 |
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 71
# Yields every row of every data file in the bundle, file by file in the
# order given by #each_object. Also exposed as #each.
#
# NOTE(review): #all_data_objects builds each S3DataFile with only the
# reader class, so @filter is not applied on this path (unlike, presumably,
# do_each_batch) -- confirm this is intended.
#
# @yield [row] each row produced by S3DataFile#each_row
# @return [Enumerator] when called without a block
def each_row(&block)
  # Without a block, return an Enumerator (the usual #each contract)
  # instead of forwarding a nil block down to S3DataFile#each_row.
  return enum_for(:each_row) unless block_given?

  each_object do |obj|
    obj.each_row(&block)
  end
end
#url ⇒ Object
32 33 34 |
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 32
# S3 URL of the bundle prefix, in the form "s3://<bucket name>/<prefix>".
def url
  bucket_name = @bucket.name
  "s3://#{bucket_name}/#{@prefix}"
end