Class: RedshiftConnector::S3DataFileBundle
Constant Summary
- REPORT_SIZE = 10_0000
  Used by #each_batch to log a progress message once every 100,000 rows.
Instance Attribute Summary

- #bucket ⇒ Object (readonly)
- #prefix ⇒ Object (readonly)
Class Method Summary

- .for_prefix(bucket: S3Bucket.default, prefix:, format:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ Object
- .for_table(bucket: S3Bucket.default, schema:, table:, txn_id:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ Object
Instance Method Summary

- #clear ⇒ Object
- #credential_string ⇒ Object
- #data_files ⇒ Object
- #each_batch(report: true) ⇒ Object
- #url ⇒ Object

#all_data_objects, #each_object, #each_row
Constructor Details
#initialize(bucket, prefix, format: :csv, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ S3DataFileBundle
Returns a new instance of S3DataFileBundle.
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 20
def initialize(bucket, prefix, format: :csv, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger)
  @bucket = bucket
  @prefix = prefix
  @format = format
  @filter = filter || lambda {|*row| row }
  @batch_size = batch_size
  @logger = logger
  @reader_class = Reader.get(format)
end
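For illustration, a minimal construction sketch; the bucket and prefix values here are hypothetical, and in practice the .for_prefix and .for_table class methods below are the usual entry points:

bundle = RedshiftConnector::S3DataFileBundle.new(
  bucket,             # a RedshiftConnector::S3Bucket instance
  'exports/users/',   # hypothetical key prefix
  format: :csv,
  batch_size: 1000
)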
Instance Attribute Details
#bucket ⇒ Object
Returns the value of attribute bucket.
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 30
def bucket
  @bucket
end
#prefix ⇒ Object
Returns the value of attribute prefix.
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 31
def prefix
  @prefix
end
Class Method Details
.for_prefix(bucket: S3Bucket.default, prefix:, format:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ Object
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 10
def self.for_prefix(bucket: S3Bucket.default, prefix:, format:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger)
  real_prefix = "#{bucket.prefix}/#{prefix}"
  new(bucket, real_prefix, format: format, filter: filter, batch_size: batch_size, logger: logger)
end
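A hedged usage sketch; the prefix value is hypothetical. Note that .for_prefix prepends the bucket's own configured prefix before constructing the bundle:

bundle = RedshiftConnector::S3DataFileBundle.for_prefix(
  prefix: 'unload/users',   # hypothetical; expanded to "#{bucket.prefix}/unload/users"
  format: :csv
)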
.for_table(bucket: S3Bucket.default, schema:, table:, txn_id:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ Object
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 15
def self.for_table(bucket: S3Bucket.default, schema:, table:, txn_id:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger)
  prefix = "#{bucket.prefix}/#{schema}_export/#{table}/#{txn_id}/#{table}.csv."
  new(bucket, prefix, format: :redshift_csv, filter: filter, batch_size: batch_size, logger: logger)
end
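A hedged usage sketch with hypothetical schema, table, and transaction id; the resulting bundle covers keys under "#{bucket.prefix}/public_export/users/1234/users.csv." and always reads them with the :redshift_csv format:

bundle = RedshiftConnector::S3DataFileBundle.for_table(
  schema: 'public',   # hypothetical
  table: 'users',     # hypothetical
  txn_id: '1234'      # hypothetical
)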
Instance Method Details
#clear ⇒ Object
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 77
def clear
  # Remove every object under the parent prefix of this bundle.
  pref = File.dirname(@prefix) + '/'
  keys = @bucket.objects(prefix: pref).map(&:key)
  unless keys.empty?
    @logger.info "DELETE #{pref}*"
    @bucket.delete_objects(keys)
  end
end
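Note that #clear deletes every object under the parent prefix (File.dirname(@prefix) + '/'), not only the objects enumerated by #data_files, so sibling keys under the same parent are removed as well.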
#credential_string ⇒ Object
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 37
def credential_string
  @bucket.credential_string
end
#data_files ⇒ Object
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 72
def data_files
  @bucket.objects(prefix: @prefix)
    .map {|obj| S3DataFile.new(obj, reader_class: @reader_class) }
end
#each_batch(report: true) ⇒ Object
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 43
def each_batch(report: true)
  @logger.info "reader: #{@reader_class}"
  n = 0
  reported = 0
  do_each_batch(@batch_size) do |rows|
    yield rows
    n += rows.size
    # Emit a progress line once per REPORT_SIZE rows.
    if n / REPORT_SIZE > reported
      @logger.info "#{n} rows processed" if report
      reported = n / REPORT_SIZE
    end
  end
  @logger.info "total #{n} rows processed" if report
end
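A minimal iteration sketch, assuming bundle was built with one of the class methods above; each yielded batch contains up to batch_size rows after the filter has been applied:

bundle.each_batch do |rows|
  rows.each do |row|
    # process one row (with the default filter, an array of column values)
  end
end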
#url ⇒ Object
# File 'lib/redshift-connector/s3_data_file_bundle.rb', line 33
def url
  "s3://#{@bucket.name}/#{@prefix}"
end
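For illustration, with a hypothetical bucket named "my-bucket" and the prefix from the constructor sketch above:

bundle.url   # => "s3://my-bucket/exports/users/"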