Class: ActiveRecord::Bogacs::ShareablePool
- Inherits:
-
ConnectionAdapters::ConnectionPool
- Object
- ConnectionAdapters::ConnectionPool
- ActiveRecord::Bogacs::ShareablePool
- Includes:
- PoolSupport, ThreadSafe::CheapLockable
- Defined in:
- lib/active_record/bogacs/shareable_pool.rb
Overview
NOTE: maybe do not override?!
Constant Summary collapse
- AtomicReference =
ThreadSafe::AtomicReference
- DEFAULT_SHARED_POOL =
Only allow 25% of the pool size to be shared by default.
0.25
- MAX_THREAD_SHARING =
Not really a strict limit, but it should hold.
5
Instance Attribute Summary collapse
-
#shared_size ⇒ Object
readonly
Returns the value of attribute shared_size.
Instance Method Summary collapse
- #active_connection? ⇒ Boolean
- #clear_reloadable_connections! ⇒ Object
- #connection ⇒ Object
- #disconnect! ⇒ Object
-
#initialize(spec) ⇒ ShareablePool
constructor
A new instance of ShareablePool.
- #release_connection(with_id = current_connection_id) ⇒ Object
-
#release_shared_connection(connection) ⇒ Object
Custom API.
- #remove(conn) ⇒ Object
- #super_active_connection?(connection_id = current_connection_id) ⇒ Boolean
- #with_shared_connection ⇒ Object
Methods included from PoolSupport
#current_connection_id, #new_connection, #reap
Constructor Details
#initialize(spec) ⇒ ShareablePool
Returns a new instance of ShareablePool.
34 35 36 37 38 39 40 41 42 |
# File 'lib/active_record/bogacs/shareable_pool.rb', line 34

# Builds the pool through ConnectionPool#initialize and works out how many
# connections may be shared, based on the `:shared_pool` config entry.
#
# @param spec connection specification; `spec.config[:shared_pool]` is read
#   and DEFAULT_SHARED_POOL is used when that entry is not set
def initialize(spec)
  super(spec) # ConnectionPool#initialize

  configured = spec.config[:shared_pool]
  share = configured ? configured.to_f : DEFAULT_SHARED_POOL
  # a value of 1.0 or less is treated as a fraction of the pool size
  share = (@size * share).round if share <= 1.0

  @shared_size = share.to_i
  # :initial_capacity => @shared_size, :concurrency_level => 20
  @shared_connections = ThreadSafe::Map.new
end
Instance Attribute Details
#shared_size ⇒ Object (readonly)
Returns the value of attribute shared_size.
31 32 33 |
# File 'lib/active_record/bogacs/shareable_pool.rb', line 31

# Read-only accessor for the number of connections allowed to be shared.
#
# @return the value held in @shared_size
def shared_size
  @shared_size
end
Instance Method Details
#active_connection? ⇒ Boolean
52 53 54 55 56 57 |
# File 'lib/active_record/bogacs/shareable_pool.rb', line 52

# Tells whether the current thread holds a connection that is in use.
# A connection stored under the thread's shared-connection key takes
# precedence; otherwise the regular reserved connection is consulted.
#
# @return [Boolean]
def active_connection?
  shared = Thread.current[shared_connection_key]
  return shared.in_use? if shared

  super_active_connection?(current_connection_id)
end
#clear_reloadable_connections! ⇒ Object
82 83 84 |
# File 'lib/active_record/bogacs/shareable_pool.rb', line 82

# Empties the shared-connection registry and then delegates to the parent
# implementation, both under the pool's cheap lock.
def clear_reloadable_connections!
  cheap_synchronize do
    @shared_connections.clear
    super
  end
end
#connection ⇒ Object
45 46 47 48 49 |
# File 'lib/active_record/bogacs/shareable_pool.rb', line 45

# Returns the connection stored under the current thread's shared-connection
# key when there is one, otherwise whatever the parent pool returns.
def connection
  shared = Thread.current[shared_connection_key]
  return shared if shared

  # super - simplified :
  # @reserved_connections.compute_if_absent(current_connection_id) { checkout }
  super
end
#disconnect! ⇒ Object
77 78 79 |
# File 'lib/active_record/bogacs/shareable_pool.rb', line 77

# Empties the shared-connection registry and then delegates to the parent
# implementation, both under the pool's cheap lock.
def disconnect!
  cheap_synchronize do
    @shared_connections.clear
    super
  end
end
#release_connection(with_id = current_connection_id) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/active_record/bogacs/shareable_pool.rb', line 60

# Releases the connection reserved under +with_id+, if there is one.
#
# A connection that is not registered as shared is simply checked back in
# (what super does). A shared one is released only when its counter reads 0;
# otherwise it is left alone.
#
# @param with_id key into @reserved_connections (defaults to the current
#   connection id)
def release_connection(with_id = current_connection_id)
  reserved = @reserved_connections[with_id]
  return unless reserved

  usage = @shared_connections[reserved]
  # check back-in non-shared connections
  return checkin(reserved) unless usage

  # lock due #get_shared_connection ... not needed ?!
  # NOTE: the other option is to not care about shared here at all ...
  cheap_synchronize do
    # releasing a shared connection (nothing is done while the counter is non-zero)
    release_shared_connection(reserved) if usage.get == 0
  end
end
#release_shared_connection(connection) ⇒ Object
Custom API.
108 109 110 111 112 113 114 115 116 |
# File 'lib/active_record/bogacs/shareable_pool.rb', line 108

# Stops sharing +connection+: clears the current thread's shared slot when
# it points at this connection, drops it from the shared registry and
# checks it back into the pool.
#
# @param connection the connection to release
def release_shared_connection(connection)
  key = shared_connection_key
  Thread.current[key] = nil if connection == Thread.current[key]

  @shared_connections.delete(connection)
  checkin(connection)
end
#remove(conn) ⇒ Object
Note:
Called from #reap, so the pool should work with the reaper.
88 89 90 |
# File 'lib/active_record/bogacs/shareable_pool.rb', line 88

# Drops +conn+ from the shared registry and then delegates to the parent
# implementation, both under the pool's cheap lock.
#
# @param conn the connection being removed from the pool
def remove(conn)
  cheap_synchronize do
    @shared_connections.delete(conn)
    super
  end
end
#super_active_connection?(connection_id = current_connection_id) ⇒ Boolean
166 167 168 |
# File 'lib/active_record/bogacs/shareable_pool.rb', line 166

# Checks the regular (reserved) connection registered under +connection_id+.
#
# @param connection_id key into @reserved_connections (defaults to the
#   current connection id)
# @return [Boolean] false when nothing is reserved under that id, otherwise
#   the reserved connection's in_use? answer
def super_active_connection?(connection_id = current_connection_id)
  reserved = @reserved_connections.get(connection_id)
  return false unless reserved

  reserved.in_use?
end
#with_shared_connection ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/active_record/bogacs/shareable_pool.rb', line 118 def with_shared_connection shared_conn_key = shared_connection_key # with_shared_connection call nested in the same thread if connection = Thread.current[shared_conn_key] emulated_checkout(connection) return yield connection end start = Time.now if DEBUG begin connection_id = current_connection_id # if there's a 'regular' connection on the thread use it as super if super_active_connection?(connection_id) # for current thread connection = self.connection # do not mark as shared DEBUG && debug("with_shared_conn 10 got active = #{connection.to_s}") # otherwise if we have a shared connection - use that one : elsif connection = get_shared_connection emulated_checkout(connection); shared = true DEBUG && debug("with_shared_conn 20 got shared = #{connection.to_s}") else cheap_synchronize do # check shared again as/if threads end up sync-ing up here : if connection = get_shared_connection emulated_checkout(connection) DEBUG && debug("with_shared_conn 21 got shared = #{connection.to_s}") end # here we acquire but a connection from the pool # TODO the bottle-neck for concurrency doing sync { checkout } : unless connection # here we acquire a connection from the pool connection = self.checkout # might block if pool fully used add_shared_connection(connection) DEBUG && debug("with_shared_conn 30 acq shared = #{connection.to_s}") end end shared = true end Thread.current[shared_conn_key] = connection if shared DEBUG && debug("with_shared_conn obtaining a connection took #{(Time.now - start) * 1000}ms") yield connection ensure Thread.current[shared_conn_key] = nil # if shared rem_shared_connection(connection) if shared end end |