Class: ETL::Control::DatabaseDestination

Inherits:
Destination show all
Defined in:
lib/etl/control/destination/database_destination.rb

Overview

Destination which writes directly to a database. This is useful when you are dealing with a small amount of data. For larger amounts of data you should probably use the bulk loader if it is supported with your target database as it will use a much faster load method.

Instance Attribute Summary collapse

Attributes inherited from Destination

#append_rows, #buffer_size, #condition, #configuration, #control, #mapping, #unique

Instance Method Summary collapse

Methods inherited from Destination

class_for_name, #current_row, #errors, #write

Constructor Details

#initialize(control, configuration, mapping = {}) ⇒ DatabaseDestination

Initialize the database destination

  • control: The ETL::Control::Control instance

  • configuration: The configuration Hash

  • mapping: The mapping

Configuration options:

  • :database: The database name (REQUIRED)

  • :target: The target connection (REQUIRED)

  • :table: The table to write to (REQUIRED)

  • :truncate: Set to true to truncate before writing (defaults to false)

  • :unique: Set to true to only insert unique records (defaults to false)

  • :append_rows: Array of rows to append

Mapping options:

  • :order: The order of fields to write (REQUIRED)

Raises:



36
37
38
39
40
41
42
43
44
45
46
# File 'lib/etl/control/destination/database_destination.rb', line 36

def initialize(control, configuration, mapping={})
  super
  @target = configuration[:target]
  @table = configuration[:table]
  @truncate = configuration[:truncate] ||= false
  @unique = configuration[:unique]
  @order = mapping[:order] || order_from_source
  raise ControlError, "Order required in mapping" unless @order
  raise ControlError, "Table required" unless @table
  raise ControlError, "Target required" unless @target
end

Instance Attribute Details

#orderObject (readonly)

Specify the order from the source



15
16
17
# File 'lib/etl/control/destination/database_destination.rb', line 15

def order
  @order
end

#tableObject (readonly)

The table



12
13
14
# File 'lib/etl/control/destination/database_destination.rb', line 12

def table
  @table
end

#targetObject (readonly)

The target connection



9
10
11
# File 'lib/etl/control/destination/database_destination.rb', line 9

def target
  @target
end

#truncateObject (readonly)

Set to true to truncate the destination table first



18
19
20
# File 'lib/etl/control/destination/database_destination.rb', line 18

def truncate
  @truncate
end

Instance Method Details

#closeObject

Close the connection



78
79
80
81
# File 'lib/etl/control/destination/database_destination.rb', line 78

def close
  buffer << append_rows if append_rows
  flush
end

#flushObject

Flush the currently buffered data



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/etl/control/destination/database_destination.rb', line 49

def flush
  conn = ETL::Engine.connection(target)
  conn.transaction do
    conn.truncate(table_name) if truncate
    
    buffer.flatten.each do |row|
      # check to see if this row's compound key constraint already exists
      # note that the compound key constraint may not utilize virtual fields
      next unless row_allowed?(row)

      # add any virtual fields
      add_virtuals!(row)
      
      names = []
      values = []
      order.each do |name|
        names << name
        values << conn.quote(row[name]) # TODO: this is probably not database agnostic
      end
      q = "INSERT INTO #{table_name} (#{names.join(',')}) VALUES (#{values.join(',')})"
      ETL::Engine.logger.debug("Executing insert: #{q}")
      conn.insert(q, "Insert row #{current_row}")
      @current_row += 1
    end
    buffer.clear
  end
end