Class: DataTransport::DataStore::ActiveRecord

Inherits:
DataTransport::DataStore show all
Defined in:
lib/data_transport/data_store/active_record.rb

Overview

Data store that reads and writes records in a database via ActiveRecord. This class is specifically optimized for reading and writing large numbers of records, providing a significant advantage over using ActiveRecord directly.

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ ActiveRecord

There are two ways to initialize this data store. The first is by specifying one of your ActiveRecord models:

DataTransport::DataStore::ActiveRecord.new :class => MyModel

The second is by providing an ActiveRecord database specification (as read from database.yml, for example) and a table name:

db_spec = ActiveRecord::Base.configurations["other_app_#{RAILS_ENV}"]
DataTransport::DataStore::ActiveRecord.new(
  :connection => db_spec,
  :table_name => "sprockets"
)

The second form is useful for importing or exporting data in non-Rails applications.

In addition, the following options are accepted:

conditions

Conditions describing which records to read. This can be anything that ActiveRecord will recognize, such as a hash, an array with substitutions, or raw SQL. Default is nil (no conditions, read all records).

truncate

If true, the table will be truncated before any records are written. On databases that support it, this is performed by executing a TRUNCATE TABLE query; all other databases use ActiveRecord’s delete_all method. Default is false.

ignore_errors

If true, errors that occur during record insertion will be ignored. This is useful if your table has a unique index and you want to silently drop records with duplicate keys. Currently this only works on MySQL. Default is false.

max_sql_length

Maximum permissible length of an SQL query, in bytes. Rows to be inserted are buffered until the largest possible INSERT statement has been generated, at which point the statement is executed and a new INSERT statement begins. The default value varies depending on what type of database you’re connected to. With SQLite, the default is 1,000,000. With MySQL, the default is the value of the max_allowed_packet variable minus 512. With all other databases, the default is 16,777,216.

Raises:

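  • (ArgumentError) — if neither class nor both connection and table_name are given, if an unrecognized option is passed, or if ignore_errors is used with a non-MySQL database
  • (TypeError) — if class is not a Class or does not descend from ActiveRecord::Base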


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/data_transport/data_store/active_record.rb', line 51

# Builds a data store backed by an ActiveRecord model.
#
# Either :class (a subclass of ActiveRecord::Base) or both :connection and
# :table_name must be supplied. In the second form an anonymous
# ActiveRecord::Base subclass is created, pointed at the table and connected
# using the given specification.
#
# Also recognized: :conditions, :truncate, :ignore_errors and
# :max_sql_length. The caller's hash is left untouched.
#
# Raises ArgumentError if the required options are missing, if
# :ignore_errors is set on a non-MySQL connection, or if any unrecognized
# option is present. Raises TypeError if :class is not a Class or does not
# descend from ActiveRecord::Base.
def initialize(options = {})
  super()
  # Work on a copy: the options are consumed with delete below, and the
  # caller's hash should not be emptied as a side effect.
  options = options.dup
  # Extract options.
  @class          = options.delete(:class)
  @connection     = options.delete(:connection)
  @table_name     = options.delete(:table_name)
  @conditions     = options.delete(:conditions)
  @truncate       = options.delete(:truncate)
  @ignore_errors  = options.delete(:ignore_errors)
  @max_sql_length = options.delete(:max_sql_length)
  # Make sure a class or connection and table name was provided.
  if @class.nil? && (@connection.nil? || @table_name.nil?)
    raise(ArgumentError, "missing required option `class', or `connection' and `table_name'")
  end
  raise(TypeError, "class must be a class") if @class && !@class.is_a?(Class)
  # If connection specs were provided instead of a class, make an
  # anonymous ActiveRecord subclass.
  unless @class
    @class = Class.new(::ActiveRecord::Base)
    @class.set_table_name @table_name
    @class.establish_connection @connection
  end
  # Make sure the class descends from ActiveRecord::Base. Module#< is true
  # only for a strict descendant (false for Base itself, nil for an
  # unrelated class).
  unless @class < ::ActiveRecord::Base
    raise(TypeError, "class must descend from ActiveRecord::Base")
  end
  # If ignore_errors is true, make sure we're connected to a MySQL
  # database. We don't use is_a? because if the MySQL adapter isn't
  # loaded, referencing its class throws a NameError.
  if @ignore_errors
    unless @class.connection.class.to_s ==
      "ActiveRecord::ConnectionAdapters::MysqlAdapter"
      raise ArgumentError, "ignore_errors can only be used with a MySQL database"
    end
  end
  # Check for unknown options. Whatever is left after the deletes above was
  # not recognized; Hash has no join, so the keys are listed explicitly.
  unless options.empty?
    raise(ArgumentError, "unrecognized options: `#{options.keys.join("', `")}'")
  end
  # Figure out how much data the database can handle in one query. The
  # adapter is again identified by its stringified class name so that an
  # adapter that isn't loaded is never referenced as a constant.
  if @max_sql_length
    @max_sql_length = @max_sql_length.to_i
  else
    case @class.connection.class.to_s
    when "ActiveRecord::ConnectionAdapters::MysqlAdapter"
      rows = @class.connection.select_all("SHOW VARIABLES LIKE 'max_allowed_packet'")
      # Leave 512 bytes of headroom below the server's packet limit.
      @max_sql_length = rows.first["Value"].to_i - 512
    when /\AActiveRecord::ConnectionAdapters::SQLite3?Adapter\z/
      @max_sql_length = 1_000_000
    else
      @max_sql_length = 16_777_216
    end
  end
  # Fetch column information, indexed by column name as a symbol.
  @columns = {}
  @class.columns.each {|c| @columns[c.name.to_sym] = c}
end

Instance Method Details

#count ⇒ Object

Returns the number of records in the table that match the data store’s conditions.



124
125
126
# File 'lib/data_transport/data_store/active_record.rb', line 124

# Asks the model class how many rows satisfy this data store's conditions
# and returns whatever the model's count reports.
def count
  finder_options = {:conditions => @conditions}
  @class.count(finder_options)
end

#each_record(batch_size = nil) ⇒ Object

:nodoc:



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/data_transport/data_store/active_record.rb', line 128

# Yields each row matching the data store's conditions as a Hash keyed by
# column name (as a symbol), with values type-cast by the column objects
# cached in @columns.
#
# batch_size - number of rows to fetch per query. When nil, a single query
#              is issued with a nil :limit (presumably unlimited, per
#              ActiveRecord's add_limit_offset! -- confirm for the adapter
#              in use) and iteration stops after it.
#
# The very same Hash instance is cleared and refilled for every row, so a
# block that wants to keep a record must copy it.
#
# NOTE(review): the query has no ORDER BY, so paging by OFFSET relies on the
# database returning rows in a stable order between queries.
def each_record(batch_size = nil) # :nodoc:
  conn = @class.connection
  # Symbolize the column names once instead of once per row.
  column_names = conn.columns(@class.table_name).collect {|c| c.name.to_sym}

  offset = 0
  record = {}
  base_query = "SELECT * FROM #{conn.quote_table_name(@class.table_name)}"
  @class.send(:add_conditions!, base_query, @conditions) unless @conditions.nil?
  loop do
    sql = base_query.dup
    conn.add_limit_offset!(sql, :limit => batch_size, :offset => offset)
    rows = conn.select_rows(sql)
    break if rows.empty?
    rows.each do |row|
      record.clear
      column_names.each_with_index do |column_name, i|
        record[column_name] = @columns[column_name].type_cast(row[i])
      end
      yield record
    end
    # With no batch size there is nothing to page through; advancing the
    # offset by nil would raise a TypeError.
    break if batch_size.nil?
    offset += batch_size
  end
end

#finalize ⇒ Object

:nodoc:



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/data_transport/data_store/active_record.rb', line 181

# Performs any pending truncation, then executes the buffered INSERT
# statement if it holds at least one row.
#
# Truncation happens at most once: @truncate is cleared afterwards. The
# TRUNCATE TABLE query is attempted first and, if it raises, the model's
# delete_all is used instead.
#
# A buffer holds rows only when it ends in a comma (each appended row leaves
# a trailing one). Once sent, the buffer is cleared so the statement can
# never be appended to or executed a second time.
#
# Returns the result of the INSERT execution, or nil/false when there was
# nothing to send.
def finalize # :nodoc:
  if @truncate
    conn = @class.connection
    begin
      conn.execute("TRUNCATE TABLE #{conn.quote_table_name(@class.table_name)}")
    rescue
      # Deliberate best-effort fallback for databases without TRUNCATE.
      @class.delete_all
    end
    @truncate = false
  end
  if @sql_buffer && @sql_buffer[-1,1] == ","
    # Drop the trailing comma to complete the statement, and forget the
    # buffer before executing so stale SQL is not reused by later writes.
    sql = @sql_buffer.chop
    @sql_buffer = nil
    @class.connection.execute(sql)
  end
end

#klass ⇒ Object

:nodoc:



118
119
120
# File 'lib/data_transport/data_store/active_record.rb', line 118

# Returns the class stored in @class: the ActiveRecord model the constructor
# was given, or the anonymous ActiveRecord::Base subclass it built from the
# connection and table name.
def klass # :nodoc:
  @class
end

#reset ⇒ Object

:nodoc:



197
198
199
# File 'lib/data_transport/data_store/active_record.rb', line 197

# Forgets the buffered INSERT statement by setting @sql_buffer to nil. Rows
# accumulated in the buffer that have not been executed yet are discarded;
# the next write_record starts a fresh statement.
def reset # :nodoc:
  @sql_buffer = nil
end

#write_record(record) ⇒ Object

:nodoc:



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/data_transport/data_store/active_record.rb', line 153

# Appends one record to the buffered multi-row INSERT statement, sending the
# buffer to the database first (via finalize) when adding the record would
# push the statement past @max_sql_length bytes.
#
# record - Hash of column name => value. Each value is quoted by the
#          connection using the matching entry of @columns.
#
# The INSERT header comes from start_insert_sql, which is defined elsewhere
# in this class; it is always handed the current record, as on the first
# call.
def write_record(record) # :nodoc:
  conn = @class.connection
  # If no SQL has been produced yet, start an INSERT statement.
  @sql_buffer ||= start_insert_sql(record)
  # Convert the record into a string of quoted values. The trailing comma
  # separates it from the next row and marks the buffer as holding rows.
  quoted = record.collect {|k, v| conn.quote(v, @columns[k])}
  values = "(#{quoted.join(",")}),"
  # Write the record.
  if @max_sql_length.nil?
    # We have no information on the database's maximum allowed packet
    # size, so it's safest to write the record immediately.
    @sql_buffer << values
    finalize
    # Start from scratch next time rather than appending to a statement
    # that has already been executed.
    @sql_buffer = nil
  elsif @sql_buffer.bytesize + values.bytesize > @max_sql_length
    # Appending this record to the SQL buffer will exceed the maximum
    # allowed packet size. The comparison uses the byte size of the rendered
    # values (not the number of columns in the record), since the limit is
    # expressed in bytes. Send the buffer to the database and start a new
    # statement with this record.
    finalize
    @sql_buffer = start_insert_sql(record)
    @sql_buffer << values
  else
    # This record will not cause the SQL buffer to exceed the maximum
    # allowed packet size. Append it to the SQL buffer.
    @sql_buffer << values
  end
end