Class: RR::ReplicationRun

Inherits:
Object
  • Object
show all
Defined in:
lib/rubyrep/replication_run.rb

Overview

Executes a single replication run

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(session, sweeper) ⇒ ReplicationRun

Creates a new ReplicationRun instance.

  • session: the current Session

  • sweeper: the current TaskSweeper



110
111
112
113
114
# File 'lib/rubyrep/replication_run.rb', line 110

def initialize(session, sweeper)
  self.session = session
  self.sweeper = sweeper
  install_sweeper
end

Instance Attribute Details

#sessionObject

The current Session object



9
10
11
# File 'lib/rubyrep/replication_run.rb', line 9

def session
  @session
end

#sweeperObject

The current TaskSweeper



12
13
14
# File 'lib/rubyrep/replication_run.rb', line 12

def sweeper
  @sweeper
end

Instance Method Details

#event_filtered?(diff) ⇒ Boolean

Calls the event filter for the give difference.

  • diff: instance of ReplicationDifference

Returns true if replication of the difference should not proceed.

Returns:

  • (Boolean)


28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/rubyrep/replication_run.rb', line 28

def event_filtered?(diff)
  event_filter = helper.options_for_table(diff.changes[:left].table)[:event_filter]
  if event_filter && event_filter.respond_to?(:before_replicate)
    not event_filter.before_replicate(
      diff.changes[:left].table,
      helper.type_cast(diff.changes[:left].table, diff.changes[:left].key),
      helper, 
      diff
    )
  else
    false
  end
end

#helperObject

Returns the current ReplicationHelper; creates it if necessary



15
16
17
# File 'lib/rubyrep/replication_run.rb', line 15

def helper
  @helper ||= ReplicationHelper.new(self)
end

#install_sweeperObject

Installs the current sweeper into the database connections



98
99
100
101
102
103
104
105
# File 'lib/rubyrep/replication_run.rb', line 98

def install_sweeper
  [:left, :right].each do |database|
    unless session.send(database).respond_to?(:sweeper)
      session.send(database).send(:extend, NoisyConnection)
    end
    session.send(database).sweeper = sweeper
  end
end

#replicatorObject

Returns the current replicator; creates it if necessary.



20
21
22
23
# File 'lib/rubyrep/replication_run.rb', line 20

def replicator
  @replicator ||=
    Replicators.replicators[session.configuration.options[:replicator]].new(helper)
end

#runObject

Executes the replication run.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/rubyrep/replication_run.rb', line 43

def run
  return unless [:left, :right].any? do |database|
    changes_pending = false
    t = Thread.new do
      changes_pending = session.send(database).select_one(
        "select id from #{session.configuration.options[:rep_prefix]}_pending_changes limit 1"
      ) != nil
    end
    t.join session.configuration.options[:database_connection_timeout]
    changes_pending
  end

  # Apparently sometimes above check for changes takes already so long, that
  # the replication run times out.
  # Check for this and if timed out, return (silently).
  return if sweeper.terminated?

  loaders = LoggedChangeLoaders.new(session)

  success = false
  begin
    replicator # ensure that replicator is created and has chance to validate settings

    loop do
      begin
        loaders.update # ensure the cache of change log records is up-to-date
        diff = ReplicationDifference.new loaders
        diff.load
        break unless diff.loaded?
        break if sweeper.terminated?
        if diff.type != :no_diff and not event_filtered?(diff)
          replicator.replicate_difference diff
        end
      rescue Exception => e
        begin
          helper.log_replication_outcome diff, e.message,
            e.class.to_s + "\n" + e.backtrace.join("\n")
        rescue Exception => _
          # if logging to database itself fails, re-raise the original exception
          raise e
        end
      end
    end
    success = true
  ensure
    if sweeper.terminated?
      helper.finalize false
      session.disconnect_databases
    else
      helper.finalize success
    end
  end
end