Class: RR::ProxyBlockCursor

Inherits:
ProxyCursor
Includes:
TableScanHelper
Defined in:
lib/rubyrep/proxy_block_cursor.rb

Overview

Scans a table in blocks and calculates the checksum of each scanned block.

Instance Attribute Summary

Attributes inherited from ProxyCursor

#connection, #cursor, #primary_key_names, #table

Instance Method Summary

Methods included from TableScanHelper

#rank_rows, scan_class

Methods inherited from ProxyCursor

#destroy, #prepare_fetch

Constructor Details

#initialize(session, table) ⇒ ProxyBlockCursor

Creates a new cursor and sets max_row_cache_size to 1000000 bytes.

  • session: the current proxy session

  • table: the name of the table to scan



# File 'lib/rubyrep/proxy_block_cursor.rb', line 40

def initialize(session, table)
  self.max_row_cache_size = 1000000 # this size should be sufficient as long as table doesn't contain blobs
  super
end

Instance Attribute Details

#current_row_cache_size ⇒ Object

A counter of how many bytes of row data have already been cached



# File 'lib/rubyrep/proxy_block_cursor.rb', line 32

def current_row_cache_size
  @current_row_cache_size
end

#digest ⇒ Object

The current Digest



# File 'lib/rubyrep/proxy_block_cursor.rb', line 16

def digest
  @digest
end

#last_row ⇒ Object

nil if the last run of the checksum method left no unprocessed row; otherwise the leftover row of that run.



# File 'lib/rubyrep/proxy_block_cursor.rb', line 20

def last_row
  @last_row
end

#max_row_cache_size ⇒ Object

The maximum total size (in bytes) up to which rows will be cached



# File 'lib/rubyrep/proxy_block_cursor.rb', line 29

def max_row_cache_size
  @max_row_cache_size
end

#row_cache ⇒ Object

A hash of cached rows consisting of row checksum => row dump pairs.



# File 'lib/rubyrep/proxy_block_cursor.rb', line 35

def row_cache
  @row_cache
end

#row_checksums ⇒ Object

Returns an array with one entry per encountered row. Each entry is a Hash with the following elements:

  • :row_keys: A primary key => value hash identifying the row

  • :checksum: the checksum for this row
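
To illustrate the shape of this array, here is a minimal sketch that builds equivalent entries outside the class. The sample rows and the local variable names are assumptions made for the example; only the :row_keys / :checksum layout and the SHA1-over-Marshal.dump checksum come from the source shown further down.

```ruby
require 'digest/sha1'

# Assumed sample data; real rows come from the database cursor.
primary_key_names = ['id']
rows = [
  { 'id' => 1, 'name' => 'alice' },
  { 'id' => 2, 'name' => 'bob' }
]

# One Hash per row: the primary key columns plus a SHA1 hex digest
# of the marshalled row.
row_checksums = rows.map do |row|
  {
    :row_keys => row.select { |key, _| primary_key_names.include?(key) },
    :checksum => Digest::SHA1.hexdigest(Marshal.dump(row))
  }
end
```

Each entry identifies a row by its primary key only, so two sides can compare checksums without exchanging full row data.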



# File 'lib/rubyrep/proxy_block_cursor.rb', line 26

def row_checksums
  @row_checksums
end

Instance Method Details

#checksum(options = {}) ⇒ Object

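Calculates the checksum from the current row up to the row specified by options. options is a hash that must include one of the following (:proxy_block_size takes precedence if both are present; an error is raised if neither is):

  • :proxy_block_size: The number of rows to scan. Must be greater than 0.

  • :max_row: A row hash of primary key columns specifying the maximum record to scan.

Returns three values:

  • the primary key columns of the last processed row (nil if no row was processed)

  • the checksum of the scanned rows

  • the number of processed records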



# File 'lib/rubyrep/proxy_block_cursor.rb', line 110

def checksum(options = {})
  reset_checksum
  return_row = row = nil
  row_count = 0

  if options.include? :proxy_block_size
    block_size = options[:proxy_block_size]
    raise ":proxy_block_size must be greater than 0" unless block_size > 0
    while row_count < block_size and next?
      row = next_row
      update_checksum(row)
      row_count += 1
    end
    return_row = row
  elsif options.include? :max_row
    max_row = options[:max_row]
    while next?
      row = next_row
      rank = rank_rows row, max_row
      if rank > 0 
        # row > max_row ==> save the current row and break off
        self.last_row = row
        break
      end
      row_count += 1
      update_checksum(row)
      return_row, row = row, nil
    end  
  else
    raise "options must include either :proxy_block_size or :max_row"
  end
  return_keys = return_row.reject {|key, | not primary_key_names.include? key} if return_row
  return return_keys, current_checksum, row_count
end
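
The :proxy_block_size path can be tried without a database. The following is a minimal sketch, not the rubyrep implementation: ArrayBlockCursor is a hypothetical stand-in that reads from an in-memory Array, takes the block size as a plain argument, and omits the :max_row path and the row cache.

```ruby
require 'digest/sha1'

# Hypothetical stand-in for ProxyBlockCursor (not part of rubyrep):
# scans an Array of row hashes instead of a database cursor.
class ArrayBlockCursor
  def initialize(rows, primary_key_names)
    @rows = rows.dup
    @primary_key_names = primary_key_names
  end

  def next?
    !@rows.empty?
  end

  # Digests up to block_size rows.
  # Returns [primary keys of the last row, hex checksum, row count].
  def checksum(block_size)
    raise ArgumentError, 'block_size must be greater than 0' unless block_size > 0
    digest = Digest::SHA1.new
    last_row = nil
    row_count = 0
    while row_count < block_size && next?
      last_row = @rows.shift
      digest << Marshal.dump(last_row)
      row_count += 1
    end
    keys = last_row && last_row.select { |key, _| @primary_key_names.include?(key) }
    [keys, digest.hexdigest, row_count]
  end
end

# Five rows scanned in blocks of two: the last block holds a single row.
rows = (1..5).map { |i| { 'id' => i, 'name' => "row#{i}" } }
cursor = ArrayBlockCursor.new(rows, ['id'])
first_block = cursor.checksum(2)
second_block = cursor.checksum(2)
third_block = cursor.checksum(2)
```

The returned keys mark where a block ended, which is presumably what lets the other side scan up to the same record via :max_row and compare checksums.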

#current_checksum ⇒ Object

Returns the current checksum as a hex string



# File 'lib/rubyrep/proxy_block_cursor.rb', line 98

def current_checksum
  self.digest.hexdigest
end

#next? ⇒ Boolean

Returns true if the cursor has unprocessed rows, including a row left over from a previous checksum run

Returns:

  • (Boolean)


# File 'lib/rubyrep/proxy_block_cursor.rb', line 46

def next?
  last_row != nil or cursor.next?
end

#next_row ⇒ Object

Returns the cursor’s next row. A row left over from a previous checksum run (see #last_row) is returned first.



# File 'lib/rubyrep/proxy_block_cursor.rb', line 51

def next_row
  if self.last_row
    row, self.last_row = self.last_row, nil
  else
    row = cursor.next_row
  end
  row
end
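
The interplay of #last_row, #next? and #next_row amounts to a one-row pushback buffer: a row that was read but turned out to lie beyond the requested maximum is set aside and handed out first on the next read. A minimal sketch of that pattern follows, assuming a hypothetical PushbackCursor over an Array and a single integer 'id' key in place of rank_rows.

```ruby
# Hypothetical cursor with a one-row pushback buffer (not part of rubyrep).
class PushbackCursor
  attr_accessor :last_row

  def initialize(rows)
    @rows = rows.dup
    @last_row = nil
  end

  # True if a pushed-back row or an unread row remains.
  def next?
    !last_row.nil? || !@rows.empty?
  end

  # Returns the pushed-back row first, otherwise the next unread row.
  def next_row
    if last_row
      row, self.last_row = last_row, nil
      row
    else
      @rows.shift
    end
  end

  # Consumes rows while row['id'] <= max_id; the first row beyond
  # max_id is pushed back so that it is not lost.
  def take_up_to(max_id)
    taken = []
    while next?
      row = next_row
      if row['id'] > max_id
        self.last_row = row
        break
      end
      taken << row
    end
    taken
  end
end

cursor = PushbackCursor.new([{ 'id' => 1 }, { 'id' => 3 }, { 'id' => 5 }])
first = cursor.take_up_to(3)   # reads id 5, pushes it back
second = cursor.take_up_to(10) # starts with the pushed-back row
```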

#reset_checksum ⇒ Object

Reinitializes the row checksum array, the row cache (including its byte counter) and the total checksum



# File 'lib/rubyrep/proxy_block_cursor.rb', line 90

def reset_checksum
  self.row_checksums = []
  self.current_row_cache_size = 0
  self.row_cache = {}
  self.digest = Digest::SHA1.new
end

#retrieve_row_cache(checksums) ⇒ Object

Returns a hash of row checksum => row dump pairs for the checksums in the provided array



# File 'lib/rubyrep/proxy_block_cursor.rb', line 62

def retrieve_row_cache(checksums)
  row_dumps = {}
  checksums.each do |checksum|
    row_dumps[checksum] = row_cache[checksum] if row_cache.include? checksum
  end      
  row_dumps
end
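
Because the cache stores Marshal dumps keyed by checksum, a caller can turn a retrieved dump back into a row with Marshal.load. The sketch below shows that round trip under assumed sample data; lookup_dumps is a hypothetical free-standing helper that mirrors the lookup above, not a rubyrep method.

```ruby
require 'digest/sha1'

# Assumed sample rows, cached as checksum => dump pairs.
rows = [{ 'id' => 1, 'name' => 'a' }, { 'id' => 2, 'name' => 'b' }]
row_cache = {}
rows.each do |row|
  dump = Marshal.dump(row)
  row_cache[Digest::SHA1.hexdigest(dump)] = dump
end

# Hypothetical helper: returns only the entries whose checksum is cached.
def lookup_dumps(row_cache, checksums)
  checksums.each_with_object({}) do |checksum, found|
    found[checksum] = row_cache[checksum] if row_cache.include?(checksum)
  end
end

known = Digest::SHA1.hexdigest(Marshal.dump(rows[0]))
unknown = 'f' * 40 # a checksum that was never cached
found = lookup_dumps(row_cache, [known, unknown])
restored = Marshal.load(found[known])
```

Checksums missing from the cache are silently left out of the result, so a caller has to be prepared to fetch those rows by other means.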

#update_checksum(row) ⇒ Object

Updates block / row checksums and row cache with the given row.



# File 'lib/rubyrep/proxy_block_cursor.rb', line 71

def update_checksum(row)
  dump = Marshal.dump(row)
  
  # updates row checksum array
  row_keys = row.reject {|key, | not primary_key_names.include? key}
  checksum = Digest::SHA1.hexdigest(dump)
  self.row_checksums << {:row_keys => row_keys, :checksum => checksum}

  # update the row cache (unless maximum cache size limit has already been reached)
  if current_row_cache_size + dump.size < max_row_cache_size
    self.current_row_cache_size += dump.size
    row_cache[checksum] = dump
  end
  
  # update current total checksum
  self.digest << dump
end
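
The cache limit in this method is a simple byte budget: a row is cached only if its dump still fits under the maximum, and rows that do not fit are skipped while their checksums are still recorded. A minimal sketch of that rule in isolation follows; CappedRowCache and the tiny 100-byte limit are assumptions chosen to make the effect visible.

```ruby
require 'digest/sha1'

# Hypothetical size-capped cache (not part of rubyrep) following the
# same rule: cache a dump only while the byte budget is not exceeded.
class CappedRowCache
  attr_reader :entries, :current_size

  def initialize(max_size)
    @max_size = max_size
    @current_size = 0
    @entries = {}
  end

  # Always returns the row checksum; caches the dump only if it fits.
  def add(row)
    dump = Marshal.dump(row)
    checksum = Digest::SHA1.hexdigest(dump)
    if @current_size + dump.bytesize < @max_size
      @current_size += dump.bytesize
      @entries[checksum] = dump
    end
    checksum
  end
end

small_row = { 'id' => 1 }
large_row = { 'id' => 2, 'payload' => 'x' * 1000 }
cache = CappedRowCache.new(100)
small_sum = cache.add(small_row) # fits within the budget
large_sum = cache.add(large_row) # too large, not cached
```

Note that an oversized row does not close the cache: a later, smaller row is still cached as long as the budget allows.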