Class: DataTransport::DataStore::ActiveRecord
- Inherits:
-
DataTransport::DataStore
- Inheritance chain:
-
Object, DataTransport::DataStore, DataTransport::DataStore::ActiveRecord
- Defined in:
-
lib/data_transport/data_store/active_record.rb
Overview
Data store that reads and writes records in a database via ActiveRecord. This class is optimized for moving large numbers of records: it reads rows in batches as plain hashes and writes them with multi-row INSERT statements, which avoids the per-record overhead of using ActiveRecord models directly.
Instance Method Summary
-
#count ⇒ Object
Returns the number of records in the table that match the data store’s conditions.
-
#each_record(batch_size = nil) ⇒ Object
:nodoc:.
-
#finalize ⇒ Object
:nodoc:.
-
#initialize(options = {}) ⇒ ActiveRecord
constructor
There are two ways to initialize this data store.
-
#klass ⇒ Object
:nodoc:.
-
#reset ⇒ Object
:nodoc:.
-
#write_record(record) ⇒ Object
:nodoc:.
Constructor Details
#initialize(options = {}) ⇒ ActiveRecord
There are two ways to initialize this data store. The first is by specifying one of your ActiveRecord models:
DataTransport::DataStore::ActiveRecord.new :class => MyModel
The second is by providing an ActiveRecord database specification (as read from database.yml, for example) and a table name:
db_spec = ActiveRecord::Base.configurations["other_app_#{RAILS_ENV}"]
DataTransport::DataStore::ActiveRecord.new(
:connection => db_spec,
:table_name => "sprockets"
)
The second form is useful for importing or exporting data in non-Rails applications.
In addition, the following options are accepted:
- conditions
-
Conditions describing which records to read. This can be anything that ActiveRecord will recognize, such as a hash, an array with substitutions, or a raw SQL string. Default is nil (no conditions; all records are read).
- truncate
-
If true, the table will be truncated before any records are written. On databases that support it, this is performed by executing a TRUNCATE TABLE query; all other databases use ActiveRecord’s delete_all method.
- ignore_errors
-
If true, errors that occur during record insertion will be ignored. This is useful if your table has a unique index and you want to silently drop records with duplicate keys. Currently this only works on MySQL. Default is false.
- max_sql_length
-
Maximum permissible length of an SQL query, in bytes. Rows to be inserted are buffered until the largest possible INSERT statement has been generated, at which point the statement is executed and a new INSERT statement begins. The default value varies depending on what type of database you’re connected to. With SQLite, the default is 1,000,000. With MySQL, the default is the value of the max_allowed_packet variable minus 512. With all other databases, the default is 16,777,216.
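The buffering described for max_sql_length can be illustrated with a small stand-alone sketch. This is not the library's code: the InsertBuffer class, its method names, and the statements array (which stands in for executing SQL) are all hypothetical, and values are assumed to be quoted already.

```ruby
# Hypothetical helper, not part of DataTransport: collects rows into
# multi-row INSERT statements that stay within max_sql_length bytes.
class InsertBuffer
  attr_reader :statements

  def initialize(table, columns, max_sql_length)
    @prefix = "INSERT INTO #{table} (#{columns.join(',')}) VALUES "
    @max_sql_length = max_sql_length
    @statements = []              # stands in for executing SQL on a database
    @rows = []
    @length = @prefix.bytesize
  end

  # Append one row of already-quoted values, flushing first if the row
  # would push the pending statement past the limit.
  def add(quoted_values)
    row = "(#{quoted_values.join(',')})"
    separator = @rows.empty? ? 0 : 1          # comma between rows
    if !@rows.empty? && @length + separator + row.bytesize > @max_sql_length
      flush
      separator = 0
    end
    @length += separator + row.bytesize
    @rows << row                  # a single oversized row is still sent alone
  end

  # Emit the pending statement, if any, and start a new one.
  def flush
    return if @rows.empty?
    @statements << @prefix + @rows.join(",")
    @rows = []
    @length = @prefix.bytesize
  end
end

buffer = InsertBuffer.new("sprockets", %w[id name], 60)
buffer.add(["1", "'a'"])
buffer.add(["2", "'b'"])
buffer.add(["3", "'c'"])
buffer.flush                      # send whatever is still pending
buffer.statements.each { |sql| puts sql }
```

With a 60-byte limit, the first two rows fit in one statement and the third starts a new one. A larger limit means fewer round trips, which is why the defaults track what each database accepts.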
# File 'lib/data_transport/data_store/active_record.rb', line 51
def initialize(options = {})
  super()
  # Extract options.
  @class          = options.delete(:class)
  @connection     = options.delete(:connection)
  @table_name     = options.delete(:table_name)
  @conditions     = options.delete(:conditions)
  @truncate       = options.delete(:truncate)
  @ignore_errors  = options.delete(:ignore_errors)
  @max_sql_length = options.delete(:max_sql_length)
  # Make sure a class or connection and table name was provided.
  if @class.nil? && (@connection.nil? || @table_name.nil?)
    raise(ArgumentError, "missing required option `class', or `connection' and `table_name'")
  end
  raise(TypeError, "class must be a class") if @class && !@class.is_a?(Class)
  # If connection specs were provided instead of a class, make an
  # anonymous ActiveRecord subclass.
  unless @class
    @class = Class.new(::ActiveRecord::Base)
    @class.set_table_name @table_name
    @class.establish_connection @connection
  end
  # Make sure the class descends from ActiveRecord::Base.
  klass = @class.superclass
  is_active_record = false
  while klass
    if klass == ::ActiveRecord::Base
      is_active_record = true
      break
    end
    klass = klass.superclass
  end
  raise(TypeError, "class must descend from ActiveRecord::Base") unless is_active_record
  # If ignore_errors is true, make sure we're connected to a MySQL
  # database. We don't use is_a? because if the MySQL adapter isn't
  # loaded, referencing its class throws a NameError.
  if @ignore_errors
    unless @class.connection.class.to_s == "ActiveRecord::ConnectionAdapters::MysqlAdapter"
      raise ArgumentError, "ignore_errors can only be used with a MySQL database"
    end
  end
  # Check for unknown options.
  unless options.empty?
    raise(ArgumentError, "unrecognized options: `#{options.keys.join("', `")}'")
  end
  # Figure out how much data the database can handle in one query. See
  # the note above in the ignore_errors compatibility check about using
  # stringified class names.
  if @max_sql_length
    @max_sql_length = @max_sql_length.to_i
  else
    case @class.connection.class.to_s
    when "ActiveRecord::ConnectionAdapters::MysqlAdapter"
      rows = @class.connection.select_all("SHOW VARIABLES LIKE 'max_allowed_packet'")
      @max_sql_length = rows.first["Value"].to_i - 512
    when /\AActiveRecord::ConnectionAdapters::SQLite3?Adapter\Z/
      @max_sql_length = 1_000_000
    else
      @max_sql_length = 16_777_216
    end
  end
  # Fetch column information
  @columns = {}
  @class.columns.each {|c| @columns[c.name.to_sym] = c}
end
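The option handling in the constructor follows a common pattern: delete each known key from the options hash, then treat anything left over as an error. Below is a minimal stand-alone sketch of that pattern. The extract_options method and the KNOWN_OPTIONS constant are hypothetical names used only for illustration. Unlike the constructor, the sketch copies the hash first so the caller's hash is not modified.

```ruby
# Hypothetical stand-alone version of the option handling pattern.
KNOWN_OPTIONS = [:class, :connection, :table_name, :conditions,
                 :truncate, :ignore_errors, :max_sql_length]

def extract_options(options)
  options = options.dup           # leave the caller's hash untouched
  extracted = {}
  KNOWN_OPTIONS.each { |key| extracted[key] = options.delete(key) }

  # Either a class, or both a connection and a table name, is required.
  if extracted[:class].nil? &&
     (extracted[:connection].nil? || extracted[:table_name].nil?)
    raise ArgumentError,
          "missing required option `class', or `connection' and `table_name'"
  end

  # Whatever was not deleted above is not a recognized option.
  unless options.empty?
    raise ArgumentError, "unrecognized options: `#{options.keys.join("', `")}'"
  end
  extracted
end

opts = extract_options(:connection => { "adapter" => "sqlite3" },
                       :table_name => "sprockets")
puts opts[:table_name]
```

Rejecting unknown keys up front turns a misspelled option name into an immediate ArgumentError instead of a silently ignored setting.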
Instance Method Details
#count ⇒ Object
Returns the number of records in the table that match the data store’s conditions.
# File 'lib/data_transport/data_store/active_record.rb', line 124
def count
  @class.count(:conditions => @conditions)
end
#each_record(batch_size = nil) ⇒ Object
:nodoc:
# File 'lib/data_transport/data_store/active_record.rb', line 128
def each_record(batch_size = nil) # :nodoc:
  conn = @class.connection
  column_names = conn.columns(@class.table_name).collect {|c| c.name}
  offset = 0
  record = {}
  base_query = "SELECT * FROM #{conn.quote_table_name(@class.table_name)}"
  @class.send(:add_conditions!, base_query, @conditions) unless @conditions.nil?
  while true
    sql = base_query.dup
    conn.add_limit_offset!(sql, :limit => batch_size, :offset => offset)
    offset += batch_size
    rows = conn.select_rows(sql)
    break if rows.empty?
    rows.each do |row|
      record.clear
      column_names.each_with_index do |column_name, i|
        column_name = column_name.to_sym
        record[column_name] = @columns[column_name].type_cast(row[i])
      end
      yield record
    end
  end
end
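The reading loop pages through the table with LIMIT and OFFSET and reuses one hash for every row it yields. The sketch below reproduces that pattern against an in-memory array, so it runs without a database. ROWS, COLUMN_NAMES, and this each_record are hypothetical stand-ins. The sketch requires a positive batch_size, because adding a nil batch size to the offset would raise a TypeError.

```ruby
# Hypothetical in-memory stand-in for a table; rows are arrays of raw values.
COLUMN_NAMES = [:id, :name]
ROWS = [[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]]

# Yield each row as a hash, fetching batch_size rows per "query".
def each_record(batch_size)
  raise ArgumentError, "batch_size must be positive" unless batch_size.to_i > 0
  offset = 0
  record = {}                               # one hash, reused for every row
  loop do
    rows = ROWS[offset, batch_size] || []   # plays the role of LIMIT/OFFSET
    break if rows.empty?
    offset += batch_size
    rows.each do |row|
      record.clear
      COLUMN_NAMES.each_with_index { |name, i| record[name] = row[i] }
      yield record
    end
  end
end

kept = []
each_record(2) { |record| kept << record.dup }   # dup: the hash is reused
kept.each { |record| puts record.inspect }
```

Reusing the hash avoids allocating a new object per row, but a caller that stores the yielded record without copying it will end up with many references to the same hash.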
#finalize ⇒ Object
:nodoc:
# File 'lib/data_transport/data_store/active_record.rb', line 181
def finalize # :nodoc:
  if @truncate
    conn = @class.connection
    begin
      conn.execute("TRUNCATE TABLE #{conn.quote_table_name(@class.table_name)}")
    rescue
      @class.delete_all
    end
    @truncate = false
  end
  if @sql_buffer && @sql_buffer[-1,1] == ","
    @sql_buffer.chop!
    @class.connection.execute(@sql_buffer)
  end
end
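The truncation step tries TRUNCATE TABLE first and falls back to deleting all rows if the statement fails. A minimal sketch of that try-then-fall-back pattern follows. FakeConnection and clear_table are hypothetical, and the DELETE statement is only an assumed equivalent of what delete_all issues.

```ruby
# Hypothetical connection that can be told to reject TRUNCATE.
class FakeConnection
  attr_reader :log

  def initialize(supports_truncate)
    @supports_truncate = supports_truncate
    @log = []                     # records every statement that succeeded
  end

  def execute(sql)
    if sql.start_with?("TRUNCATE") && !@supports_truncate
      raise StandardError, "syntax error near TRUNCATE"
    end
    @log << sql
  end
end

# Empty a table, preferring TRUNCATE and falling back to DELETE.
def clear_table(conn, table)
  conn.execute("TRUNCATE TABLE #{table}")
rescue StandardError
  # Assumed fallback, comparable to delete_all: slower but widely supported.
  conn.execute("DELETE FROM #{table}")
end

conn = FakeConnection.new(false)
clear_table(conn, "sprockets")
puts conn.log.inspect
```

Rescuing the failure rather than checking the adapter type keeps the code free of per-database branches, at the cost of also masking unrelated errors raised by the TRUNCATE attempt.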
#klass ⇒ Object
:nodoc:
# File 'lib/data_transport/data_store/active_record.rb', line 118
def klass # :nodoc:
  @class
end
#reset ⇒ Object
:nodoc:
# File 'lib/data_transport/data_store/active_record.rb', line 197
def reset # :nodoc:
  @sql_buffer = nil
end
#write_record(record) ⇒ Object
:nodoc:
# File 'lib/data_transport/data_store/active_record.rb', line 153
def write_record(record) # :nodoc:
  conn = @class.connection
  # If no SQL has been produced yet, start an INSERT statement.
  @sql_buffer ||= start_insert_sql(record)
  # Convert the record into a string of quoted values.
  values = []
  record.each {|k, v| values << conn.quote(v, @columns[k])}
  values = "(#{values.join ","}),"
  # Write the record.
  if @max_sql_length.nil?
    # We have no information on the database's maximum allowed packet
    # size, so it's safest to write the record immediately.
    @sql_buffer << values
    finalize
  elsif @sql_buffer.length + values.length > @max_sql_length
    # Appending this record to the SQL buffer will exceed the maximum
    # allowed packet size. Send the buffer to the database and start a
    # new statement with this record.
    finalize
    @sql_buffer = start_insert_sql
    @sql_buffer << values
  else
    # This record will not cause the SQL buffer to exceed the maximum
    # allowed packet size. Append it to the SQL buffer.
    @sql_buffer << values
  end
end