Class: ActiveRecord::Bogacs::DefaultPool

Inherits:
Object
  • Object
show all
Includes:
PoolSupport, MonitorMixin
Defined in:
lib/active_record/bogacs/default_pool.rb

Overview

A “default” `ActiveRecord::ConnectionAdapters::ConnectionPool`-like pool implementation that stays compatible across (older) Rails versions.

Currently, mostly, based on ActiveRecord 4.2.

api.rubyonrails.org/classes/ActiveRecord/ConnectionAdapters/ConnectionPool.html

Direct Known Subclasses

ShareablePool

Defined Under Namespace

Classes: Queue

Instance Attribute Summary collapse

Attributes included from PoolSupport

#schema_cache

Instance Method Summary collapse

Methods included from PoolSupport

#connection_cache_key, included, #lock_thread=, #new_connection

Constructor Details

#initialize(spec) ⇒ DefaultPool

Note:

The default ConnectionPool maximum size is 5.

Creates a new `ConnectionPool` object. `spec` is a ConnectionSpecification object which describes database connection information (e.g. adapter, host name, username, password, etc.), as well as the maximum size for this ConnectionPool.

Options read from `spec.config`: `:checkout_timeout` – how long to wait for a connection before giving up and raising a timeout error (default 5 seconds). `:pool_initial` – how many connections (or which fraction of the pool size) to pre-fill when the pool is created (default 0). `:reaping_frequency` – how often to run a reaper, which attempts to find and close “dead” connections (these can occur if a caller forgets to close a connection at the end of a thread or a thread dies unexpectedly); the default is `nil` – don’t run the periodical Reaper (reaping will still happen occasionally). `:validate_frequency` – how often to run a connection validation (in a separate thread), to avoid potentially stale sockets when connections stay open (pooled but unused) for longer periods.

Parameters:

  • spec (Hash)

    a ‘ConnectionSpecification`

  • spec.config (Hash)

    a customizable set of options



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/active_record/bogacs/default_pool.rb', line 185

# Builds a new pool from a connection specification.
#
# Options read from +spec.config+:
# * +:checkout_timeout+ - default checkout timeout, stored as a Float (default 5)
# * +:idle_timeout+ - default idle threshold for #flush (default 300); +nil+ or a
#   non-positive value disables it (stored as +nil+)
# * +:reaping_frequency+ - handed to the Reaper
# * +:pool+ - maximum pool size (default 5)
# * +:pool_initial+ - connections to pre-fill: +true+ means the full pool size, a
#   value <= 1.0 is taken as a fraction of the pool size, anything else is an
#   absolute count (default 0)
# * +:validate_frequency+ / +:validate_timeout+ - when a frequency is given a
#   Validator is created and run
#
# @param spec [#config] a +ConnectionSpecification+
def initialize(spec)
  super()

  @spec = spec

  @checkout_timeout = ( spec.config[:checkout_timeout] || 5 ).to_f
  if @idle_timeout = spec.config.fetch(:idle_timeout, 300)
    @idle_timeout = @idle_timeout.to_f
    @idle_timeout = nil if @idle_timeout <= 0
  end

  @reaper = Reaper.new self, spec.config[:reaping_frequency]
  @reaping = !! @reaper.run

  # default max pool size to 5
  if spec.config[:pool]
    @size = spec.config[:pool].to_i
  else
    # NOTE: `defined?` binds looser than `&&`; without the parentheses the whole
    # `Rails.env && (...)` expression is its operand and the check is always truthy.
    if defined?(Rails.env) && ( (! Rails.env.development? && ! Rails.env.test?) rescue nil )
      logger && logger.debug("pool: option not set, using default size: 5")
    end
    @size = 5
  end

  # The cache of reserved connections mapped to threads
  @thread_cached_conns = ThreadSafe::Map.new(initial_capacity: @size)

  @connections = []
  @automatic_reconnect = true

  # Connection pool allows for concurrent (outside the main +synchronize+ section)
  # establishment of new connections. This variable tracks the number of threads
  # currently in the process of independently establishing connections to the DB.
  @now_connecting = 0

  @threads_blocking_new_connections = 0 # TODO: dummy for now

  @available = Queue.new self

  @lock_thread = false

  @connected = ::Concurrent::AtomicBoolean.new

  initial_size = spec.config[:pool_initial] || 0
  initial_size = @size if initial_size == true
  # values up to 1.0 (an Integer 1 included) are treated as a fraction of @size
  initial_size = (@size * initial_size).to_i if initial_size <= 1.0
  # NOTE: warn on initial_size > size !
  prefill_initial_connections if ( @initial_size = initial_size.to_i ) > 0

  if frequency = spec.config[:validate_frequency]
    # the validator is loaded lazily, only when validation is configured
    require 'active_record/bogacs/validator' unless self.class.const_defined?(:Validator)
    @validator = Validator.new self, frequency, spec.config[:validate_timeout]
    if @validator.run && @reaping
      logger && logger.warn(":validate_frequency configured alongside with :reaping_frequency")
    end
  end
end

Instance Attribute Details

#automatic_reconnectObject

Returns the value of attribute automatic_reconnect.



159
160
161
# File 'lib/active_record/bogacs/default_pool.rb', line 159

# Reader for the +@automatic_reconnect+ flag.
#
# @return [Object] the current value of +@automatic_reconnect+
def automatic_reconnect; @automatic_reconnect end

#checkout_timeoutObject

Returns the value of attribute checkout_timeout.



159
160
161
# File 'lib/active_record/bogacs/default_pool.rb', line 159

# Reader for the pool's default checkout timeout.
#
# @return [Object] the current value of +@checkout_timeout+
def checkout_timeout; @checkout_timeout end

#initial_sizeObject (readonly)

Returns the value of attribute initial_size.



161
162
163
# File 'lib/active_record/bogacs/default_pool.rb', line 161

# Reader for the number of connections the pool was asked to pre-fill.
#
# @return [Object] the current value of +@initial_size+
def initial_size; @initial_size end

#reaperObject (readonly)

Returns the value of attribute reaper.



160
161
162
# File 'lib/active_record/bogacs/default_pool.rb', line 160

# Reader for the pool's reaper object.
#
# @return [Object] the current value of +@reaper+
def reaper; @reaper end

#sizeObject (readonly)

Returns the value of attribute size.



160
161
162
# File 'lib/active_record/bogacs/default_pool.rb', line 160

# Reader for the pool's configured (maximum) size.
#
# @return [Object] the current value of +@size+
def size; @size end

#specObject (readonly)

Returns the value of attribute spec.



160
161
162
# File 'lib/active_record/bogacs/default_pool.rb', line 160

# Reader for the specification the pool was created with.
#
# @return [Object] the current value of +@spec+
def spec; @spec end

#validatorObject (readonly)

Returns the value of attribute validator.



161
162
163
# File 'lib/active_record/bogacs/default_pool.rb', line 161

# Reader for the pool's validator object (if any was set up).
#
# @return [Object, nil] the current value of +@validator+
def validator; @validator end

Instance Method Details

#active_connection?true, false

Is there an open connection that is being used for the current thread?

Returns:

  • (true, false)


260
261
262
263
# File 'lib/active_record/bogacs/default_pool.rb', line 260

# Tells whether a connection is cached for the current thread.
#
# @return [Object, nil] the cached connection (truthy) or +nil+ when the
#   thread cache holds no entry for the current thread's cache key
def active_connection?
  @thread_cached_conns.fetch(connection_cache_key(current_thread), nil)
end

#checkin(conn) ⇒ Object

Check-in a database connection back into the pool, indicating that you no longer need this connection.

conn: an AbstractAdapter object, which was obtained earlier by calling #checkout on this pool.



415
416
417
418
419
420
421
422
423
424
425
# File 'lib/active_record/bogacs/default_pool.rb', line 415

# Hands a connection back to the pool.
#
# While holding the pool lock the connection is dropped from the thread
# cache, the check-in callbacks are run and the connection is added to the
# queue of available connections.
#
# @param conn the connection previously obtained from this pool
def checkin(conn)
  # NOTE: locks on the pool itself (not on +conn.lock+)
  synchronize do
    remove_connection_from_thread_cache(conn)
    _run_checkin_callbacks(conn)
    @available.add(conn)
  end
end

#checkout(checkout_timeout = @checkout_timeout) ⇒ ActiveRecord::ConnectionAdapters::AbstractAdapter

Check-out a database connection from the pool, callers are expected to call #checkin when the connection is no longer needed, so that others can use it.

This is done by either returning and leasing existing connection, or by creating a new connection and leasing it.

A timeout error is raised if no connection can be obtained in time and the pool is at capacity (meaning the number of currently leased connections is greater than or equal to the size limit set).

Returns:

  • (ActiveRecord::ConnectionAdapters::AbstractAdapter)

Raises:

  • (ActiveRecord::ConnectionTimeoutError)

    if all connections are leased



406
407
408
# File 'lib/active_record/bogacs/default_pool.rb', line 406

# Checks a connection out of the pool.
#
# The connection is first acquired (honouring the given timeout) and then
# passed through +checkout_and_verify+ before being handed to the caller.
#
# @param checkout_timeout the timeout passed on to +acquire_connection+,
#   defaults to the pool's +@checkout_timeout+
# @return whatever +checkout_and_verify+ returns for the acquired connection
def checkout(checkout_timeout = @checkout_timeout)
  acquired = acquire_connection(checkout_timeout)
  checkout_and_verify(acquired)
end

#clear_reloadable_connections!Object



350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# File 'lib/active_record/bogacs/default_pool.rb', line 350

# Drops every connection that reports +requires_reloading?+ and rebuilds the
# queue of available connections from the ones that remain.
#
# In-use connections are stolen and checked back in first; the thread cache
# is emptied. Everything happens while holding the pool lock.
def clear_reloadable_connections!
  synchronize do
    @thread_cached_conns.clear

    @connections.each do |pooled|
      if pooled.in_use?
        pooled.steal!
        checkin(pooled)
      end
      pooled.disconnect! if pooled.requires_reloading?
    end
    @connections.reject!(&:requires_reloading?)

    # re-populate the available queue with the surviving connections
    @available.clear
    @connections.each { |pooled| @available.add(pooled) }

    @connected.value = @connections.any?
  end
end

#clear_stale_cached_connections!Object

Return any checked-out connections back to the pool by threads that are no longer alive.



385
386
387
388
389
390
391
392
393
# File 'lib/active_record/bogacs/default_pool.rb', line 385

# Checks in connections cached under keys that do not match the object id of
# any live thread, and drops those keys from the thread cache.
#
# Each stale entry is removed from the cache first and only checked in when a
# connection was actually stored under it (same pattern as
# #release_connection), so a missing/nil entry is never passed to #checkin.
#
# @return [Array] the stale cache keys that were processed
def clear_stale_cached_connections!
  live_keys = Thread.list.select(&:alive?).map(&:object_id)
  stale_keys = @thread_cached_conns.keys - live_keys
  stale_keys.each do |key|
    conn = @thread_cached_conns.delete(key)
    checkin conn if conn
  end
end

#connected?true, false

Returns true if a connection has already been opened.

Returns:

  • (true, false)


292
293
294
# File 'lib/active_record/bogacs/default_pool.rb', line 292

# Reports the pool's +@connected+ flag.
#
# Reads the flag directly instead of inspecting +@connections+ under the
# pool lock (i.e. instead of `synchronize { @connections.any? }`).
#
# @return [true, false]
def connected?; @connected.true? end

#connectionActiveRecord::ConnectionAdapters::AbstractAdapter

Retrieve the connection associated with the current thread, or call #checkout to obtain one if necessary.

#connection can be called any number of times; the connection is held in a hash keyed by the thread id.

Returns:

  • (ActiveRecord::ConnectionAdapters::AbstractAdapter)


250
251
252
253
254
255
# File 'lib/active_record/bogacs/default_pool.rb', line 250

# Returns the connection cached for the current thread, checking one out
# (and caching it) when the thread has none yet.
#
# @return the cached or freshly checked-out connection
def connection
  cache_key = connection_cache_key(current_thread)
  @thread_cached_conns.fetch(cache_key, nil) ||
    ( @thread_cached_conns[cache_key] ||= checkout )
end

#connectionsObject

Returns an array containing the connections currently in the pool. Access to the array does not require synchronization on the pool because the array is newly created and not retained by the pool.

However, this method bypasses the ConnectionPool’s thread-safe connection access pattern. A returned connection may be owned by another thread, unowned, or by happenstance owned by the calling thread.

Calling methods on a connection without ownership is subject to the thread-safety guarantees of the underlying method. Many of the methods on connection adapter classes are inherently multi-thread unsafe.



307
308
309
# File 'lib/active_record/bogacs/default_pool.rb', line 307

# Returns a snapshot of the pool's connections.
#
# @return [Array] a copy of +@connections+ taken under the pool lock
def connections
  synchronize do
    @connections.dup
  end
end

#discard!Object

Discards all connections in the pool (even if they’re currently leased!), along with the pool itself. Any further interaction with the pool (except #spec and #schema_cache) is undefined.

See AbstractAdapter#discard!



334
335
336
337
338
339
340
341
342
343
344
# File 'lib/active_record/bogacs/default_pool.rb', line 334

# Discards every connection held by the pool and then the pool's own state.
#
# Does nothing when the pool was already discarded. Afterwards
# +@connections+, +@available+ and +@thread_cached_conns+ are all +nil+.
def discard! # :nodoc:
  synchronize do
    return if discarded?

    @connected.make_false
    @connections.each(&:discard!)

    # release the collections themselves - the pool is unusable from here on
    @connections = @available = @thread_cached_conns = nil
  end
end

#discarded?Boolean

:nodoc:

Returns:

  • (Boolean)


346
347
348
# File 'lib/active_record/bogacs/default_pool.rb', line 346

# Whether the pool has been discarded, i.e. +@connections+ is +nil+.
#
# @return [Boolean]
def discarded?; @connections.nil? end # :nodoc:

#disconnect!Object

Disconnects all connections in the pool, and clears the pool.



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# File 'lib/active_record/bogacs/default_pool.rb', line 312

# Disconnects every connection of the pool and empties the pool.
#
# Under the pool lock: the connected flag is cleared, the thread cache is
# emptied, in-use connections are stolen and checked back in, each
# connection is disconnected, and finally both the connection list and the
# available queue are cleared.
def disconnect!
  synchronize do
    @connected.make_false
    @thread_cached_conns.clear

    @connections.each do |pooled|
      if pooled.in_use?
        pooled.steal!
        checkin(pooled)
      end
      pooled.disconnect!
    end

    @connections.clear
    @available.clear
  end
end

#flush(minimum_idle = @idle_timeout) ⇒ Object

Disconnect all connections that have been idle for at least minimum_idle seconds. Connections currently checked out, or that were checked in less than minimum_idle seconds ago, are unaffected.



480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
# File 'lib/active_record/bogacs/default_pool.rb', line 480

# Disconnects connections that are not in use and have been idle for at least
# +minimum_idle+ seconds.
#
# Selection and removal from the pool happen under the pool lock; the actual
# disconnects are performed afterwards, outside of it.
#
# @param minimum_idle the idle threshold, defaults to +@idle_timeout+;
#   +nil+ turns the call into a no-op
# @return [Array, nil] the connections that were flushed, +nil+ for a no-op
def flush(minimum_idle = @idle_timeout)
  return if minimum_idle.nil?

  expired = synchronize do
    candidates = @connections.select do |conn|
      next false if conn.in_use?
      conn.seconds_idle >= minimum_idle
    end

    candidates.each do |conn|
      # lease it so nobody else picks it up, then take it out of the pool
      conn.lease
      @available.delete(conn)
      @connections.delete(conn)
      @connected.value = @connections.any?
    end
  end

  expired.each(&:disconnect!)
end

#flush!Object

Disconnect all currently idle connections. Connections currently checked out are unaffected.



503
504
505
506
# File 'lib/active_record/bogacs/default_pool.rb', line 503

# Reaps the pool and then flushes every connection that is currently idle.
#
# @return whatever #flush returns
def flush!
  reap
  # a negative threshold is satisfied by any non-negative idle time
  any_idle_time = -1
  flush(any_idle_time)
end

#loggerObject

@@logger = nil



540
# File 'lib/active_record/bogacs/default_pool.rb', line 540

# The logger used by the pool - delegates to +ActiveRecord::Base.logger+.
def logger
  ::ActiveRecord::Base.logger
end

#reapObject

Recover lost connections for the pool. A lost connection can occur if a caller forgets to #checkin a connection for a given thread when it's done, or if a thread dies unexpectedly.



456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
# File 'lib/active_record/bogacs/default_pool.rb', line 456

# Recovers connections that are in use by a thread which is no longer alive.
#
# Such connections are stolen under the pool lock. Each one is then either
# reset and checked back in (when still active) or removed from the pool.
#
# @return [Array] the connections that were recovered
def reap
  orphaned = synchronize do
    dead_owned = @connections.select { |conn| conn.in_use? && !conn.owner.alive? }
    dead_owned.each(&:steal!)
  end

  orphaned.each do |conn|
    # the lock is taken per connection, not once around the whole loop
    synchronize do
      if conn.active?
        conn.reset!
        checkin(conn)
      else
        remove(conn)
      end
    end
  end
end

#reaper?Boolean

NOTE: active? and reset! are >= AR 2.3

Returns:

  • (Boolean)


533
# File 'lib/active_record/bogacs/default_pool.rb', line 533

# Whether a reaper with a frequency is set up for this pool.
#
# @return the reaper's frequency (truthy) or a falsy value
def reaper?
  # make sure the ivar exists before it is read
  @reaper ||= nil
  @reaper && @reaper.frequency
end

#reaping?Boolean

Returns:

  • (Boolean)


534
# File 'lib/active_record/bogacs/default_pool.rb', line 534

# Whether the pool has a reaper (see #reaper?) that is currently running.
#
# @return a truthy value when the reaper reports +running?+, falsy otherwise
def reaping?
  reaper? && @reaper.running?
end

#release_connection(owner_thread = Thread.current) ⇒ Object

Signal that the thread is finished with the current connection. #release_connection releases the connection-thread association and returns the connection to the pool.



268
269
270
271
# File 'lib/active_record/bogacs/default_pool.rb', line 268

# Drops the connection cached for +owner_thread+ from the thread cache and,
# when there was one, checks it back into the pool.
#
# @param owner_thread the thread whose cached connection is released,
#   defaults to the calling thread
def release_connection(owner_thread = Thread.current)
  cache_key = connection_cache_key(owner_thread)
  released = @thread_cached_conns.delete(cache_key)
  checkin(released) if released
end

#remove(conn) ⇒ ActiveRecord::ConnectionAdapters::AbstractAdapter

Remove a connection from the connection pool. The returned connection will remain open and active but will no longer be managed by this pool.

Returns:

  • (ActiveRecord::ConnectionAdapters::AbstractAdapter)


431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
# File 'lib/active_record/bogacs/default_pool.rb', line 431

# Takes a connection out of the pool's bookkeeping (thread cache, connection
# list and available queue) and refreshes the connected flag.
#
# When someone was waiting on the available queue at that moment, one new
# connection is requested afterwards.
#
# @param conn the connection to remove
def remove(conn)
  waiters_present = synchronize do
    remove_connection_from_thread_cache(conn)

    @connections.delete(conn)
    @available.delete(conn)

    @connected.value = @connections.any?

    @available.any_waiting?
  end

  # Deliberately outside of the synchronized section: the main mutex should
  # not be held while new connections are made. The waiter information may
  # be stale by now, which is acceptable (bulk_make_new_connections will make
  # sure not to exceed the pool's @size limit).
  bulk_make_new_connections(1) if waiters_present
end

#statObject

Returns the connection pool’s usage statistics. Example:

ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }


517
518
519
520
521
522
523
524
525
526
527
528
529
# File 'lib/active_record/bogacs/default_pool.rb', line 517

# Collects usage statistics for the pool while holding the pool lock.
#
# @return [Hash] with the keys +:size+, +:connections+, +:busy+ (in use, owner
#   alive), +:dead+ (in use, owner not alive), +:idle+ (not in use),
#   +:waiting+ and +:checkout_timeout+
def stat
  synchronize do
    leased, unleased = @connections.partition { |c| c.in_use? }
    owner_alive, owner_dead = leased.partition { |c| c.owner.alive? }

    {
      size: size,
      connections: @connections.size,
      busy: owner_alive.size,
      dead: owner_dead.size,
      idle: unleased.size,
      waiting: num_waiting_in_queue,
      checkout_timeout: checkout_timeout
    }
  end
end

#validating?Boolean

Returns:

  • (Boolean)


537
# File 'lib/active_record/bogacs/default_pool.rb', line 537

# Whether the pool has a validator (see #validator?) that is currently running.
#
# @return a truthy value when the validator reports +running?+, falsy otherwise
def validating?
  validator? && @validator.running?
end

#validator?Boolean

Returns:

  • (Boolean)


536
# File 'lib/active_record/bogacs/default_pool.rb', line 536

# Whether a validator with a frequency is set up for this pool.
#
# @return the validator's frequency (truthy) or a falsy value
def validator?
  # make sure the ivar exists before it is read
  @validator ||= nil
  @validator && @validator.frequency
end

#verify_active_connections!Object

Verify active connections and remove and disconnect connections associated with stale threads.



373
374
375
376
377
378
379
380
# File 'lib/active_record/bogacs/default_pool.rb', line 373

# Clears stale cached connections and then calls +verify!+ on every
# connection of the pool, all while holding the pool lock.
def verify_active_connections!
  synchronize do
    clear_stale_cached_connections!
    @connections.each(&:verify!)
  end
end

#with_connection {|ActiveRecord::ConnectionAdapters::AbstractAdapter| ... } ⇒ Object

If a connection already exists yield it to the block. If no connection exists checkout a connection, yield it to the block, and checkin the connection when finished.

Yields:

  • (ActiveRecord::ConnectionAdapters::AbstractAdapter)


278
279
280
281
282
283
284
285
286
287
# File 'lib/active_record/bogacs/default_pool.rb', line 278

# Yields a connection to the given block.
#
# A connection already present in the thread cache is yielded as is and stays
# cached. Otherwise one is obtained through #connection and released again
# once the block has finished (also when it raises).
#
# @yield the connection
# @return the block's result
def with_connection
  conn = @thread_cached_conns[connection_cache_key]
  unless conn
    conn = connection
    # only flagged once the connection was actually obtained
    obtained_here = true
  end
  yield conn
ensure
  release_connection if obtained_here
end