Class: RR::ReplicationInitializer

Inherits:
Object
  • Object
show all
Defined in:
lib/rubyrep/replication_initializer.rb

Overview

Ensures all preconditions are met to start with replication

Constant Summary collapse

DIFF_DUMP_SIZE =

Size of the replication log column diff_dump

2000
DESCRIPTION_SIZE =

Size fo the event log column ‘description’

255
LONG_DESCRIPTION_SIZE =

Size of the event log column ‘long_description’

1000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(session) ⇒ ReplicationInitializer

Creates a new RepInititializer for the given Session



14
15
16
# File 'lib/rubyrep/replication_initializer.rb', line 14

def initialize(session)
  self.session = session
end

Instance Attribute Details

#sessionObject

The active Session



11
12
13
# File 'lib/rubyrep/replication_initializer.rb', line 11

def session
  @session
end

Instance Method Details

#change_log_exists?(database) ⇒ Boolean

Returns true if the change log exists in the specified database.

  • database: either :left or :right

Returns:

  • (Boolean)


104
105
106
# File 'lib/rubyrep/replication_initializer.rb', line 104

def change_log_exists?(database)
  session.send(database).tables.include? "#{options[:rep_prefix]}_pending_changes"
end

#clear_sequence_setup(database, table) ⇒ Object

Restores the original sequence settings for the named table. (Actually it sets the sequence increment to 1. If before, it had a different value, then the restoration will not be correct.)

  • database: either :left or :right

  • table_name: name of the table



93
94
95
96
97
98
99
100
# File 'lib/rubyrep/replication_initializer.rb', line 93

def clear_sequence_setup(database, table)
  table_options = options(table)
  if table_options[:adjust_sequences]
    session.send(database).clear_sequence_setup(
      table_options[:rep_prefix], table
    )
  end
end

#create_change_log(database) ⇒ Object

Creates the change log table in the specified database

  • database: either :left or :right



170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/rubyrep/replication_initializer.rb', line 170

def create_change_log(database)
  silence_ddl_notices(database) do
    session.send(database).create_table "#{options[:rep_prefix]}_pending_changes", :id => false do |t|
      t.column :change_table, :string
      t.column :change_key, :string
      t.column :change_new_key, :string
      t.column :change_type, :string
      t.column :change_time, :timestamp
    end
    session.send(database).add_big_primary_key "#{options[:rep_prefix]}_pending_changes", 'id'
  end
end

#create_event_logObject

Creates the replication log table.



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/rubyrep/replication_initializer.rb', line 150

def create_event_log
  silence_ddl_notices(:left) do
    session.left.create_table "#{options[:rep_prefix]}_logged_events", :id => false do |t|
      t.column :activity, :string
      t.column :change_table, :string
      t.column :diff_type, :string
      t.column :change_key, :string
      t.column :left_change_type, :string
      t.column :right_change_type, :string
      t.column :description, :string, :limit => DESCRIPTION_SIZE
      t.column :long_description, :string, :limit => LONG_DESCRIPTION_SIZE
      t.column :event_time, :timestamp
      t.column :diff_dump, :string, :limit => DIFF_DUMP_SIZE
    end
    session.left.add_big_primary_key "#{options[:rep_prefix]}_logged_events", 'id'
  end
end

#create_trigger(database, table) ⇒ Object

Creates a trigger logging all table changes

  • database: either :left or :right

  • table: name of the table



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/rubyrep/replication_initializer.rb', line 31

def create_trigger(database, table)
  options = self.options(table)

  params = {
    :trigger_name => "#{options[:rep_prefix]}_#{table}",
    :table => table,
    :keys => session.send(database).primary_key_names(table),
    :log_table => "#{options[:rep_prefix]}_pending_changes",
    :activity_table => "#{options[:rep_prefix]}_running_flags",
    :key_sep => options[:key_sep],
    :exclude_rr_activity => false,
  }

  session.send(database).create_replication_trigger params
end

#drop_activity_markersObject

Checks in both databases, if the activity_marker tables exist. If yes, drops them.



230
231
232
233
234
235
236
237
# File 'lib/rubyrep/replication_initializer.rb', line 230

def drop_activity_markers
  table_name = "#{options[:rep_prefix]}_running_flags"
  [:left, :right].each do |database|
    if session.send(database).tables.include? table_name
      session.send(database).drop_table table_name
    end
  end
end

#drop_change_log(database) ⇒ Object

Drops the change log table in the specified database

  • database: either :left or :right



115
116
117
# File 'lib/rubyrep/replication_initializer.rb', line 115

def drop_change_log(database)
  session.send(database).drop_table "#{options[:rep_prefix]}_pending_changes"
end

#drop_change_logsObject

Checks in both databases, if the change_log tables exist. If yes, drops them.



223
224
225
226
227
# File 'lib/rubyrep/replication_initializer.rb', line 223

def drop_change_logs
  [:left, :right].each do |database|
    drop_change_log(database) if change_log_exists?(database)
  end
end

#drop_event_logObject

Drops the replication log table.



120
121
122
# File 'lib/rubyrep/replication_initializer.rb', line 120

def drop_event_log
  session.left.drop_table "#{options[:rep_prefix]}_logged_events"
end

#drop_infrastructureObject

Removes all rubyrep infrastructure tables from both databases.



240
241
242
243
244
# File 'lib/rubyrep/replication_initializer.rb', line 240

def drop_infrastructure
  drop_event_log if event_log_exists?
  drop_change_logs
  drop_activity_markers
end

#drop_trigger(database, table) ⇒ Object

Drops the replication trigger of the named table.

  • database: either :left or :right

  • table: name of the table



58
59
60
61
# File 'lib/rubyrep/replication_initializer.rb', line 58

def drop_trigger(database, table)
  trigger_name = "#{options(table)[:rep_prefix]}_#{table}"
  session.send(database).drop_replication_trigger trigger_name, table
end

#ensure_activity_markersObject

Checks in both databases, if the activity marker tables exist and if not, creates them.



190
191
192
193
194
195
196
197
198
199
# File 'lib/rubyrep/replication_initializer.rb', line 190

def ensure_activity_markers
  table_name = "#{options[:rep_prefix]}_running_flags"
  [:left, :right].each do |database|
    unless session.send(database).tables.include? table_name
      session.send(database).create_table table_name, :id => false do |t|
        t.column :active, :integer
      end
    end
  end
end

#ensure_change_logsObject

Checks in both databases, if the change log tables exists and creates them if necessary



208
209
210
211
212
# File 'lib/rubyrep/replication_initializer.rb', line 208

def ensure_change_logs
  [:left, :right].each do |database|
    create_change_log(database) unless change_log_exists?(database)
  end
end

#ensure_event_logObject

Checks if the event log table already exists and creates it if necessary



202
203
204
# File 'lib/rubyrep/replication_initializer.rb', line 202

def ensure_event_log
  create_event_log unless event_log_exists?
end

#ensure_infrastructureObject

Checks in both databases, if the infrastructure tables (change log, event log) exist and creates them if necessary.



216
217
218
219
220
# File 'lib/rubyrep/replication_initializer.rb', line 216

def ensure_infrastructure
  ensure_activity_markers
  ensure_change_logs
  ensure_event_log
end

#ensure_sequence_setup(table_pair, increment, left_offset, right_offset) ⇒ Object

Ensures that the sequences of the named table (normally the primary key column) are generated with the correct increment and offset in both left and right database. The sequence is always updated in both databases.

  • table_pair: a hash of names of corresponding :left and :right tables

  • increment: increment of the sequence

  • left_offset: offset of table in left database

  • right_offset: offset of table in right database

    1. an increment of 2 and offset of 1 will lead to generation of odd

numbers.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/rubyrep/replication_initializer.rb', line 73

def ensure_sequence_setup(table_pair, increment, left_offset, right_offset)
  table_options = options(table_pair[:left])
  if table_options[:adjust_sequences]
    rep_prefix = table_options[:rep_prefix]
    left_sequence_values = session.left.sequence_values rep_prefix, table_pair[:left]
    right_sequence_values = session.right.sequence_values rep_prefix, table_pair[:right]
    [:left, :right].each do |database|
      offset = database == :left ? left_offset : right_offset
      session.send(database).update_sequences \
        rep_prefix, table_pair[database], increment, offset,
        left_sequence_values, right_sequence_values, table_options[:sequence_adjustment_buffer]
    end
  end
end

#event_log_exists?Boolean

Returns true if the replication log exists.

Returns:

  • (Boolean)


109
110
111
# File 'lib/rubyrep/replication_initializer.rb', line 109

def event_log_exists?
  session.left.tables.include? "#{options[:rep_prefix]}_logged_events"
end

#exclude_rubyrep_tablesObject

Adds to the current session’s configuration an exclusion of rubyrep tables.



184
185
186
# File 'lib/rubyrep/replication_initializer.rb', line 184

def exclude_rubyrep_tables
  session.configuration.exclude_rubyrep_tables
end

#options(table = nil) ⇒ Object

Returns the options for the given table. If table is nil, returns general options.



20
21
22
23
24
25
26
# File 'lib/rubyrep/replication_initializer.rb', line 20

def options(table = nil)
  if table
    session.configuration.options_for_table table
  else
    session.configuration.options
  end
end

#prepare_replicationObject

Prepares the database / tables for replication.



266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# File 'lib/rubyrep/replication_initializer.rb', line 266

def prepare_replication
  exclude_rubyrep_tables

  puts "Verifying RubyRep tables"
  ensure_infrastructure

  puts "Checking for and removing rubyrep triggers from unconfigured tables"
  restore_unconfigured_tables

  puts "Verifying rubyrep triggers of configured tables"
  unsynced_table_pairs = []
  table_pairs = session.sort_table_pairs(session.configured_table_pairs)
  table_pairs.each do |table_pair|
    table_options = options(table_pair[:left])
    ensure_sequence_setup table_pair,
      table_options[:sequence_increment],
      table_options[:left_sequence_offset],
      table_options[:right_sequence_offset]

    unsynced = false
    [:left, :right].each do |database|
      unless trigger_exists? database, table_pair[database]
        create_trigger database, table_pair[database]
        unsynced = true
      end
    end
    unsynced_table_pairs << table_pair if unsynced
  end
  unsynced_table_specs = unsynced_table_pairs.map do |table_pair|
    "#{table_pair[:left]}, #{table_pair[:right]}"
  end

  unless unsynced_table_specs.empty?
    puts "Executing initial table syncs"
    runner = SyncRunner.new
    runner.session = session
    runner.options = {:table_specs => unsynced_table_specs}
    runner.execute
  end

  puts "Starting replication"
end

#restore_unconfigured_tables(configured_table_pairs = session.configured_table_pairs) ⇒ Object

Checks for tables that have triggers but are not in the list of configured tables. Removes triggers and restores sequences of those tables.

  • configured_table_pairs: An array of table pairs (e. g. [=> ‘xy’, :right => ‘xy2’]).



250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/rubyrep/replication_initializer.rb', line 250

def restore_unconfigured_tables(configured_table_pairs = session.configured_table_pairs)
  [:left, :right].each do |database|
    configured_tables = configured_table_pairs.map {|table_pair| table_pair[database]}
    unconfigured_tables = session.send(database).tables - configured_tables
    unconfigured_tables.each do |table|
      if trigger_exists?(database, table)
        drop_trigger(database, table)
        session.send(database).execute(
          "delete from #{options[:rep_prefix]}_pending_changes where change_table = '#{table}'")
      end
      clear_sequence_setup(database, table)
    end
  end
end

#silence_ddl_notices(database) ⇒ Object

Ensures that create_table and related statements don’t print notices to stdout. Then restored original message setting.

  • database: either :left or :right



136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/rubyrep/replication_initializer.rb', line 136

def silence_ddl_notices(database)
  if session.configuration.send(database)[:adapter] =~ /postgres/
    old_message_level = session.send(database).
      select_one("show client_min_messages")['client_min_messages']
    session.send(database).execute "set client_min_messages = warning"
  end
  yield
ensure
  if session.configuration.send(database)[:adapter] =~ /postgres/
    session.send(database).execute "set client_min_messages = #{old_message_level}"
  end
end

#trigger_exists?(database, table) ⇒ Boolean

Returns true if the replication trigger for the given table exists.

  • database: either :left or :right

  • table: name of the table

Returns:

  • (Boolean)


50
51
52
53
# File 'lib/rubyrep/replication_initializer.rb', line 50

def trigger_exists?(database, table)
  trigger_name = "#{options(table)[:rep_prefix]}_#{table}"
  session.send(database).replication_trigger_exists? trigger_name, table
end