Class: RR::LoggedChangeLoader

Inherits:
Object
  • Object
show all
Defined in:
lib/rubyrep/logged_change_loader.rb

Overview

Caches the entries in the change log table

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(session, database) ⇒ LoggedChangeLoader

Create a new change log record cache.

  • session: The current Session

  • database: Either :left or :right



83
84
85
86
87
88
89
90
91
# File 'lib/rubyrep/logged_change_loader.rb', line 83

# Sets up an empty change log record cache for one database.
# * +session+: the current Session
# * +database+: either :left or :right
def initialize(session, database)
  self.session, self.database = session, database
  # The connection is obtained by calling the session method named by
  # +database+.
  self.connection = session.send(database)

  init_cache
  # No record cached yet (-1) and a timestamp far enough in the past that
  # the first #update call is never skipped as "not yet expired".
  self.current_id, self.last_updated = -1, 1.year.ago
end

Instance Attribute Details

#change_array ⇒ Object

Array with all cached changes. Processed change log records are replaced with nil.



54
55
56
# File 'lib/rubyrep/logged_change_loader.rb', line 54

# Array with all cached changes. Processed change log records are
# replaced with nil.
def change_array
  @change_array
end

#change_tree ⇒ Object

Tree (hash) structure for fast access to all cached changes. First level of tree:

  • key: table name

  • value: 2nd level tree

2nd level tree:

  • key: the change_key value of the corresponding change log records.

  • value: An array of the corresponding change log records (column_name => value hash). Additional entry of each change log hash:

    • key: ‘array_index’

    • value: index to the change log record in change_array



67
68
69
# File 'lib/rubyrep/logged_change_loader.rb', line 67

# Tree (nested hash) for fast access to all cached changes:
# table name => change_key => array of change log records.
# Each record hash carries an extra 'array_index' entry pointing at its
# slot in change_array.
def change_tree
  @change_tree
end

#connection ⇒ Object

The current ProxyConnection.



44
45
46
# File 'lib/rubyrep/logged_change_loader.rb', line 44

# The current ProxyConnection.
def connection
  @connection
end

#current_id ⇒ Object

ID of the last cached change log record.



50
51
52
# File 'lib/rubyrep/logged_change_loader.rb', line 50

# ID of the last cached change log record.
def current_id
  @current_id
end

#current_index ⇒ Object

Index to the next unprocessed change in the change_array.



47
48
49
# File 'lib/rubyrep/logged_change_loader.rb', line 47

# Index to the next unprocessed change in change_array.
def current_index
  @current_index
end

#database ⇒ Object

Current database (either :left or :right)



41
42
43
# File 'lib/rubyrep/logged_change_loader.rb', line 41

# Current database (either :left or :right).
def database
  @database
end

#last_updated ⇒ Object

Date of last update of the cache



70
71
72
# File 'lib/rubyrep/logged_change_loader.rb', line 70

# Time of the last update of the cache.
def last_updated
  @last_updated
end

#session ⇒ Object

The current Session.



38
39
40
# File 'lib/rubyrep/logged_change_loader.rb', line 38

# The current Session.
def session
  @session
end

Instance Method Details

#load(change_table, change_key) ⇒ Object

Returns the specified change log record (column_name => value hash).

  • change_table: the name of the table that was changed

  • change_key: the change key of the modified record



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/rubyrep/logged_change_loader.rb', line 156

# Hands out the next cached change log record for the given table and key,
# removing it from the cache and from the change log table.
# * +change_table+: the name of the table that was changed
# * +change_key+: the change key of the modified record
# Returns the change log record (column_name => value hash), or nil if no
# change is cached for that table / key.
def load(change_table, change_key)
  update

  changes_of_table = change_tree[change_table]
  return nil unless changes_of_table

  pending = changes_of_table[change_key]
  return nil unless pending

  # Take the first queued record for this key out of the tree ...
  record = pending.shift
  # ... blank out its slot in the flat array ...
  change_array[record['array_index']] = nil
  # ... and remove the row from the change log table.
  connection.execute "delete from #{change_log_table} where id = #{record['id']}"

  # Prune tree levels that just became empty; once nothing is left at all,
  # the whole cache is re-initialized.
  changes_of_table.delete change_key if pending.empty?
  change_tree.delete change_table if changes_of_table.empty?
  init_cache if change_tree.empty?

  record
end

#oldest_change ⇒ Object

Returns the oldest unprocessed change log record (column_name => value hash).



142
143
144
145
146
147
148
149
150
151
# File 'lib/rubyrep/logged_change_loader.rb', line 142

# Returns the oldest unprocessed change log record (column_name => value
# hash), or nil if there is none.
#
# Advances current_index past slots that hold nil (processed records are
# replaced with nil in change_array).
def oldest_change
  update
  return nil if change_array.empty?

  # Bound the scan by the array size: reading past the end also yields nil,
  # so an unbounded "skip while nil" loop would never terminate if only
  # processed (nil) slots remain.
  while current_index < change_array.size
    candidate = change_array[current_index]
    return candidate unless candidate.nil?
    self.current_index += 1
  end
  nil
end

#oldest_change_time ⇒ Object

Returns the creation time of the oldest unprocessed change log record.



136
137
138
139
# File 'lib/rubyrep/logged_change_loader.rb', line 136

# Returns the 'change_time' value of the oldest unprocessed change log
# record, or nil if there is no such record.
def oldest_change_time
  record = oldest_change
  return nil unless record
  record['change_time']
end

#update(options = {:forced => false, :expire_time => 1}) ⇒ Object

Updates the cache. Options is a hash determining when the update is actually executed:

  • :expire_time: cache is older than the given number of seconds

  • :forced: if true update the cache even if not yet expired



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/rubyrep/logged_change_loader.rb', line 97

# Updates the cache with the change log records that follow the last
# cached one (current_id).
# +options+ is a hash determining when the update is actually executed:
# * :+expire_time+: cache is older than the given number of seconds
#   (1 second if not specified)
# * :+forced+: if true update the cache even if not yet expired
def update(options = {:forced => false, :expire_time => 1})
  # A caller-supplied hash replaces the default hash as a whole, so a
  # partial hash (e.g. only :forced) carries no :expire_time. Fall back to
  # the default instead of comparing a time difference against nil.
  expire_time = options[:expire_time] || 1
  return unless options[:forced] || Time.now - last_updated >= expire_time

  self.last_updated = Time.now

  # First, let's use a LIMIT clause (via :row_buffer_size option) to verify
  # if there are any pending changes.
  # (If there are many pending changes, this is (at least with PostgreSQL)
  # much faster.)
  probe = connection.select_cursor(
    :table => change_log_table,
    :from => {'id' => current_id},
    :exclude_starting_row => true,
    :row_buffer_size => 1
  )
  has_pending = probe.next?
  # Release the probe cursor in every case; it is not used again.
  probe.clear
  return unless has_pending

  # Something is here. Let's actually load it.
  cursor = connection.select_cursor(
    :table => change_log_table,
    :from => {'id' => current_id},
    :exclude_starting_row => true,
    :type_cast => true,
    :row_buffer_size => session.configuration.options[:row_buffer_size]
  )
  while cursor.next?
    change = cursor.next_row
    self.current_id = change['id']

    # Append to the flat array and remember the slot in the record itself.
    change_array << change
    change['array_index'] = change_array.size - 1

    # File the record under table name => change key.
    table_change_tree = change_tree[change['change_table']] ||= {}
    key_changes = table_change_tree[change['change_key']] ||= []
    key_changes << change
  end
  cursor.clear
end