Class: RedshiftConnector::DataFileBundleReader

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/redshift_connector/data_file_bundle_reader.rb

Constant Summary collapse

# Batch size used by #initialize when the caller does not supply one.
DEFAULT_BATCH_SIZE = 1000
# Row-count interval between progress log lines in #each_batch (100,000 rows).
REPORT_SIZE = 100_000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bundle, filter: nil, batch_size: DEFAULT_BATCH_SIZE, logger: RedshiftConnector.logger) ⇒ DataFileBundleReader

Returns a new instance of DataFileBundleReader.



11
12
13
14
15
16
# File 'lib/redshift_connector/data_file_bundle_reader.rb', line 11

# Builds a reader over the given bundle.
#
# @param bundle [Object] the bundle to read; stored in @bundle. Other methods
#   of this class call +data_files+ on it and, when it responds to
#   +has_manifest?+, +manifest_file+ as well.
# @param filter [#call, nil] stored in @filter; when nil, an identity lambda
#   that returns the row it was given is used instead.
#   NOTE(review): the code that applies the filter is not visible here.
# @param batch_size [Integer, nil] batch size passed to batch iteration; nil
#   falls back to DEFAULT_BATCH_SIZE.
# @param logger [Object] object receiving +info+ progress messages; defaults
#   to RedshiftConnector.logger.
def initialize(bundle, filter: nil, batch_size: DEFAULT_BATCH_SIZE, logger: RedshiftConnector.logger)
  @bundle = bundle
  @filter = filter || ->(*row) { row }
  # An explicit nil must yield the same default as an omitted argument, so use
  # the named constant rather than a duplicated literal.
  @batch_size = batch_size || DEFAULT_BATCH_SIZE
  @logger = logger
end

Instance Attribute Details

#batch_size ⇒ Object (readonly)

Returns the value of attribute batch_size.



19
20
21
# File 'lib/redshift_connector/data_file_bundle_reader.rb', line 19

# Read-only accessor for the batch size assigned in #initialize.
#
# @return [Object] the current value of @batch_size
def batch_size
  @batch_size
end

#bundle ⇒ Object (readonly)

Returns the value of attribute bundle.



18
19
20
# File 'lib/redshift_connector/data_file_bundle_reader.rb', line 18

# Read-only accessor for the bundle passed to #initialize.
#
# @return [Object] the current value of @bundle
def bundle
  @bundle
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



20
21
22
# File 'lib/redshift_connector/data_file_bundle_reader.rb', line 20

# Read-only accessor for the logger passed to #initialize.
#
# @return [Object] the current value of @logger
def logger
  @logger
end

Instance Method Details

#all_data_objects ⇒ Object



46
47
48
# File 'lib/redshift_connector/data_file_bundle_reader.rb', line 46

# Selects the bundle's data files that report themselves as data objects.
#
# @return [Object] the result of calling +select+ on +@bundle.data_files+,
#   keeping only entries whose +data_object?+ is truthy
def all_data_objects
  candidates = @bundle.data_files
  candidates.select(&:data_object?)
end

#each_batch(report: true) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/redshift_connector/data_file_bundle_reader.rb', line 52

# Yields each batch of rows produced by +do_each_batch+ and tracks how many
# rows have been handed to the caller.
#
# A progress line is logged each time the running total crosses another
# multiple of REPORT_SIZE, and a final total is logged after the last batch.
#
# @param report [Boolean] when falsy, no progress or total lines are logged
# @yield [rows] one batch at a time; +rows.size+ is added to the running total
# @return [Object, nil] the result of the final +@logger.info+ call, or nil
#   when +report+ is falsy
def each_batch(report: true)
  total_rows = 0
  milestones_logged = 0
  do_each_batch(@batch_size) do |rows|
    yield rows
    total_rows += rows.size
    milestones_reached = total_rows / REPORT_SIZE
    # Only act when the total has crossed a new multiple of REPORT_SIZE.
    next unless milestones_reached > milestones_logged

    @logger.info "#{total_rows} rows processed" if report
    milestones_logged = milestones_reached
  end
  @logger.info "total #{total_rows} rows processed" if report
end

#each_object(&block) ⇒ Object



39
40
41
42
43
44
# File 'lib/redshift_connector/data_file_bundle_reader.rb', line 39

# Iterates over every entry returned by #all_data_objects, logging the
# entry's +key+ before handing it to the caller's block.
#
# @yield [obj] each data object in turn
# @return [Object] whatever +each+ on the #all_data_objects result returns
def each_object(&block)
  all_data_objects.each { |data_object|
    announcement = "processing s3 object: #{data_object.key}"
    @logger.info announcement
    yield data_object
  }
end

#each_row(&block) ⇒ Object Also known as: each



24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/redshift_connector/data_file_bundle_reader.rb', line 24

# Yields every row of every data object visited by #each_object.
# Also known as +each+.
#
# When the bundle responds to +has_manifest?+ and it returns a truthy value,
# each row is passed through +RedshiftDataType.type_cast+ together with
# +@bundle.manifest_file+ before being yielded. Otherwise the caller's block
# is forwarded to the data object's own +each_row+ unchanged.
#
# @yield [row] one row at a time (type-cast when a manifest is available)
# @return [Object] whatever #each_object returns
def each_row(&block)
  each_object do |data_object|
    # Checked once per data object, as the bundle is queried inside the loop.
    manifest_backed = @bundle.respond_to?(:has_manifest?) && @bundle.has_manifest?
    next data_object.each_row(&block) unless manifest_backed

    data_object.each_row do |raw_row|
      typed_row = RedshiftDataType.type_cast(raw_row, @bundle.manifest_file)
      yield typed_row
    end
  end
end