Class: ActiveRecord::ConnectionAdapters::ConnectionPool
- Inherits: Object
  - Object
  - ActiveRecord::ConnectionAdapters::ConnectionPool
- Defined in: lib/active_record/fiber_patches.rb, lib/active_record/round_robin.rb
Overview
ActiveRecord’s connection pool is keyed by thread. Since we run on EventMachine (EM) with a single thread and multiple fibers, we need to provide our own connection pool that keys off of Fiber.current, so that different fibers running in the same thread don’t try to use the same connection.
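The idea of keying reservations off the current fiber can be sketched with a toy pool. This is a minimal illustration, not the library's implementation: `ToyFiberPool`, its string "connections", and the `release` method are hypothetical names introduced here; only `Fiber.current` and the `current_connection_id` name come from the source.

```ruby
require 'fiber'

# Hypothetical toy pool: reserves one connection per fiber, keyed by
# Fiber.current.object_id. Illustrative only, not the library's API.
class ToyFiberPool
  def initialize(size)
    @available = Array.new(size) { |i| "conn-#{i}" } # stand-ins for real connections
    @reserved  = {}                                  # fiber id => connection
  end

  # Identify the caller by fiber rather than by thread.
  def current_connection_id
    Fiber.current.object_id
  end

  # Return the connection reserved for the calling fiber, checking one out if needed.
  def connection
    @reserved[current_connection_id] ||= checkout
  end

  # Hand the calling fiber's connection back to the pool.
  def release
    conn = @reserved.delete(current_connection_id)
    @available.push(conn) if conn
  end

  private

  def checkout
    @available.shift || raise("no free connections")
  end
end

pool = ToyFiberPool.new(2)

fiber_a = Fiber.new do
  first = pool.connection
  Fiber.yield first      # pause while still holding the connection
  pool.connection        # after resuming, ask again
end
fiber_b = Fiber.new { pool.connection }

a_first  = fiber_a.resume  # fiber A reserves a connection, then yields
b_conn   = fiber_b.resume  # fiber B runs on the same thread and gets a different one
a_second = fiber_a.resume  # fiber A gets back the connection it already reserved
```

Both fibers run on one thread, so a thread-keyed lookup would have handed fiber B the connection that fiber A was still using.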
Direct Known Subclasses
Defined Under Namespace
Modules: RoundRobin
Instance Method Summary
- #clear_stale_cached_connections! ⇒ Object
- #connection_with_sanity_check ⇒ Object
- #initialize(spec) ⇒ ConnectionPool (constructor): A new instance of ConnectionPool.
- #valid_busy_fiber? ⇒ Boolean
Constructor Details
#initialize(spec) ⇒ ConnectionPool
Returns a new instance of ConnectionPool.
# File 'lib/active_record/fiber_patches.rb', line 56

def initialize(spec)
  @spec = spec

  # The cache of reserved connections mapped to threads
  @reserved_connections = {}

  # The mutex used to synchronize pool access
  @connection_mutex = FiberedMonitor.new
  @queue = @connection_mutex.new_cond
  @queue.name = @name || 'database'

  # default 5 second timeout unless on ruby 1.9
  @timeout = spec.config[:wait_timeout] || 5

  # default max pool size to 5
  @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

  @connections = []
  @checked_out = []
  @automatic_reconnect = true
  @tables = {}

  @columns = Hash.new do |h, table_name|
    h[table_name] = with_connection do |conn|
      # Fetch a list of columns
      conn.columns(table_name, "#{table_name} Columns").tap do |columns|
        # set primary key information
        columns.each do |column|
          column.primary = column.name == primary_keys[table_name]
        end
      end
    end
  end

  @columns_hash = Hash.new do |h, table_name|
    h[table_name] = Hash[columns[table_name].map { |col| [col.name, col] }]
  end

  @primary_keys = Hash.new do |h, table_name|
    h[table_name] = with_connection do |conn|
      table_exists?(table_name) ? conn.primary_key(table_name) : 'id'
    end
  end
end
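The `@columns`, `@columns_hash`, and `@primary_keys` caches in the constructor all rely on the same Ruby idiom: a `Hash.new` block that computes a missing value and stores it under the requested key. A minimal sketch of that idiom, where `fetch_columns` and its return value are hypothetical stand-ins for the database call:

```ruby
# Counts how often the (hypothetical) expensive lookup actually runs.
lookups = 0

# Stand-in for a database round trip such as conn.columns(table_name).
fetch_columns = lambda do |table_name|
  lookups += 1
  ["id", "#{table_name}_name"]
end

# The block runs only when a key is missing, and stores its result in the hash.
columns = Hash.new do |h, table_name|
  h[table_name] = fetch_columns.call(table_name)
end

columns["users"]   # first access runs the lookup
columns["users"]   # second access is served from the hash
columns["posts"]   # a different key triggers another lookup
```

Each table is therefore queried at most once per pool, on first use rather than at construction time.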
Instance Method Details
#clear_stale_cached_connections! ⇒ Object
# File 'lib/active_record/fiber_patches.rb', line 105

def clear_stale_cached_connections!
  cache = @reserved_connections
  keys = Set.new(cache.keys)

  ActiveRecord::ConnectionAdapters.fiber_pools.each do |pool|
    pool.busy_fibers.each_pair do |object_id, fiber|
      keys.delete(object_id)
    end
  end

  keys.each do |key|
    next unless cache.has_key?(key)
    checkin cache[key]
    cache.delete(key)
  end
end
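The method above amounts to a set difference: start from every key in the reservation cache, remove the ids of fibers that are still busy, and check in whatever remains. A small sketch with hypothetical data (the ids, connection names, and the `checked_in` array are invented for illustration):

```ruby
require 'set'

# Hypothetical state: reserved connections keyed by fiber id, and the ids
# of fibers that are still busy, grouped per fiber pool.
reserved       = { 101 => "conn-a", 102 => "conn-b", 103 => "conn-c" }
busy_fiber_ids = [[101], [103]]
checked_in     = []   # stands in for calling checkin on the pool

# Start with every reserved key, then discard the ones owned by busy fibers.
stale = Set.new(reserved.keys)
busy_fiber_ids.each do |ids|
  ids.each { |id| stale.delete(id) }
end

# Whatever is left belongs to fibers that are no longer running.
stale.each do |key|
  next unless reserved.key?(key)
  checked_in << reserved.delete(key)
end
```

Only the connection held by fiber 102, which no pool reports as busy, is returned.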
#connection_with_sanity_check ⇒ Object
# File 'lib/active_record/fiber_patches.rb', line 128

def connection_with_sanity_check
  raise "not a busy fiber" if EM.reactor_running? && ! valid_busy_fiber?
  connection_without_sanity_check
end
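The `_with_sanity_check` / `_without_sanity_check` naming suggests the method is wired in front of the original `connection` through an alias chain, although the wiring itself is not shown on this page. Assuming that is the case, the pattern can be sketched with plain `alias_method`; `ToyPool` and its `busy` flag are hypothetical stand-ins for the real pool and for `valid_busy_fiber?`:

```ruby
class ToyPool
  attr_accessor :busy   # hypothetical flag standing in for valid_busy_fiber?

  def connection
    "conn"
  end

  # Guarded version: refuse to hand out a connection to a non-busy caller.
  def connection_with_sanity_check
    raise "not a busy fiber" unless busy
    connection_without_sanity_check
  end

  # Keep the original under a new name, then point `connection` at the guard.
  alias_method :connection_without_sanity_check, :connection
  alias_method :connection, :connection_with_sanity_check
end

pool = ToyPool.new

pool.busy = true
ok = pool.connection          # passes the guard and reaches the original method

pool.busy = false
error = begin
  pool.connection
  nil
rescue RuntimeError => e
  e.message                   # the guard raised before the original ran
end
```

Callers keep using `connection` unchanged, and the check runs on every call.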
#valid_busy_fiber? ⇒ Boolean
# File 'lib/active_record/fiber_patches.rb', line 122

def valid_busy_fiber?
  ActiveRecord::ConnectionAdapters.fiber_pools.any? do |pool|
    pool.busy_fibers[current_connection_id]
  end
end