Class: ActiveRecord::ConnectionAdapters::SeamlessDatabasePoolAdapter

Inherits:
AbstractAdapter
  • Object
show all
Defined in:
lib/active_record/connection_adapters/seamless_database_pool_adapter.rb

Defined Under Namespace

Classes: AvailableConnections, DatabaseConnectionError

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, logger, master_connection, read_connections, pool_weights, config) ⇒ SeamlessDatabasePoolAdapter

Returns a new instance of SeamlessDatabasePoolAdapter.



156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 156

# Build the pool adapter around one master connection and a set of read connections.
#
# connection, logger and config are handed straight to the superclass constructor.
# read_connections is duplicated and frozen before being stored.
# pool_weights is walked with each_pair as (connection, weight) pairs; every
# connection is entered into the weighted list +weight+ times.
def initialize(connection, logger, master_connection, read_connections, pool_weights, config)
  # Both connection references are stored before super runs.
  @master_connection = master_connection
  @read_connections = read_connections.dup.freeze

  super(connection, logger, config)

  # A connection with weight N appears N times in the list.
  @weighted_read_connections = pool_weights.each_pair.flat_map do |conn, weight|
    weight.times.map { conn }
  end

  # The list of available-connection sets starts with a single entry holding
  # the full weighted list.
  @available_read_connections = [AvailableConnections.new(@weighted_read_connections)]
end

Instance Attribute Details

#master_connection ⇒ Object (readonly)

Returns the value of attribute master_connection.



83
84
85
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 83

# Reader for the master connection stored on this adapter.
def master_connection
  return @master_connection
end

#read_connections ⇒ Object (readonly)

Returns the value of attribute read_connections.



83
84
85
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 83

# Reader for the list of read connections stored on this adapter.
def read_connections
  return @read_connections
end

Class Method Details

.adapter_class(master_connection) ⇒ Object

Create an anonymous class that extends this one and proxies methods to the pool connections.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 87

# Create (or fetch the previously created) anonymous subclass of this class that
# proxies adapter methods to the pool connections.
#
# master_connection - the master connection; its adapter_name and its class's
#                     ancestors decide the generated class's name and methods.
#
# Returns the generated class. It is stored as a constant on this class under the
# classified adapter name, so later calls for the same adapter name return it again.
def adapter_class(master_connection)
  # NOTE(review): classify is presumably ActiveSupport's String#classify - confirm.
  adapter_class_name = master_connection.adapter_name.classify
  # The false argument limits the lookup to constants defined on this class itself.
  return const_get(adapter_class_name) if const_defined?(adapter_class_name, false)
  
  # Define methods to proxy to the appropriate pool
  # read_only_methods go to the read connection unless @use_master is set;
  # clear_cache_methods clear the query cache (when enabled) and go to the master.
  read_only_methods = [:select, :select_rows, :execute, :tables, :columns]
  clear_cache_methods = [:insert, :update, :delete]
  
  # Get a list of all methods redefined by the underlying adapter. These will be
  # proxied to the master connection.
  master_methods = []
  # Only classes/modules in the master's ancestry that AbstractAdapter does not share.
  override_classes = (master_connection.class.ancestors - AbstractAdapter.ancestors)
  override_classes.each do |connection_class|
    # false: only methods defined directly on that class, not inherited ones.
    master_methods.concat(connection_class.public_instance_methods(false))
    master_methods.concat(connection_class.protected_instance_methods(false))
  end
  master_methods = master_methods.collect{|m| m.to_sym}.uniq
  # Leave alone anything this class defines itself, and anything handled by the
  # read-only or cache-clearing proxies generated below.
  master_methods -= public_instance_methods(false) + protected_instance_methods(false) + private_instance_methods(false)
  master_methods -= read_only_methods
  # NOTE(review): presumably excluded so the inherited select_* implementations are
  # used rather than forced onto the master - confirm.
  master_methods -= [:select_all, :select_one, :select_value, :select_values]
  master_methods -= clear_cache_methods

  klass = Class.new(self)
  # Each remaining adapter-specific method runs against the master connection,
  # inside use_master_connection.
  master_methods.each do |method_name|
    klass.class_eval <<-EOS, __FILE__, __LINE__ + 1
      def #{method_name}(*args, &block)
        use_master_connection do
          return proxy_connection_method(master_connection, :#{method_name}, :master, *args, &block)
        end
      end
    EOS
  end
  
  # Write methods: clear the query cache first when it is enabled, then run on the master.
  clear_cache_methods.each do |method_name|
    klass.class_eval <<-EOS, __FILE__, __LINE__ + 1
      def #{method_name}(*args, &block)
        clear_query_cache if query_cache_enabled
        use_master_connection do
          return proxy_connection_method(master_connection, :#{method_name}, :master, *args, &block)
        end
      end
    EOS
  end
  
  # Read methods: use the master when @use_master is set, otherwise the current read connection.
  read_only_methods.each do |method_name|
    klass.class_eval <<-EOS, __FILE__, __LINE__ + 1
      def #{method_name}(*args, &block)
        connection = @use_master ? master_connection : current_read_connection
        proxy_connection_method(connection, :#{method_name}, :read, *args, &block)
      end
    EOS
  end
  # The generated select is made protected on the new class.
  klass.send :protected, :select

  # Store the class under the adapter's name so the early return above finds it next time.
  const_set(adapter_class_name, klass)
  
  return klass
end

.visitor_for(pool) ⇒ Object

Set the arel visitor on the connections.



147
148
149
150
151
152
153
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 147

# Set the arel visitor on the connections.
#
# Reads the adapter name from the pool's spec config (the master entry's :adapter,
# falling back to :pool_adapter) and delegates to that adapter class's visitor_for.
def visitor_for(pool)
  # This is ugly, but then again, so is the code in ActiveRecord for setting the arel
  # visitor. There is a note in the code indicating the method signatures should be updated.
  settings = pool.spec.config.with_indifferent_access
  adapter_key = settings[:master][:adapter] || settings[:pool_adapter]
  target_class = SeamlessDatabasePool.adapter_class_for(adapter_key)
  target_class.visitor_for(pool)
end

Instance Method Details

#active? ⇒ Boolean

Returns:

  • (Boolean)


201
202
203
204
205
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 201

# True only when every connection visited by do_to_connections reports active?.
# Every connection is asked, even after one has already reported inactive.
def active?
  all_active = true
  do_to_connections do |conn|
    all_active = false unless conn.active?
  end
  all_active
end

#adapter_name ⇒ Object

:nodoc:



169
170
171
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 169

# Fixed name reported by the pool adapter.
def adapter_name #:nodoc:
  return "Seamless_Database_Pool"
end

#all_connections ⇒ Object

Returns an array of the master connection and the read pool connections



174
175
176
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 174

# Returns a new array holding the master connection followed by the read connections.
def all_connections
  [@master_connection].concat(@read_connections)
end

#available_read_connections ⇒ Object

Get the available weighted connections. When a connection is dead and cannot be reconnected, it will be temporarily removed from the read pool so we don’t keep trying to reconnect to a database that isn’t listening.



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 296

# Get the available weighted connections.
#
# Looks at the newest entry in @available_read_connections. While that entry has
# not expired, its connections are returned as they are. Once it has expired, a
# reconnect is attempted on it:
# * on failure the entry's expiry is pushed out by 30 seconds and its connections
#   are returned unchanged;
# * on success the entry is popped and the lookup is repeated against the entry
#   underneath it.
def available_read_connections
  newest = @available_read_connections.last
  return newest.connections unless newest.expired?

  begin
    @logger.info("Adding dead database connection back to the pool") if @logger
    newest.reconnect!
  rescue => error
    # Couldn't reconnect, so keep the entry and try again in a little bit.
    if @logger
      @logger.warn("Failed to reconnect to database when adding connection back to the pool")
      @logger.warn(error)
    end
    newest.expires = 30.seconds.from_now
    return newest.connections
  end

  # The reconnect succeeded, so this entry is no longer needed; drop it and
  # evaluate whatever entry is now on top.
  @available_read_connections.pop
  available_read_connections
end

#current_read_connection ⇒ Object

Get the current read connection



241
242
243
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 241

# Get the current read connection by asking SeamlessDatabasePool for the
# read-only connection associated with this adapter.
def current_read_connection
  SeamlessDatabasePool.read_only_connection(self)
end

#disconnect! ⇒ Object



211
212
213
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 211

# Calls disconnect! on every connection yielded by do_to_connections.
def disconnect!
  do_to_connections do |connection|
    connection.disconnect!
  end
end

#inspect ⇒ Object



264
265
266
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 264

# Same text as to_s.
def inspect
  return to_s
end

#pool_weight(connection) ⇒ Object

Get the pool weight of a connection



179
180
181
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 179

# Get the pool weight of a connection: the number of entries in the weighted
# read list that are == to the given connection (0 when it is not in the list).
def pool_weight(connection)
  @weighted_read_connections.count { |conn| conn == connection }
end

#random_read_connection ⇒ Object

Get a random read connection from the pool. If the connection is not active, it will attempt to reconnect to the database. If that fails, it will be removed from the pool for one minute.



231
232
233
234
235
236
237
238
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 231

# Pick a random entry from the currently available weighted read connections.
# Falls back to the master connection when the master is being forced
# (@use_master) or when no read connection is available.
def random_read_connection
  # Fetched first, before the master check, so the availability lookup always runs.
  candidates = available_read_connections
  return master_connection if @use_master || candidates.empty?

  candidates[rand(candidates.length)]
end

#reconnect! ⇒ Object



207
208
209
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 207

# Calls reconnect! on every connection yielded by do_to_connections.
def reconnect!
  do_to_connections do |connection|
    connection.reconnect!
  end
end

#requires_reloading? ⇒ Boolean

Returns:

  • (Boolean)


183
184
185
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 183

# Always reports false.
def requires_reloading?
  return false
end

#reset! ⇒ Object



215
216
217
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 215

# Calls reset! on every connection yielded by do_to_connections.
def reset!
  do_to_connections do |connection|
    connection.reset!
  end
end

#reset_available_read_connections ⇒ Object



323
324
325
326
327
328
329
330
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 323

# Put every read connection back into play.
#
# Discards all entries of @available_read_connections except the first one, then
# tries to reconnect each connection in that first entry that does not report
# active?. Reconnecting is best effort: a failure never propagates to the caller,
# but it is reported through @logger (when one is set) instead of being silently
# discarded.
#
# Returns the connections list of the first entry.
def reset_available_read_connections
  # Keep only the first entry; everything stacked on top of it is dropped.
  @available_read_connections.slice!(1, @available_read_connections.length)
  @available_read_connections.first.connections.each do |connection|
    next if connection.active?

    begin
      connection.reconnect!
    rescue => e
      # Still best effort, but leave a trace so a dead replica is visible in the logs.
      if @logger
        @logger.warn("Failed to reconnect to database when resetting the read connection pool")
        @logger.warn(e)
      end
    end
  end
end

#reset_runtime ⇒ Object



223
224
225
226
227
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 223

# Calls reset_runtime on every connection yielded by do_to_connections and
# returns the sum of the values they report, starting from 0.0.
def reset_runtime
  runtime_sum = 0.0
  do_to_connections do |conn|
    runtime_sum += conn.reset_runtime
  end
  runtime_sum
end

#suppress_read_connection(conn, expire) ⇒ Object

Temporarily remove a connection from the read pool.



333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 333

# Temporarily remove a connection from the read pool.
#
# conn   - the connection to take out of rotation.
# expire - number of seconds the suppression lasts.
#
# Does nothing (returns nil) when conn is not among the currently available read
# connections. When removing conn would leave no read connection at all, the pool
# is reset instead so that every connection is tried again.
def suppress_read_connection(conn, expire)
  current = available_read_connections
  remaining = current.reject { |candidate| candidate == conn }

  # Nothing was filtered out, so this wasn't a read connection; don't suppress it.
  return if remaining.length == current.length

  if remaining.empty?
    @logger.warn("All read connections are marked dead; trying them all again.") if @logger
    # No connections would be left, so we might as well try them all again.
    return reset_available_read_connections
  end

  @logger.warn("Removing #{conn.inspect} from the connection pool for #{expire} seconds") if @logger
  # Until it expires, the newest entry leaves out the suppressed connection.
  @available_read_connections.push(AvailableConnections.new(remaining, conn, expire.seconds.from_now))
end

#to_s ⇒ Object



260
261
262
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 260

# Short description: class name, object id in hex, and the number of connections
# returned by all_connections.
def to_s
  class_name = self.class.name
  hex_id = object_id.to_s(16)
  connection_count = all_connections.size
  "#<#{class_name}:0x#{hex_id} #{connection_count} connections>"
end

#transaction(options = {}) ⇒ Object



187
188
189
190
191
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 187

# Runs the superclass transaction (with the same arguments and block) while the
# master connection is being forced via use_master_connection.
def transaction(options = {})
  use_master_connection { super }
end

#use_master_connection ⇒ Object

Force using the master connection in a block.



250
251
252
253
254
255
256
257
258
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 250

# Force using the master connection in a block.
#
# Sets @use_master to true while the block runs and always puts the previous
# value back afterwards, even when the block raises. Returns the block's value,
# or nil when no block is given.
def use_master_connection
  previous = @use_master
  @use_master = true
  yield if block_given?
ensure
  @use_master = previous
end

#using_master_connection? ⇒ Boolean

Returns:

  • (Boolean)


245
246
247
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 245

# Reports, as a strict true/false, whether @use_master is currently set.
def using_master_connection?
  @use_master ? true : false
end

#verify!(*ignored) ⇒ Object



219
220
221
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 219

# Calls verify! on every connection yielded by do_to_connections, forwarding
# whatever arguments were given.
def verify!(*ignored)
  do_to_connections do |connection|
    connection.verify!(*ignored)
  end
end

#visitor ⇒ Object



197
198
199
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 197

# The visitor of the master connection.
def visitor
  return master_connection.visitor
end

#visitor=(visitor) ⇒ Object



193
194
195
# File 'lib/active_record/connection_adapters/seamless_database_pool_adapter.rb', line 193

# Assigns the given visitor to every connection returned by all_connections.
def visitor=(visitor)
  all_connections.each do |connection|
    connection.visitor = visitor
  end
end