Class: RedshiftConnector::Exporter

Inherits:
Object
Defined in:
lib/redshift-connector/exporter.rb

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(ds: self.class.default_data_source, query:, bundle:, logger: RedshiftConnector.logger) ⇒ Exporter

Returns a new instance of Exporter.



# File 'lib/redshift-connector/exporter.rb', lines 66–71

# Stores the collaborators an export needs.
#
# ds     - data source whose connection_pool runs the UNLOAD statement;
#          defaults to the class-level default_data_source.
# query  - object responding to #description and #to_sql.
# bundle - file bundle the query unloads into (cleared before each run).
# logger - receives progress messages; defaults to RedshiftConnector.logger.
def initialize(ds: self.class.default_data_source, query:, bundle:, logger: RedshiftConnector.logger)
  @ds, @query, @bundle, @logger = ds, query, bundle, logger
end

Instance Attribute Details

#bundle ⇒ Object (readonly)

Returns the value of attribute bundle.



# File 'lib/redshift-connector/exporter.rb', lines 74–76

# Read-only access to the file bundle this exporter unloads into.
def bundle; @bundle; end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/redshift-connector/exporter.rb', lines 75–77

# Read-only access to the logger given at construction time.
def logger; @logger; end

#query ⇒ Object (readonly)

Returns the value of attribute query.



# File 'lib/redshift-connector/exporter.rb', lines 73–75

# Read-only access to the (UNLOAD-wrapped) query this exporter runs.
def query; @query; end

Class Method Details

.default_data_source ⇒ Object



# File 'lib/redshift-connector/exporter.rb', lines 10–12

# Returns the data source registered via Exporter.default_data_source=.
# It is the default +ds:+ for #initialize and the factory methods.
#
# Raises ArgumentError when nothing (or a nil/false value) has been set.
def Exporter.default_data_source
  return @default_data_source if @default_data_source

  raise ArgumentError, "RedshiftConnector::Exporter.default_data_source was not set"
end

.default_data_source=(ds) ⇒ Object



# File 'lib/redshift-connector/exporter.rb', lines 6–8

# Registers the data source used when no explicit +ds:+ is passed to
# #initialize or the factory methods.
def Exporter.default_data_source=(data_source)
  @default_data_source = data_source
end

.for_query(ds: default_data_source, schema:, table:, bucket: nil, query:, txn_id: "#{Time.now.strftime('%Y%m%d_%H%M%S')}_#{$$}", filter: nil, logger: RedshiftConnector.logger, quiet: false) ⇒ Object



# File 'lib/redshift-connector/exporter.rb', lines 36–64

# Builds an Exporter that unloads the result of an arbitrary SQL +query+
# into a file bundle for +schema+.+table+.
#
# bucket - bucket name passed to S3Bucket.get; nil selects S3Bucket.default.
# txn_id - bundle transaction id; defaults to "YYYYmmdd_HHMMSS_<pid>".
# filter - forwarded unchanged to S3DataFileBundle.for_table.
# quiet  - when true, a NullLogger replaces +logger+ for both the bundle
#          and the exporter.
#
# The SQL is wrapped with UnloadQuery.wrap so it targets the new bundle.
def Exporter.for_query(
    ds: default_data_source,
    schema:,
    table:,
    bucket: nil,
    query:,
    txn_id: "#{Time.now.strftime('%Y%m%d_%H%M%S')}_#{$$}",
    filter: nil,
    logger: RedshiftConnector.logger,
    quiet: false
)
  s3_bucket = bucket ? S3Bucket.get(bucket) : S3Bucket.default
  effective_logger = quiet ? NullLogger.new : logger
  file_bundle = S3DataFileBundle.for_table(
    bucket: s3_bucket,
    schema: schema,
    table: table,
    txn_id: txn_id,
    filter: filter,
    logger: effective_logger
  )
  unload = UnloadQuery.wrap(query: query, bundle: file_bundle)
  Exporter.new(ds: ds, query: unload, bundle: file_bundle, logger: effective_logger)
end

.for_table(ds: default_data_source, schema:, table:, columns:, bundle:, logger: RedshiftConnector.logger) ⇒ Object



# File 'lib/redshift-connector/exporter.rb', lines 20–24

# Builds an Exporter that unloads +columns+ of +schema+.+table+ into +bundle+.
# The query is a SelectAllQuery wrapped in an UnloadQuery that targets
# +bundle+.
def Exporter.for_table(ds: default_data_source, schema:, table:, columns:, bundle:, logger: RedshiftConnector.logger)
  select = SelectAllQuery.new(schema: schema, table: table, columns: columns)
  new(
    ds: ds,
    query: UnloadQuery.new(query: select, bundle: bundle),
    bundle: bundle,
    logger: logger
  )
end

.for_table_delta(ds: default_data_source, schema:, table:, condition:, columns:, bundle:, logger: RedshiftConnector.logger) ⇒ Object



# File 'lib/redshift-connector/exporter.rb', lines 14–18

# Builds an Exporter that unloads +columns+ of +schema+.+table+ restricted by
# +condition+. The query is a DeltaQuery wrapped in an UnloadQuery that
# targets +bundle+.
def Exporter.for_table_delta(ds: default_data_source, schema:, table:, condition:, columns:, bundle:, logger: RedshiftConnector.logger)
  delta = DeltaQuery.new(schema: schema, table: table, columns: columns, condition: condition)
  new(
    ds: ds,
    query: UnloadQuery.new(query: delta, bundle: bundle),
    bundle: bundle,
    logger: logger
  )
end

.foreach(**params, &block) ⇒ Object



# File 'lib/redshift-connector/exporter.rb', lines 26–34

# Runs an ad-hoc export and streams its rows.
#
# Builds an exporter with Exporter.for_query(**params) and executes it. Each
# row of the resulting bundle is passed to +block+ via #each_row.
#
# The bundle is cleared afterwards, even when the export or the block raises.
# If for_query itself raises, nothing has been created, so nothing is cleared.
def Exporter.foreach(**params, &block)
  job = Exporter.for_query(**params)
  files = job.bundle
  begin
    job.execute
    files.each_row(&block)
  ensure
    files.clear
  end
end

Instance Method Details

#batch_job_label ⇒ Object



# File 'lib/redshift-connector/exporter.rb', lines 101–114

# Returns a SQL block comment identifying where a query came from, e.g.
# "/* Job: myapp:/path/to/batch.rb */ " (note the trailing space).
# #execute prepends it to the statement it sends to Redshift.
#
# The app name is the last component of the working directory. When that
# component is "current" (a Capistrano layout), the component before it is
# used instead. The script path is taken from the first caller frame that
# mentions neither redshift-connector nor active_record; if no such frame
# exists, '?' is used.
#
# Every "*" in the label is replaced with "_". Otherwise a path such as
# "/jobs/a*/b.rb" would inject "*/" (or "/*") into the comment, ending it
# early and corrupting the SQL it is prepended to.
#
# The result is memoized per instance.
def batch_job_label
  @batch_job_label ||= begin
    components = Dir.getwd.split('/')
    # Capistrano runs releases from <app>/current, so the app is one level up.
    app = components.last == 'current' ? components[-2] : components[-1]
    batch_file = caller.find { |frame| /redshift-connector|active_record/ !~ frame }
    path = batch_file ? batch_file.split(':').first : '?'
    origin = "#{app}:#{path}".tr('*', '_')
    "/* Job: #{origin} */ "
  end
end

#completed? ⇒ Boolean

Returns:

  • (Boolean) — whether the completion flag object (see #flag_object_key) exists in the bundle's bucket.


# File 'lib/redshift-connector/exporter.rb', lines 77–79

# True when the completion flag object written by #create_flag_object is
# present in the bundle's bucket.
def completed?
  flag = @bundle.bucket.object(flag_object_key)
  flag.exists?
end

#create_flag_object ⇒ Object



# File 'lib/redshift-connector/exporter.rb', lines 81–84

# Marks the export as finished by writing the body "OK" to the flag object
# key in the bundle's bucket. Logs a "TOUCH <key>" line first.
# Returns whatever the bucket object's #put returns.
def create_flag_object
  key = flag_object_key
  @logger.info "TOUCH #{key}"
  @bundle.bucket.object(key).put(body: "OK")
end

#execute ⇒ Object



# File 'lib/redshift-connector/exporter.rb', lines 90–99

# Runs the export.
#
# 1. Clears the target bundle.
# 2. Checks out a connection from the data source's pool and executes the
#    query's SQL, prefixed with #batch_job_label so the statement can be
#    traced to its origin.
# 3. Writes the completion flag via #create_flag_object.
#
# If the SQL raises, the exception propagates and no flag is written.
# Returns the value of #create_flag_object.
def execute
  @bundle.clear
  @logger.info "EXPORT #{@query.description} -> #{@bundle.url}*"
  @ds.connection_pool.with_connection do |connection|
    sql = @query.to_sql
    label = batch_job_label
    @logger.info "[SQL/Redshift] #{label}#{sql.strip}"
    connection.execute(label + sql)
  end
  create_flag_object
end

#flag_object_key ⇒ Object



# File 'lib/redshift-connector/exporter.rb', lines 86–88

# Key of the completion flag object: "00completed" placed in the parent
# "directory" (File.dirname) of the bundle's prefix.
def flag_object_key
  directory = File.dirname(@bundle.prefix)
  directory + '/00completed'
end