Class: RR::ProxiedTableScan

Inherits:
TableScan show all
Defined in:
lib/rubyrep/proxied_table_scan.rb

Overview

Scans two tables for differences. Goes through a RubyRep Proxy to minimize network load. Doesn’t have any reporting functionality by itself. Instead ProxiedTableScan#run yields all the differences for the caller to do with as it pleases. Usage:

1. Create a new ProxiedTableScan object and hand it all necessary information
2. Call ProxiedTableScan#run to do the actual comparison
3. The block handed to ProxiedTableScan#run receives all differences

Instance Attribute Summary

Attributes inherited from TableScan

#left_table, #primary_key_names, #progress_printer, #right_table, #session

Instance Method Summary collapse

Methods inherited from TableScan

#scan_options, #update_progress

Methods included from TableScanHelper

#rank_rows, scan_class

Constructor Details

#initialize(session, left_table, right_table = nil) ⇒ ProxiedTableScan

Creates a new ProxiedTableScan instance

* session: a Session object representing the current database session
* left_table: name of the table in the left database
* right_table: name of the table in the right database. If not given, same as left_table


21
22
23
24
25
# File 'lib/rubyrep/proxied_table_scan.rb', line 21

# Creates a new ProxiedTableScan instance.
#
# * session: the current database session; must answer +proxied?+ with a
#   true value
# * left_table: name of the table in the left database
# * right_table: name of the table in the right database
#
# Raises a RuntimeError when the session is not proxied. Otherwise all
# arguments are handed on unchanged to the superclass constructor.
def initialize(session, left_table, right_table = nil)
  unless session.proxied?
    raise "#{self.class.name} only works with proxied sessions"
  end

  # bare +super+ forwards session, left_table and right_table as received
  super
end

Instance Method Details

#block_size ⇒ Object

Returns the block size to use for table scanning.



13
14
15
# File 'lib/rubyrep/proxied_table_scan.rb', line 13

# Returns the block size to use for table scanning.
#
# The value is the :proxy_block_size option configured for the left table.
# It is looked up on first use and memoized in @block_size.
def block_size
  @block_size ||= begin
    table_options = session.configuration.options_for_table(left_table)
    table_options[:proxy_block_size]
  end
end

#compare_blocks(left_block_cursor, right_block_cursor) ⇒ Object

Compares the rows of the specified left and right blocks. left_block_cursor and right_block_cursor are the corresponding ProxyBlockCursor objects. Yields all identified differences with

  • diff_type

  • row

#run describes the yield parameters in detail.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/rubyrep/proxied_table_scan.rb', line 33

# Compares the rows of the current left and right blocks and yields every
# difference found.
#
# * left_block_cursor: block cursor of the left database; must respond to
#   +row_checksums+ and +retrieve_row_cache+
# * right_block_cursor: the same for the right database
#
# Yields two values per difference:
# * diff_type: :left, :right or :conflict
# * row: for :left / :right the row of that side; for :conflict an array
#   [left_row, right_row]
#
# Row cursors created by this method are destroyed before it returns, also
# when an exception is raised.
def compare_blocks(left_block_cursor, right_block_cursor)
  left_cursor = right_cursor = nil

  left_row_checksums = left_block_cursor.row_checksums
  right_row_checksums = right_block_cursor.row_checksums

  # phase 1: identify the different rows and store their primary keys
  left_diff_rows = []
  left_diff_checksums = []
  right_diff_rows = []
  right_diff_checksums = []
  i = k = 0
  while i < left_row_checksums.size || k < right_row_checksums.size
    # nil keys signal to rank_rows that this side has no rows left
    left_keys = i < left_row_checksums.size ? left_row_checksums[i][:row_keys] : nil
    right_keys = k < right_row_checksums.size ? right_row_checksums[k][:row_keys] : nil
    case rank_rows(left_keys, right_keys)
    when -1
      # handled as a row that is only present on the left side
      left_diff_rows << left_keys
      left_diff_checksums << left_row_checksums[i][:checksum]
      i += 1
    when 1
      # handled as a row that is only present on the right side
      right_diff_rows << right_keys
      right_diff_checksums << right_row_checksums[k][:checksum]
      k += 1
    when 0
      # same position on both sides: only a checksum mismatch is a difference
      if left_row_checksums[i][:checksum] != right_row_checksums[k][:checksum]
        left_diff_rows << left_keys
        left_diff_checksums << left_row_checksums[i][:checksum]
        right_diff_rows << right_keys
        right_diff_checksums << right_row_checksums[k][:checksum]
      end
      i += 1
      k += 1
    end
  end

  # retrieve possibly existing cached rows from the block cursors
  left_row_cache = left_block_cursor.retrieve_row_cache left_diff_checksums
  right_row_cache = right_block_cursor.retrieve_row_cache right_diff_checksums

  # build arrays of row keys for the rows that were not included in the caches
  left_uncached_rows = []
  left_diff_rows.each_with_index do |row_keys, index|
    left_uncached_rows << row_keys unless left_row_cache[left_diff_checksums[index]]
  end
  right_uncached_rows = []
  right_diff_rows.each_with_index do |row_keys, index|
    right_uncached_rows << row_keys unless right_row_cache[right_diff_checksums[index]]
  end

  # phase 2: read all different rows and yield them
  unless left_uncached_rows.empty?
    left_cursor = session.left.create_cursor \
      ProxyRowCursor, left_table, :row_keys => left_uncached_rows
  end
  unless right_uncached_rows.empty?
    right_cursor = session.right.create_cursor \
      ProxyRowCursor, right_table, :row_keys => right_uncached_rows
  end

  # Returns the row belonging to +checksum+: the cached copy if the cache
  # holds one, otherwise the next row of +cursor+. Keeping this in a single
  # place ensures all branches below assign the fetched row consistently.
  # NOTE(review): cached rows are Marshal-dumped by the block cursor;
  # Marshal.load is only safe as long as that source is trusted.
  fetch_row = lambda do |row_cache, checksum, cursor|
    if row_cache.include? checksum
      Marshal.load(row_cache[checksum])
    else
      cursor.next_row
    end
  end

  i = k = 0
  while i < left_diff_rows.size || k < right_diff_rows.size
    case rank_rows(left_diff_rows[i], right_diff_rows[k])
    when -1
      yield :left, fetch_row.call(left_row_cache, left_diff_checksums[i], left_cursor)
      i += 1
    when 1
      yield :right, fetch_row.call(right_row_cache, right_diff_checksums[k], right_cursor)
      k += 1
    when 0
      left_row = fetch_row.call(left_row_cache, left_diff_checksums[i], left_cursor)
      # the uncached right row used to be stored in the wrong variable, so
      # conflicts were yielded with a nil right row
      right_row = fetch_row.call(right_row_cache, right_diff_checksums[k], right_cursor)
      yield :conflict, [left_row, right_row]
      i += 1
      k += 1
    end
  end
ensure
  session.left.destroy_cursor left_cursor if left_cursor
  session.right.destroy_cursor right_cursor if right_cursor
end

#run(&blck) ⇒ Object

Runs the table scan. Calls the block for every found difference. Differences are yielded with 2 parameters

* type: describes the difference, either :left (row only in left table), :right (row only in right table) or :conflict
* row: for :left or :right cases a hash describing the row; for :conflict an array of left and right row


139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/rubyrep/proxied_table_scan.rb', line 139

# Runs the table scan and calls the given block for every difference found.
#
# The block receives two parameters:
# * type: :left, :right or :conflict
# * row: for :left / :right the row in question; for :conflict an array of
#   left and right row
#
# Both block cursors are destroyed before the method returns, also when an
# exception is raised.
def run(&blck)
  left_blocks = right_blocks = nil
  left_blocks = session.left.create_cursor(
    ProxyBlockCursor, left_table,
    :row_buffer_size => scan_options[:row_buffer_size]
  )
  right_blocks = session.right.create_cursor(
    ProxyBlockCursor, right_table,
    :row_buffer_size => scan_options[:row_buffer_size]
  )

  # makes sure the progress bar is printed even if there are no records
  update_progress(0)

  while left_blocks.next?
    upper_bound, left_sum, left_steps = left_blocks.checksum(:proxy_block_size => block_size)
    # the right block is cut off at the last row of the left block
    _, right_sum, right_steps = right_blocks.checksum(:max_row => upper_bound)
    pending_steps = left_steps + right_steps

    if left_sum != right_sum
      compare_blocks(left_blocks, right_blocks) do |type, row|
        # a conflict accounts for two rows (one per side), anything else for one
        if type == :conflict
          reported = 2
        else
          reported = 1
        end
        update_progress(reported)
        pending_steps -= reported
        yield type, row
      end
    end

    # report whatever part of the block was not already reported per difference
    update_progress(pending_steps)
  end

  # rows still left in the right cursor after the left cursor is exhausted
  while right_blocks.next?
    update_progress(1)
    yield :right, right_blocks.next_row
  end
ensure
  session.left.destroy_cursor(left_blocks) if left_blocks
  session.right.destroy_cursor(right_blocks) if right_blocks
end