Module: RR::ReplicationExtenders::MysqlReplication
- Defined in:
- lib/rubyrep/replication_extenders/mysql_replication.rb
Overview
Provides Mysql specific functionality for database replication
Instance Method Summary collapse
-
#add_big_primary_key(table_name, key_name) ⇒ Object
Adds a big (8 byte value), auto-incrementing primary key column to the specified table.
-
#clear_sequence_setup(rep_prefix, table_name) ⇒ Object
Removes the custom sequence setup for the specified table.
-
#create_or_replace_replication_trigger_function(params) ⇒ Object
Creates or replaces the replication trigger function.
-
#create_replication_trigger(params) ⇒ Object
Creates a trigger to log all changes for the given table.
-
#drop_replication_trigger(trigger_name, table_name) ⇒ Object
Removes a trigger and related trigger procedure.
-
#replication_trigger_exists?(trigger_name, table_name) ⇒ Boolean
Returns
true
if the named trigger exists for the named table. -
#sequence_values(rep_prefix, table_name) ⇒ Object
Returns all unadjusted sequences of the given table.
-
#update_sequences(rep_prefix, table_name, increment, offset, left_sequence_values, right_sequence_values, adjustment_buffer) ⇒ Object
Ensures that the sequences of the named table (normally the primary key column) are generated with the correct increment and offset.
Instance Method Details
#add_big_primary_key(table_name, key_name) ⇒ Object
Adds a big (8 byte value), auto-incrementing primary key column to the specified table.
-
table_name: name of the target table
-
key_name: name of the primary key column
239 240 241 242 243 |
# File 'lib/rubyrep/replication_extenders/mysql_replication.rb', line 239 def add_big_primary_key(table_name, key_name) execute(<<-end_sql) alter table #{table_name} add column #{key_name} bigint not null auto_increment primary key end_sql end |
#clear_sequence_setup(rep_prefix, table_name) ⇒ Object
Removes the custom sequence setup for the specified table. If no more rubyrep sequences are left, removes the sequence table.
-
rep_prefix
: not used (necessary) for the Postgres -
table_name
: name of the table
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 |
# File 'lib/rubyrep/replication_extenders/mysql_replication.rb', line 249 def clear_sequence_setup(rep_prefix, table_name) sequence_table_name = "#{rep_prefix}_sequences" if tables.include?(sequence_table_name) trigger_name = "#{rep_prefix}_#{table_name}_sequence" trigger_row = select_one(<<-end_sql) select * from information_schema.triggers where trigger_schema = database() and trigger_name = '#{trigger_name}' end_sql if trigger_row execute "DROP TRIGGER `#{trigger_name}`" execute "delete from #{sequence_table_name} where name = '#{table_name}'" unless select_one("select * from #{sequence_table_name}") # no more sequences left --> delete sequence table drop_table sequence_table_name.to_sym end end end end |
#create_or_replace_replication_trigger_function(params) ⇒ Object
Creates or replaces the replication trigger function. See #create_replication_trigger for a descriptions of the params
hash.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/rubyrep/replication_extenders/mysql_replication.rb', line 10 def create_or_replace_replication_trigger_function(params) execute(<<-end_sql) DROP PROCEDURE IF EXISTS `#{params[:trigger_name]}`; end_sql activity_check = "" if params[:exclude_rr_activity] then activity_check = <<-end_sql DECLARE active INT; SELECT count(*) INTO active FROM #{params[:activity_table]}; IF active <> 0 THEN LEAVE p; END IF; end_sql end execute(<<-end_sql) CREATE PROCEDURE `#{params[:trigger_name]}`(change_key varchar(2000), change_new_key varchar(2000), change_type varchar(1)) p: BEGIN #{activity_check} INSERT INTO #{params[:log_table]}(change_table, change_key, change_new_key, change_type, change_time) VALUES('#{params[:table]}', change_key, change_new_key, change_type, now()); END; end_sql end |
#create_replication_trigger(params) ⇒ Object
Creates a trigger to log all changes for the given table. params
is a hash with all necessary information:
-
:
trigger_name
: name of the trigger -
:
table
: name of the table that should be monitored -
:
keys
: array of names of the key columns of the monitored table -
:
log_table
: name of the table receiving all change notifications -
:
activity_table
: name of the table receiving the rubyrep activity information -
:
key_sep
: column seperator to be used in the key column of the log table -
:
exclude_rr_activity
: if true, the trigger will check and filter out changes initiated by RubyRep
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/rubyrep/replication_extenders/mysql_replication.rb', line 57 def create_replication_trigger(params) create_or_replace_replication_trigger_function params %w(insert update delete).each do |action| execute(<<-end_sql) DROP TRIGGER IF EXISTS `#{params[:trigger_name]}_#{action}`; end_sql # The created triggers can handle the case where the trigger procedure # is updated (that is: temporarily deleted and recreated) while the # trigger is running. # For that an MySQL internal exception is raised if the trigger # procedure cannot be found. The exception is caught by an trigger # internal handler. # The handler causes the trigger to retry calling the # trigger procedure several times with short breaks in between. trigger_var = action == 'delete' ? 'OLD' : 'NEW' if action == 'update' call_statement = "CALL `#{params[:trigger_name]}`(#{key_clause('OLD', params)}, #{key_clause('NEW', params)}, '#{action[0,1].upcase}');" else call_statement = "CALL `#{params[:trigger_name]}`(#{key_clause(trigger_var, params)}, null, '#{action[0,1].upcase}');" end execute(<<-end_sql) CREATE TRIGGER `#{params[:trigger_name]}_#{action}` AFTER #{action} ON `#{params[:table]}` FOR EACH ROW BEGIN DECLARE number_attempts INT DEFAULT 0; DECLARE failed INT; DECLARE CONTINUE HANDLER FOR 1305 BEGIN DO SLEEP(0.05); SET failed = 1; SET number_attempts = number_attempts + 1; END; REPEAT SET failed = 0; #{call_statement} UNTIL failed = 0 OR number_attempts >= 40 END REPEAT; END; end_sql end end |
#drop_replication_trigger(trigger_name, table_name) ⇒ Object
Removes a trigger and related trigger procedure.
-
trigger_name
: name of the trigger -
table_name
: name of the table for which the trigger exists
103 104 105 106 107 108 |
# File 'lib/rubyrep/replication_extenders/mysql_replication.rb', line 103 def drop_replication_trigger(trigger_name, table_name) %w(insert update delete).each do |action| execute "DROP TRIGGER `#{trigger_name}_#{action}`;" end execute "DROP PROCEDURE `#{trigger_name}`;" end |
#replication_trigger_exists?(trigger_name, table_name) ⇒ Boolean
Returns true
if the named trigger exists for the named table.
-
trigger_name
: name of the trigger -
table_name
: name of the table
113 114 115 |
# File 'lib/rubyrep/replication_extenders/mysql_replication.rb', line 113 def replication_trigger_exists?(trigger_name, table_name) !select_all("select 1 from information_schema.triggers where trigger_schema = database() and trigger_name = '#{trigger_name}_insert' and event_object_table = '#{table_name}'").empty? end |
#sequence_values(rep_prefix, table_name) ⇒ Object
Returns all unadjusted sequences of the given table. Parameters:
-
rep_prefix
: The prefix put in front of all replication related database objects as specified via Configuration#options. Is used to create the sequences table. -
table_name
: name of the table
Return value: a hash with
-
key: sequence name
-
value: a hash with
-
:
increment
: current sequence increment -
:
value
: current value
-
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/rubyrep/replication_extenders/mysql_replication.rb', line 129 def sequence_values(rep_prefix, table_name) # check if the table has an auto_increment column, return if not sequence_row = select_one(<<-end_sql) show columns from `#{table_name}` where extra = 'auto_increment' end_sql return {} unless sequence_row column_name = sequence_row['Field'] # check if the sequences table exists, create if necessary sequence_table_name = "#{rep_prefix}_sequences" unless tables.include?(sequence_table_name) create_table "#{sequence_table_name}".to_sym, :id => false, :options => 'ENGINE=MyISAM' do |t| t.column :name, :string t.column :current_value, :integer t.column :increment, :integer t.column :offset, :integer end ActiveRecord::Base.connection.execute(<<-end_sql) rescue nil ALTER TABLE "#{sequence_table_name}" ADD CONSTRAINT #{sequence_table_name}_pkey PRIMARY KEY (name) end_sql end sequence_row = select_one("select current_value, increment, offset from #{sequence_table_name} where name = '#{table_name}'") if sequence_row == nil current_max = select_one(<<-end_sql)['current_max'].to_i select max(`#{column_name}`) as current_max from `#{table_name}` end_sql return {column_name => { :increment => 1, :value => current_max } } else return {column_name => { :increment => sequence_row['increment'].to_i, :value => sequence_row['offset'].to_i } } end end |
#update_sequences(rep_prefix, table_name, increment, offset, left_sequence_values, right_sequence_values, adjustment_buffer) ⇒ Object
Ensures that the sequences of the named table (normally the primary key column) are generated with the correct increment and offset.
-
rep_prefix
: not used (necessary) for the Postgres -
table_name
: name of the table (not used for Postgres) -
increment
: increment of the sequence -
offset
: offset -
left_sequence_values
:hash as returned by #sequence_values for the left database
-
right_sequence_values
:hash as returned by #sequence_values for the right database
-
adjustment_buffer
:the "gap" that is created during sequence update to avoid concurrency problems
-
an increment of 2 and offset of 1 will lead to generation of odd
-
numbers.
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/rubyrep/replication_extenders/mysql_replication.rb', line 187 def update_sequences( rep_prefix, table_name, increment, offset, left_sequence_values, right_sequence_values, adjustment_buffer) return if left_sequence_values.empty? column_name = left_sequence_values.keys[0] # check if the sequences table exists, create if necessary sequence_table_name = "#{rep_prefix}_sequences" current_max = [left_sequence_values[column_name][:value], right_sequence_values[column_name][:value]].max + adjustment_buffer new_start = current_max - (current_max % increment) + increment + offset sequence_row = select_one("select current_value, increment, offset from #{sequence_table_name} where name = '#{table_name}'") if sequence_row == nil # no sequence exists yet for the table, create it and the according # sequence trigger execute(<<-end_sql) insert into #{sequence_table_name}(name, current_value, increment, offset) values('#{table_name}', #{new_start}, #{increment}, #{offset}) end_sql trigger_name = "#{rep_prefix}_#{table_name}_sequence" execute(<<-end_sql) DROP TRIGGER IF EXISTS `#{trigger_name}`; end_sql execute(<<-end_sql) CREATE TRIGGER `#{trigger_name}` BEFORE INSERT ON `#{table_name}` FOR EACH ROW BEGIN IF NEW.`#{column_name}` = 0 THEN UPDATE #{sequence_table_name} SET current_value = LAST_INSERT_ID(current_value + increment) WHERE name = '#{table_name}'; SET NEW.`#{column_name}` = LAST_INSERT_ID(); END IF; END; end_sql elsif sequence_row['increment'].to_i != increment or sequence_row['offset'].to_i != offset # sequence exists but with incorrect values; update it execute(<<-end_sql) update #{sequence_table_name} set current_value = #{new_start}, increment = #{increment}, offset = #{offset} where name = '#{table_name}' end_sql end end |