Module: RR::ReplicationExtenders::PostgreSQLReplication

Defined in:
lib/rubyrep/replication_extenders/postgresql_replication.rb

Overview

Provides PostgreSQL specific functionality for database replication

Instance Method Summary collapse

Instance Method Details

#add_big_primary_key(table_name, key_name) ⇒ Object

Adds a big (8 byte value), auto-incrementing primary key column to the specified table.

  • table_name: name of the target table

  • key_name: name of the primary key column



187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/rubyrep/replication_extenders/postgresql_replication.rb', line 187

# Adds a big (8 byte value), auto-incrementing primary key column to the
# specified table.
# * +table_name+: name of the target table
# * +key_name+: name of the primary key column
#
# While the DDL runs, the session's +client_min_messages+ setting is switched
# to +warning+; the previous value is put back afterwards, also when one of
# the statements fails.
def add_big_primary_key(table_name, key_name)
  old_message_level = select_one("show client_min_messages")['client_min_messages']
  execute "set client_min_messages = warning"
  execute(<<-end_sql)
    alter table "#{table_name}" add column #{key_name} bigserial
  end_sql

  execute(<<-end_sql)
    alter table "#{table_name}" add constraint #{table_name}_#{key_name}_pkey primary key (#{key_name})
  end_sql
ensure
  # Restore only if the previous level was actually read. When the initial
  # lookup itself failed there is nothing to restore, and issuing an empty
  # "set" statement would raise a second error that hides the original one.
  execute "set client_min_messages = #{old_message_level}" if old_message_level
end

#clear_sequence_setup(rep_prefix, table_name) ⇒ Object

Restores the original sequence settings. (Strictly speaking, it sets the sequence increment to 1; if the increment had a different value before, the restoration will not be correct.)

  • rep_prefix: not used (not necessary) for PostgreSQL

  • table_name: name of the table



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/rubyrep/replication_extenders/postgresql_replication.rb', line 167

# Sets the increment of every sequence that pg_depend links to the given
# table back to 1. (A sequence whose increment used to be something other
# than 1 is therefore not returned to its earlier value.)
# * +rep_prefix+: accepted but never read by this implementation
# * +table_name+: name of the table whose sequences are reset
def clear_sequence_setup(rep_prefix, table_name)
  lookup_sql = <<-end_sql
    select s.relname
    from pg_class as t
    join pg_depend as r on t.oid = r.refobjid
    join pg_class as s on r.objid = s.oid
    and s.relkind = 'S'
    and t.relname = '#{table_name}'
  end_sql
  attached_sequences = select_all(lookup_sql).map { |record| record['relname'] }

  attached_sequences.each do |attached_sequence|
    reset_sql = <<-end_sql
        alter sequence "#{attached_sequence}" increment by 1
    end_sql
    execute(reset_sql)
  end
end

#create_or_replace_replication_trigger_function(params) ⇒ Object

Creates or replaces the replication trigger function. See #create_replication_trigger for a description of the params hash.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/rubyrep/replication_extenders/postgresql_replication.rb', line 20

# Creates or replaces the PL/pgSQL function behind the replication trigger.
# +params+ is read for :trigger_name, :table, :log_table, :activity_table and
# :exclude_rr_activity, and is passed on unchanged to +key_clause+.
def create_or_replace_replication_trigger_function(params)
  # The function body is written in plpgsql, so install the language first
  # if pg_language does not list it yet.
  installed_languages = select_all("select lanname from pg_language where lanname = 'plpgsql'")
  execute "CREATE LANGUAGE plpgsql" if installed_languages.empty?

  # Optional prelude: leave the trigger early whenever the activity table
  # contains a row.
  activity_check =
    if params[:exclude_rr_activity]
      <<-end_sql
      PERFORM ACTIVE FROM #{params[:activity_table]};
      IF FOUND THEN
        RETURN NULL;
      END IF;
      end_sql
    else
      ""
    end

  # One log row per change: 'D'elete, 'U'pdate (old and new key) or 'I'nsert.
  function_sql = <<-end_sql
    CREATE OR REPLACE FUNCTION "#{params[:trigger_name]}"() RETURNS TRIGGER AS $change_trigger$
      BEGIN
        #{activity_check}
        IF (TG_OP = 'DELETE') THEN
          INSERT INTO #{params[:log_table]}(change_table, change_key, change_type, change_time) 
            SELECT '#{params[:table]}', #{key_clause('OLD', params)}, 'D', now();
        ELSIF (TG_OP = 'UPDATE') THEN
          INSERT INTO #{params[:log_table]}(change_table, change_key, change_new_key, change_type, change_time)
            SELECT '#{params[:table]}', #{key_clause('OLD', params)}, #{key_clause('NEW', params)}, 'U', now();
        ELSIF (TG_OP = 'INSERT') THEN
          INSERT INTO #{params[:log_table]}(change_table, change_key, change_type, change_time)
            SELECT '#{params[:table]}', #{key_clause('NEW', params)}, 'I', now();
        END IF;
        RETURN NULL; -- result is ignored since this is an AFTER trigger
      END;
    $change_trigger$ LANGUAGE plpgsql;
  end_sql
  execute(function_sql)
end

#create_replication_trigger(params) ⇒ Object

Creates a trigger to log all changes for the given table. params is a hash with all necessary information:

  • :trigger_name: name of the trigger

  • :table: name of the table that should be monitored

  • :keys: array of names of the key columns of the monitored table

  • :log_table: name of the table receiving all change notifications

  • :activity_table: name of the table receiving the rubyrep activity information

  • :key_sep: column separator to be used in the key column of the log table

  • :exclude_rr_activity: if true, the trigger will check and filter out changes initiated by RubyRep



68
69
70
71
72
73
74
75
76
# File 'lib/rubyrep/replication_extenders/postgresql_replication.rb', line 68

# Installs a row-level AFTER INSERT OR UPDATE OR DELETE trigger on the table
# named by params[:table]. The trigger function is (re)created first via
# #create_or_replace_replication_trigger_function, using the same +params+;
# trigger and function share the name params[:trigger_name].
def create_replication_trigger(params)
  create_or_replace_replication_trigger_function(params)

  trigger_sql = <<-end_sql
    CREATE TRIGGER "#{params[:trigger_name]}"
    AFTER INSERT OR UPDATE OR DELETE ON "#{params[:table]}"
        FOR EACH ROW EXECUTE PROCEDURE "#{params[:trigger_name]}"();
  end_sql
  execute(trigger_sql)
end

#drop_replication_trigger(trigger_name, table_name) ⇒ Object

Removes a trigger and related trigger procedure.

  • trigger_name: name of the trigger

  • table_name: name of the table for which the trigger exists



81
82
83
84
# File 'lib/rubyrep/replication_extenders/postgresql_replication.rb', line 81

# Drops the named trigger from the given table and then drops the
# like-named, argument-less function.
# * +trigger_name+: name of the trigger (and of its function)
# * +table_name+: name of the table the trigger is defined on
def drop_replication_trigger(trigger_name, table_name)
  quoted_trigger = %Q["#{trigger_name}"]
  execute(%Q[DROP TRIGGER #{quoted_trigger} ON "#{table_name}";])
  execute(%Q[DROP FUNCTION #{quoted_trigger}();])
end

#replication_trigger_exists?(trigger_name, table_name) ⇒ Boolean

Returns true if the named trigger exists for the named table.

  • trigger_name: name of the trigger

  • table_name: name of the table

Returns:

  • (Boolean)


89
90
91
92
93
94
95
96
97
98
# File 'lib/rubyrep/replication_extenders/postgresql_replication.rb', line 89

# Returns +true+ if the named trigger exists for the named table in one of
# the schemas listed in the session's search path.
# * +trigger_name+: name of the trigger
# * +table_name+: name of the table
def replication_trigger_exists?(trigger_name, table_name)
  search_path = select_one("show search_path")['search_path']
  # The reported path can contain blanks after the commas (e.g. the stock
  # value <tt>"$user", public</tt>). Strip every entry so the quoted literal
  # is 'public' rather than ' public', which would never match a schema.
  # NOTE(review): entries such as "$user" are still passed through verbatim
  # and are not resolved to the current user name.
  schemas = search_path.split(',').map { |schema| quote(schema.strip) }.join(',')
  !select_all(<<-end_sql).empty?
    select 1 from information_schema.triggers
    where event_object_schema in (#{schemas})
    and trigger_name = '#{trigger_name}'
    and event_object_table = '#{table_name}'
  end_sql
end

#sequence_values(rep_prefix, table_name) ⇒ Object

Returns all unadjusted sequences of the given table. Parameters:

  • rep_prefix: not used (not necessary) for PostgreSQL

  • table_name: name of the table

Return value: a hash with

  • key: sequence name

  • value: a hash with

    • :increment: current sequence increment

    • :value: current value



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/rubyrep/replication_extenders/postgresql_replication.rb', line 109

# Reads the current state of every sequence that pg_depend links to the
# given table.
# * +rep_prefix+: accepted but never read by this implementation
# * +table_name+: name of the table
#
# Returns a hash keyed by sequence name; each value is a hash with
# * :increment: the sequence's +increment_by+ as an integer
# * :value: the sequence's +last_value+ as an integer
def sequence_values(rep_prefix, table_name)
  lookup_sql = <<-end_sql
    select s.relname
    from pg_class as t
    join pg_depend as r on t.oid = r.refobjid
    join pg_class as s on r.objid = s.oid
    and s.relkind = 'S'
    and t.relname = '#{table_name}'
  end_sql
  names = select_all(lookup_sql).map { |record| record['relname'] }

  names.each_with_object({}) do |name, values|
    state = select_one("select last_value, increment_by from \"#{name}\"")
    values[name] = {
      :increment => state['increment_by'].to_i,
      :value => state['last_value'].to_i
    }
  end
end

#update_sequences(rep_prefix, table_name, increment, offset, left_sequence_values, right_sequence_values, adjustment_buffer) ⇒ Object

Ensures that the sequences of the named table (normally the primary key column) are generated with the correct increment and offset.

  • rep_prefix: not used (not necessary) for PostgreSQL

  • table_name: name of the table (not used for Postgres)

  • increment: increment of the sequence

  • offset: offset

  • left_sequence_values:

    hash as returned by #sequence_values for the left database
    
  • right_sequence_values:

    hash as returned by #sequence_values for the right database
    
  • adjustment_buffer:

    the "gap" that is created during sequence update to avoid concurrency problems
    
    E.g. an increment of 2 and an offset of 1 will lead to the generation of odd numbers.



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/rubyrep/replication_extenders/postgresql_replication.rb', line 143

# Makes sure each sequence named in +left_sequence_values+ produces values
# with the requested increment and offset, restarting it beyond the highest
# value seen on either side when it does not.
# * +rep_prefix+: accepted but never read by this implementation
# * +table_name+: accepted but never read by this implementation
# * +increment+: increment the sequence should use
# * +offset+: remainder (value modulo +increment+) the sequence should produce
# * +left_sequence_values+: hash as returned by #sequence_values (left database)
# * +right_sequence_values+: hash as returned by #sequence_values (right database)
# * +adjustment_buffer+: gap added on top of the highest current value
def update_sequences(
    rep_prefix, table_name, increment, offset,
    left_sequence_values, right_sequence_values, adjustment_buffer)
  left_sequence_values.each do |sequence_name, left_state|
    live = select_one("select last_value, increment_by from \"#{sequence_name}\"")
    live_increment = live['increment_by'].to_i
    live_value = live['last_value'].to_i

    # Nothing to do when the sequence already steps and aligns as requested.
    next if live_increment == increment && live_value % increment == offset

    highest = [left_state[:value], right_sequence_values[sequence_name][:value]].max
    highest += adjustment_buffer
    # Round down to a multiple of the increment, then move one full step up
    # and apply the offset.
    new_start = highest - (highest % increment) + increment + offset
    execute(<<-end_sql)
      alter sequence "#{sequence_name}" increment by #{increment} restart with #{new_start}
    end_sql
  end
end