Class: RR::ReplicationRun

Inherits:
Object
  • Object
show all
Defined in:
lib/rubyrep/replication_run.rb

Overview

Executes a single replication run

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(session, sweeper) ⇒ ReplicationRun

Creates a new ReplicationRun instance.

  • session: the current Session

  • sweeper: the current TaskSweeper



136
137
138
139
140
# File 'lib/rubyrep/replication_run.rb', line 136

# Creates a new ReplicationRun instance.
# * +session+: the current Session
# * +sweeper+: the current TaskSweeper
#
# Both attributes are stored through their writers before the sweeper is
# installed, because #install_sweeper reads them.
def initialize(session, sweeper)
  self.session, self.sweeper = session, sweeper
  install_sweeper
end

Instance Attribute Details

#sessionObject

The current Session object



9
10
11
# File 'lib/rubyrep/replication_run.rb', line 9

# Reader for the current Session object of this replication run
# (the value kept in @session).
def session
  @session
end

#sweeperObject

The current TaskSweeper



12
13
14
# File 'lib/rubyrep/replication_run.rb', line 12

# Reader for the current TaskSweeper of this replication run
# (the value kept in @sweeper).
def sweeper
  @sweeper
end

Instance Method Details

#event_filtered?(diff) ⇒ Boolean

Calls the event filter for the given difference.

  • diff: instance of ReplicationDifference

Returns true if replication of the difference should not proceed.

Returns:

  • (Boolean)


33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/rubyrep/replication_run.rb', line 33

# Consults the table's configured event filter for the given difference.
# * +diff+: instance of ReplicationDifference
#
# The filter is looked up under the :event_filter option of the table named by
# the left change. It is only consulted if it responds to +before_replicate+.
#
# Returns true if replication of the difference should not proceed, i.e. the
# filter's +before_replicate+ returned a falsy value. Returns false when no
# usable filter is configured.
def event_filtered?(diff)
  left_change = diff.changes[:left]
  table = left_change.table
  filter = helper.options_for_table(table)[:event_filter]
  return false unless filter && filter.respond_to?(:before_replicate)

  typed_key = helper.type_cast(table, left_change.key)
  !filter.before_replicate(table, typed_key, helper, diff)
end

#helperObject

Returns the current ReplicationHelper; creates it if necessary



20
21
22
# File 'lib/rubyrep/replication_run.rb', line 20

# Returns the current ReplicationHelper.
# It is built for this run on first access and cached in @helper afterwards.
def helper
  @helper = ReplicationHelper.new(self) unless @helper
  @helper
end

#install_sweeperObject

Installs the current sweeper into the database connections



124
125
126
127
128
129
130
131
# File 'lib/rubyrep/replication_run.rb', line 124

# Installs the current sweeper into both database connections of the session.
# A connection that does not yet respond to +sweeper+ is first extended with
# NoisyConnection; every connection is then handed the sweeper.
def install_sweeper
  [:left, :right].each do |side|
    connection = session.send(side)
    connection.extend(NoisyConnection) unless connection.respond_to?(:sweeper)
    connection.sweeper = sweeper
  end
end

#load_differenceObject

Returns the next available ReplicationDifference. (Either a new unprocessed difference or, if none is available, the first available ‘second chancer’.)



50
51
52
53
54
55
56
57
58
59
# File 'lib/rubyrep/replication_run.rb', line 50

# Returns the next available ReplicationDifference.
#
# A fresh difference is loaded from the (lazily created and then updated)
# change loaders. If it did not load and there are queued second chancers,
# the oldest second chancer is removed from the queue and returned instead.
# Otherwise the fresh difference is returned, loaded or not.
def load_difference
  @loaders ||= LoggedChangeLoaders.new(session)
  @loaders.update # ensure the cache of change log records is up-to-date

  fresh = ReplicationDifference.new(@loaders)
  fresh.load
  return fresh if fresh.loaded? || second_chancers.empty?

  second_chancers.shift
end

#replicatorObject

Returns the current replicator; creates it if necessary.



25
26
27
28
# File 'lib/rubyrep/replication_run.rb', line 25

# Returns the current replicator.
# On first access the replicator class registered under the session's
# :replicator option is instantiated with the helper; the instance is cached
# in @replicator afterwards.
def replicator
  return @replicator if @replicator

  registry = Replicators.replicators
  replicator_class = registry[session.configuration.options[:replicator]]
  @replicator = replicator_class.new(helper)
end

#runObject

Executes the replication run.



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/rubyrep/replication_run.rb', line 62

# Executes the replication run.
#
# Outline, as implemented below:
# 1. Probes the left and right database for at least one row in the pending
#    changes table and returns immediately if neither probe finds one.
# 2. Returns silently if the sweeper reports termination after that probe.
# 3. Loads and replicates differences one at a time until no loaded difference
#    is returned or the sweeper reports termination.
# 4. Always finalizes the helper. If the sweeper reports termination, it
#    finalizes with +false+ and additionally disconnects the session's databases;
#    otherwise it finalizes with the success flag.
def run
  # Each probe runs in its own thread so that waiting for it can be bounded by
  # the :database_connection_timeout option. If the join returns before the
  # query finished, changes_pending still holds its initial value of false.
  return unless [:left, :right].any? do |database|
    changes_pending = false
    t = Thread.new do
      changes_pending = session.send(database).select_one(
        "select id from #{session.configuration.options[:rep_prefix]}_pending_changes limit 1"
      ) != nil
    end
    t.join session.configuration.options[:database_connection_timeout]
    changes_pending
  end

  # Apparently the above check for changes sometimes already takes so long that
  # the replication run times out.
  # Check for this and if timed out, return (silently).
  return if sweeper.terminated?

  # Only set to true if the loop below ends without an exception escaping it.
  success = false
  begin
    replicator # ensure that replicator is created and has chance to validate settings

    loop do
      begin
        diff = load_difference
        break unless diff.loaded?
        break if sweeper.terminated?
        # Differences of type :no_diff and differences rejected by the event
        # filter are skipped.
        if diff.type != :no_diff and not event_filtered?(diff)
          replicator.replicate_difference diff
        end
      # NOTE(review): rescuing Exception also catches non-StandardError
      # exceptions (e.g. signals / exit requests) - confirm this is intended.
      # NOTE(review): diff is still nil here if load_difference itself raised -
      # confirm the code below copes with that.
      rescue Exception => e
        if e.message =~ /violates foreign key constraint|foreign key constraint fails/i and !diff.second_chance?
          # Note:
          # Identifying the foreign key constraint violation via regular expression is
          # database dependent and *dirty*.
          # It would be better to use the ActiveRecord #translate_exception mechanism.
          # However as per version 3.0.5 this doesn't work yet properly.

          # First foreign key failure of this difference: mark it and queue it
          # so that load_difference can hand it out once more later.
          diff.second_chance = true
          second_chancers << diff
        else
          # Any other failure (or a repeated foreign key failure) is logged as
          # the outcome of this difference; the loop then continues.
          begin
            helper.log_replication_outcome diff, e.message,
              e.class.to_s + "\n" + e.backtrace.join("\n")
          rescue Exception => _
            # if logging to database itself fails, re-raise the original exception
            raise e
          end
        end
      end
    end
    success = true
  ensure
    if sweeper.terminated?
      # Terminated run: finalize as unsuccessful and drop the database connections.
      helper.finalize false
      session.disconnect_databases
    else
      helper.finalize success
    end
  end
end

#second_chancersObject

An array of ReplicationDifference which originally failed replication but should be tried one more time



15
16
17
# File 'lib/rubyrep/replication_run.rb', line 15

# Returns the array of ReplicationDifference instances that originally failed
# replication but should be tried one more time.
# The array is created empty on first access and the same instance is
# returned on every later call.
def second_chancers
  @second_chancers = [] unless @second_chancers
  @second_chancers
end