Class: RR::ReplicationRun
- Inherits:
-
Object
- Object
- RR::ReplicationRun
- Defined in:
- lib/rubyrep/replication_run.rb
Overview
Executes a single replication run
Instance Attribute Summary collapse
-
#session ⇒ Object
The current Session object.
-
#sweeper ⇒ Object
The current TaskSweeper.
Instance Method Summary collapse
-
#event_filtered?(diff) ⇒ Boolean
Calls the event filter for the give difference.
-
#helper ⇒ Object
Returns the current ReplicationHelper; creates it if necessary.
-
#initialize(session, sweeper) ⇒ ReplicationRun
constructor
Creates a new ReplicationRun instance.
-
#install_sweeper ⇒ Object
Installs the current sweeper into the database connections.
-
#replicator ⇒ Object
Returns the current replicator; creates it if necessary.
-
#run ⇒ Object
Executes the replication run.
Constructor Details
#initialize(session, sweeper) ⇒ ReplicationRun
Creates a new ReplicationRun instance.
-
session
: the current Session -
sweeper
: the current TaskSweeper
110 111 112 113 114 |
# File 'lib/rubyrep/replication_run.rb', line 110 def initialize(session, sweeper) self.session = session self.sweeper = sweeper install_sweeper end |
Instance Attribute Details
#session ⇒ Object
The current Session object
9 10 11 |
# File 'lib/rubyrep/replication_run.rb', line 9 def session @session end |
#sweeper ⇒ Object
The current TaskSweeper
12 13 14 |
# File 'lib/rubyrep/replication_run.rb', line 12 def sweeper @sweeper end |
Instance Method Details
#event_filtered?(diff) ⇒ Boolean
Calls the event filter for the give difference.
-
diff
: instance of ReplicationDifference
Returns true
if replication of the difference should not proceed.
28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/rubyrep/replication_run.rb', line 28 def event_filtered?(diff) event_filter = helper.(diff.changes[:left].table)[:event_filter] if event_filter && event_filter.respond_to?(:before_replicate) not event_filter.before_replicate( diff.changes[:left].table, helper.type_cast(diff.changes[:left].table, diff.changes[:left].key), helper, diff ) else false end end |
#helper ⇒ Object
Returns the current ReplicationHelper; creates it if necessary
15 16 17 |
# File 'lib/rubyrep/replication_run.rb', line 15 def helper @helper ||= ReplicationHelper.new(self) end |
#install_sweeper ⇒ Object
Installs the current sweeper into the database connections
98 99 100 101 102 103 104 105 |
# File 'lib/rubyrep/replication_run.rb', line 98 def install_sweeper [:left, :right].each do |database| unless session.send(database).respond_to?(:sweeper) session.send(database).send(:extend, NoisyConnection) end session.send(database).sweeper = sweeper end end |
#replicator ⇒ Object
Returns the current replicator; creates it if necessary.
20 21 22 23 |
# File 'lib/rubyrep/replication_run.rb', line 20 def replicator @replicator ||= Replicators.replicators[session.configuration.[:replicator]].new(helper) end |
#run ⇒ Object
Executes the replication run.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/rubyrep/replication_run.rb', line 43 def run return unless [:left, :right].any? do |database| changes_pending = false t = Thread.new do changes_pending = session.send(database).select_one( "select id from #{session.configuration.[:rep_prefix]}_pending_changes limit 1" ) != nil end t.join session.configuration.[:database_connection_timeout] changes_pending end # Apparently sometimes above check for changes takes already so long, that # the replication run times out. # Check for this and if timed out, return (silently). return if sweeper.terminated? loaders = LoggedChangeLoaders.new(session) success = false begin replicator # ensure that replicator is created and has chance to validate settings loop do begin loaders.update # ensure the cache of change log records is up-to-date diff = ReplicationDifference.new loaders diff.load break unless diff.loaded? break if sweeper.terminated? if diff.type != :no_diff and not event_filtered?(diff) replicator.replicate_difference diff end rescue Exception => e begin helper.log_replication_outcome diff, e., e.class.to_s + "\n" + e.backtrace.join("\n") rescue Exception => _ # if logging to database itself fails, re-raise the original exception raise e end end end success = true ensure if sweeper.terminated? helper.finalize false session.disconnect_databases else helper.finalize success end end end |