Class: RR::Replicators::TwoWayReplicator
- Inherits:
-
Object
- Object
- RR::Replicators::TwoWayReplicator
- Defined in:
- lib/rubyrep/replicators/two_way_replicator.rb
Overview
This replicator implements a two way replication. Options:
-
:
left_change_handling, :right_change_handling: Handling of records that were changed only in the named database. Can be any of the following:-
:
ignore: No action. -
:
replicate: Updates other database accordingly. Default Setting -
Procobject: If a Proc object is given, it is responsible for dealing with the record. Called with the following parameters:-
replication_helper: The current ReplicationHelper instance.
-
difference: A ReplicationDifference instance describing the change
-
-
-
:
replication_conflict_handling: Handling of conflicting record changes. Can be any of the following:-
:
ignore: No action. Default Setting -
:
left_wins: The right database is updated accordingly. -
:
right_wins: The left database is updated accordingly. -
:
later_wins: The more recent change is replicated. (If both changes have same age: left change is replicated) -
:
earlier_wins: The less recent change is replicated. (If both records have same age: left change is replicated) -
Procobject: If a Proc object is given, it is responsible for dealing with the record. Called with the following parameters:-
replication_helper: The current ReplicationHelper instance.
-
difference: A ReplicationDifference instance describing the changes
-
-
-
:
logged_replication_events: Specifies which types of replications are logged. Is either a single value or an array of multiple ones. Default: [:ignored_conflicts] Possible values:-
:
ignored_changes: log ignored (but not replicated) non-conflict changes -
:
all_changes: log all non-conflict changes -
:
ignored_conflicts: log ignored (but not replicated) conflicts -
:
all_conflicts: log all conflicts
-
Example of using a Proc object for custom behaviour:
lambda do |rep_helper, diff|
# if specified as replication_conflict_handling option, logs all
# conflicts to a text file
File.open('/var/log/rubyrep_conflict_log', 'a') do |f|
f.puts <<-end_str
#{Time.now}: conflict
* in table #{diff.changes[:left].table}
* for record '#{diff.changes[:left].key}'
* change type in left db: '#{diff.changes[:left].type}'
* change type in right db: '#{diff.changes[:right].type}'
end_str
end
end
Constant Summary collapse
- DEFAULT_OPTIONS =
Default TwoWayReplicator options.
{ :left_change_handling => :replicate, :right_change_handling => :replicate, :replication_conflict_handling => :ignore, :logged_replication_events => [:ignored_conflicts], }
- OTHER_SIDE =
Shortcut to calculate the “other” database.
{ :left => :right, :right => :left }
- CONFLICT_STATE_MATRIX =
Specifies how to clear conflicts. The outer hash keys describe the type of the winning change. The inner hash keys describe the type of the loosing change. The inser hash values describe the action to take on the loosing side.
{ :insert => {:insert => :update, :update => :update, :delete => :insert}, :update => {:insert => :update, :update => :update, :delete => :insert}, :delete => {:insert => :delete, :update => :delete, :delete => :delete} }
- MAX_REPLICATION_ATTEMPTS =
How often a replication will be attempted (in case it fails because the record in question was removed from the source or inserted into the target database after the ReplicationDifference was loaded
2
Instance Attribute Summary collapse
-
#rep_helper ⇒ Object
The current ReplicationHelper object.
Instance Method Summary collapse
-
#attempt_delete(source_db, diff, target_key) ⇒ Object
Attempts to delete the source record from the target database.
-
#attempt_insert(source_db, diff, remaining_attempts, source_key) ⇒ Object
Attempts to read the specified record from the source database and insert it into the target database.
-
#attempt_update(source_db, diff, remaining_attempts, source_key, target_key) ⇒ Object
Attempts to read the specified record from the source database and update the specified record in the target database.
-
#clear_conflict(source_db, diff, remaining_attempts) ⇒ Object
Helper function that clears a conflict by taking the change from the specified winning database and updating the other database accordingly.
-
#initialize(rep_helper) ⇒ TwoWayReplicator
constructor
Initializes the TwoWayReplicator Raises an ArgumentError if any of the replication options is invalid.
-
#log_replication_outcome(winner, diff) ⇒ Object
Logs replication of the specified difference as per configured :
replication_conflict_logging/ :left_change_logging/ :right_change_loggingoptions. -
#options_for_table(table) ⇒ Object
Returns the options for the specified table name.
-
#replicate_difference(diff, remaining_attempts = MAX_REPLICATION_ATTEMPTS, previous_failure_description = nil) ⇒ Object
Called to replicate the specified difference.
-
#validate_change_handling_options ⇒ Object
Verifies if the :
left_change_handling/ :right_change_handlingoptions are valid. -
#validate_conflict_handling_options ⇒ Object
Verifies if the given :
replication_conflict_handlingoptions are valid. -
#validate_logging_options ⇒ Object
Verifies if the given :
replication_loggingoption /options is / are valid. -
#verify_option(table_spec, valid_option_values, option_key, option_value) ⇒ Object
Checks if an option is configured correctly.
Constructor Details
#initialize(rep_helper) ⇒ TwoWayReplicator
Initializes the TwoWayReplicator Raises an ArgumentError if any of the replication options is invalid.
Parameters:
-
rep_helper: The ReplicationHelper object providing information and utility functions.
129 130 131 132 133 134 135 |
# File 'lib/rubyrep/replicators/two_way_replicator.rb', line 129 def initialize(rep_helper) self.rep_helper = rep_helper end |
Instance Attribute Details
#rep_helper ⇒ Object
The current ReplicationHelper object
61 62 63 |
# File 'lib/rubyrep/replicators/two_way_replicator.rb', line 61 def rep_helper @rep_helper end |
Instance Method Details
#attempt_delete(source_db, diff, target_key) ⇒ Object
Attempts to delete the source record from the target database.
-
if +source_db is :
left, then the record is deleted in database
-
:right.
-
source_db: either :leftor :right- source database of replication -
diff: the current ReplicationDifference instance -
target_key: a column_name => value hash identifying the source record
277 278 279 280 281 282 283 |
# File 'lib/rubyrep/replicators/two_way_replicator.rb', line 277 def attempt_delete(source_db, diff, target_key) change = diff.changes[source_db] target_db = OTHER_SIDE[source_db] target_table = rep_helper.corresponding_table(source_db, change.table) log_replication_outcome source_db, diff rep_helper.delete_record target_db, target_table, target_key end |
#attempt_insert(source_db, diff, remaining_attempts, source_key) ⇒ Object
Attempts to read the specified record from the source database and insert it into the target database. Retries if insert fails due to missing source or suddenly existing target record.
-
source_db: either :leftor :right- source database of replication -
diff: the current ReplicationDifference instance -
remaining_attempts: the number of remaining replication attempts for this difference -
source_key: a column_name => value hash identifying the source record
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/rubyrep/replicators/two_way_replicator.rb', line 219 def attempt_insert(source_db, diff, remaining_attempts, source_key) source_change = diff.changes[source_db] source_table = source_change.table target_db = OTHER_SIDE[source_db] target_table = rep_helper.corresponding_table(source_db, source_table) values = rep_helper.load_record source_db, source_table, source_key if values == nil diff.amend replicate_difference diff, remaining_attempts - 1, "source record for insert vanished" else begin # note: savepoints have to be used for postgresql (as a failed SQL # statement will otherwise invalidate the complete transaction.) rep_helper.session.send(target_db).execute "savepoint rr_insert" log_replication_outcome source_db, diff rep_helper.insert_record target_db, target_table, values rescue Exception => e rep_helper.session.send(target_db).execute "rollback to savepoint rr_insert" row = rep_helper.load_record target_db, target_table, source_key raise unless row # problem is not the existence of the record in the target db diff.amend replicate_difference diff, remaining_attempts - 1, "insert failed with #{e.}" end end end |
#attempt_update(source_db, diff, remaining_attempts, source_key, target_key) ⇒ Object
Attempts to read the specified record from the source database and update the specified record in the target database. Retries if update fails due to missing source
-
source_db: either :leftor :right- source database of replication -
diff: the current ReplicationDifference instance -
remaining_attempts: the number of remaining replication attempts for this difference -
source_key: a column_name => value hash identifying the source record -
target_key: a column_name => value hash identifying the source record
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 |
# File 'lib/rubyrep/replicators/two_way_replicator.rb', line 255 def attempt_update(source_db, diff, remaining_attempts, source_key, target_key) source_change = diff.changes[source_db] source_table = source_change.table target_db = OTHER_SIDE[source_db] target_table = rep_helper.corresponding_table(source_db, source_table) values = rep_helper.load_record source_db, source_table, source_key if values == nil diff.amend replicate_difference diff, remaining_attempts - 1, "source record for update vanished" else log_replication_outcome source_db, diff rep_helper.update_record target_db, target_table, values, target_key end end |
#clear_conflict(source_db, diff, remaining_attempts) ⇒ Object
Helper function that clears a conflict by taking the change from the specified winning database and updating the other database accordingly.
-
source_db: the winning database (either :leftor :right) -
diff: the ReplicationDifference instance -
remaining_attempts: the number of remaining replication attempts for this difference
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/rubyrep/replicators/two_way_replicator.rb', line 158 def clear_conflict(source_db, diff, remaining_attempts) source_change = diff.changes[source_db] target_db = OTHER_SIDE[source_db] target_change = diff.changes[target_db] target_action = CONFLICT_STATE_MATRIX[source_change.type][target_change.type] source_key = source_change.type == :update ? source_change.new_key : source_change.key target_key = target_change.type == :update ? target_change.new_key : target_change.key case target_action when :insert attempt_insert source_db, diff, remaining_attempts, source_key when :update attempt_update source_db, diff, remaining_attempts, source_key, target_key when :delete attempt_delete source_db, diff, target_key end end |
#log_replication_outcome(winner, diff) ⇒ Object
Logs replication of the specified difference as per configured :replication_conflict_logging / :left_change_logging / :right_change_logging options.
-
winner: Either the winner database (:leftor :right) or :ignore -
diff: the ReplicationDifference instance
191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/rubyrep/replicators/two_way_replicator.rb', line 191 def log_replication_outcome(winner, diff) = (diff.changes[:left].table) option_values = [[:logged_replication_events]].flatten # make sure I have an array if diff.type == :conflict return unless option_values.include?(:all_conflicts) or option_values.include?(:ignored_conflicts) return if winner != :ignore and not option_values.include?(:all_conflicts) outcome = {:left => 'left_won', :right => 'right_won', :ignore => 'ignored'}[winner] else return unless option_values.include?(:all_changes) or option_values.include?(:ignored_changes) return if winner != :ignore and not option_values.include?(:all_changes) outcome = winner == :ignore ? 'ignored' : 'replicated' end rep_helper.log_replication_outcome diff, outcome end |
#options_for_table(table) ⇒ Object
Returns the options for the specified table name.
-
table: name of the table (left database version)
178 179 180 181 182 183 184 185 |
# File 'lib/rubyrep/replicators/two_way_replicator.rb', line 178 def (table) @options_for_table ||= {} unless @options_for_table.include? table @options_for_table[table] = DEFAULT_OPTIONS.merge( rep_helper.session.configuration.(table)) end @options_for_table[table] end |
#replicate_difference(diff, remaining_attempts = MAX_REPLICATION_ATTEMPTS, previous_failure_description = nil) ⇒ Object
Called to replicate the specified difference.
-
:
diff: ReplicationDifference instance -
:
remaining_attempts: how many more times a replication will be attempted -
:
previous_failure_description: why the previous replication attempt failed
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 |
# File 'lib/rubyrep/replicators/two_way_replicator.rb', line 289 def replicate_difference(diff, remaining_attempts = MAX_REPLICATION_ATTEMPTS, previous_failure_description = nil) raise Exception, previous_failure_description || "max replication attempts exceeded" if remaining_attempts == 0 = (diff.changes[:left].table) if diff.type == :left or diff.type == :right key = diff.type == :left ? :left_change_handling : :right_change_handling option = [key] if option == :ignore log_replication_outcome :ignore, diff elsif option == :replicate source_db = diff.type change = diff.changes[source_db] case change.type when :insert attempt_insert source_db, diff, remaining_attempts, change.key when :update attempt_update source_db, diff, remaining_attempts, change.new_key, change.key when :delete attempt_delete source_db, diff, change.key end else # option must be a Proc option.call rep_helper, diff end elsif diff.type == :conflict option = [:replication_conflict_handling] if option == :ignore log_replication_outcome :ignore, diff elsif option == :left_wins clear_conflict :left, diff, remaining_attempts elsif option == :right_wins clear_conflict :right, diff, remaining_attempts elsif option == :later_wins winner_db = diff.changes[:left].last_changed_at >= diff.changes[:right].last_changed_at ? :left : :right clear_conflict winner_db, diff, remaining_attempts elsif option == :earlier_wins winner_db = diff.changes[:left].last_changed_at <= diff.changes[:right].last_changed_at ? :left : :right clear_conflict winner_db, diff, remaining_attempts else # option must be a Proc option.call rep_helper, diff end end end |
#validate_change_handling_options ⇒ Object
Verifies if the :left_change_handling / :right_change_handling options are valid. Raises an ArgumentError if an option is invalid
88 89 90 91 92 93 94 95 96 |
# File 'lib/rubyrep/replicators/two_way_replicator.rb', line 88 def [:left_change_handling, :right_change_handling].each do |key| rep_helper.session.configuration.each_matching_option(key) do |table_spec, value| unless value.respond_to? :call verify_option table_spec, [:ignore, :replicate], key, value end end end end |
#validate_conflict_handling_options ⇒ Object
Verifies if the given :replication_conflict_handling options are valid. Raises an ArgumentError if an option is invalid.
100 101 102 103 104 105 106 107 108 |
# File 'lib/rubyrep/replicators/two_way_replicator.rb', line 100 def rep_helper.session.configuration.each_matching_option(:replication_conflict_handling) do |table_spec, value| unless value.respond_to? :call verify_option table_spec, [:ignore, :left_wins, :right_wins, :later_wins, :earlier_wins], :replication_conflict_handling, value end end end |
#validate_logging_options ⇒ Object
Verifies if the given :replication_logging option /options is / are valid. Raises an ArgumentError if invalid
112 113 114 115 116 117 118 119 120 121 |
# File 'lib/rubyrep/replicators/two_way_replicator.rb', line 112 def rep_helper.session.configuration.each_matching_option(:logged_replication_events) do |table_spec, values| values = [values].flatten # ensure that I have an array values.each do |value| verify_option table_spec, [:ignored_changes, :all_changes, :ignored_conflicts, :all_conflicts], :logged_replication_events, value end end end |
#verify_option(table_spec, valid_option_values, option_key, option_value) ⇒ Object
Checks if an option is configured correctly. Raises an ArgumentError if not.
-
table_spec: the table specification to which the option belongs. May benil. -
valid_option_values: array of valid option values -
option_key: the key of the option that is to be checked -
option_value: the value of the option that is to be checked
76 77 78 79 80 81 82 83 |
# File 'lib/rubyrep/replicators/two_way_replicator.rb', line 76 def verify_option(table_spec, valid_option_values, option_key, option_value) unless valid_option_values.include? option_value = "" << "#{table_spec.inspect}: " if table_spec << "#{option_value.inspect} not a valid #{option_key.inspect} option" raise ArgumentError.new() end end |