Class: WaterDrop::ConnectionPool
- Inherits:
- Object (hierarchy: Object, WaterDrop::ConnectionPool)
- Extended by:
- Forwardable
- Defined in:
- lib/waterdrop/connection_pool.rb
Overview
Connection pool wrapper for WaterDrop producers, built on the connection_pool gem.
It provides a WaterDrop-specific API and configuration while delegating the pooling itself to the connection_pool gem, so callers do not need to use that gem directly.
Class Attribute Summary
- .default_pool ⇒ ConnectionPool?
  The global connection pool instance.
Instance Attribute Summary
- #pool ⇒ ::ConnectionPool (readonly)
  Returns the underlying connection_pool instance. This allows access to advanced connection_pool features if needed.
Class Method Summary
- .active? ⇒ Boolean
  Check if the global connection pool is active (configured).
- .close ⇒ Object
  Shut down the global connection pool. Alias for .shutdown: WaterDrop producers use #close, so the pool exposes the same name for API consistency across individual producers and connection pools.
- .reload ⇒ Object
  Reload the global connection pool.
- .setup(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool
  Sets up a global connection pool.
- .shutdown ⇒ Object
  Shut down the global connection pool.
- .stats ⇒ Hash?
  Get statistics about the global pool.
- .transaction {|producer| ... } ⇒ Object
  Execute a transaction with a producer from the global connection pool. Only available when the connection pool is configured.
- .with {|producer| ... } ⇒ Object
  Executes a block with a producer from the global pool.
Instance Method Summary
- #initialize(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool (constructor)
  Creates a new WaterDrop connection pool.
- #reload ⇒ Object
  Reload all connections in the pool. Useful for configuration changes or error recovery.
- #shutdown ⇒ Object (also: #close)
  Shut down the connection pool.
- #stats ⇒ Hash
  Get pool statistics.
- #transaction {|producer| ... } ⇒ Object
  Execute a transaction with a producer from this connection pool.
Constructor Details
#initialize(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool
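Creates a new WaterDrop connection pool. The timeout is given in milliseconds and is converted to seconds for the underlying connection_pool gem. The block configures each producer; if it accepts two arguments, it also receives a 1-based producer index.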
# File 'lib/waterdrop/connection_pool.rb', line 201

def initialize(size: 5, timeout: 5000, &producer_config)
  self.class.send(:ensure_connection_pool_gem!)

  @producer_config = producer_config
  @pool_index = 0
  @pool_mutex = Mutex.new

  @pool = ::ConnectionPool.new(size: size, timeout: timeout / 1000.0) do
    producer_index = @pool_mutex.synchronize { @pool_index += 1 }

    WaterDrop::Producer.new do |config|
      if @producer_config.arity == 2
        @producer_config.call(config, producer_index)
      else
        @producer_config.call(config)
      end
    end
  end

  # Emit event when a connection pool is created
  WaterDrop.instrumentation.instrument(
    "connection_pool.created",
    pool: self,
    size: size,
    timeout: timeout
  )
end
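The two block shapes accepted by the constructor, and the per-producer index, can be illustrated without Kafka or the gems installed. The sketch below is a stand-in, not WaterDrop code: `build_config` and the Hash-based config are hypothetical names that mimic only the arity check, the mutex-guarded counter, and the timeout conversion from the constructor source.

```ruby
# Stand-in for the arity dispatch and index counter in #initialize.
# `build_config` and the Hash-based config are hypothetical, for illustration only.
pool_index = 0
pool_mutex = Mutex.new

build_config = lambda do |producer_config|
  # Same pattern as the constructor: take the next index under a lock
  producer_index = pool_mutex.synchronize { pool_index += 1 }
  config = {}

  if producer_config.arity == 2
    producer_config.call(config, producer_index)
  else
    producer_config.call(config)
  end

  config
end

# A one-argument block only sees the config object
plain = build_config.call(proc { |config| config[:client_id] = "app" })

# A two-argument block also receives the 1-based producer index
indexed = build_config.call(proc { |config, index| config[:client_id] = "app-#{index}" })

# The millisecond timeout is divided by 1000.0 before reaching connection_pool
timeout_seconds = 5000 / 1000.0
```

The index is useful when each producer needs a distinct setting, since the block runs once per producer created by the pool.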
Class Attribute Details
.default_pool ⇒ ConnectionPool?
# File 'lib/waterdrop/connection_pool.rb', line 47

def default_pool
  @default_pool
end
Instance Attribute Details
#pool ⇒ ::ConnectionPool (readonly)
Returns the underlying connection_pool instance. This allows access to advanced connection_pool features if needed.
# File 'lib/waterdrop/connection_pool.rb', line 293

def pool
  @pool
end
Class Method Details
.active? ⇒ Boolean
Check if the global connection pool is active (configured).
# File 'lib/waterdrop/connection_pool.rb', line 151

def active?
  !@default_pool.nil?
end
.close ⇒ Object
Shuts down the global connection pool. Alias for .shutdown: WaterDrop producers use #close, so the pool exposes the same name for API consistency across individual producers and connection pools. Because this is an alias, the source shown is that of shutdown.
# File 'lib/waterdrop/connection_pool.rb', line 133

def shutdown
  return unless @default_pool

  pool = @default_pool
  @default_pool.shutdown
  @default_pool = nil

  # Emit global event for pool shutdown
  WaterDrop.instrumentation.instrument(
    "connection_pool.shutdown",
    pool: pool
  )
end
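As a rough illustration of how one method body can answer to both names, the sketch below uses Ruby's `alias` at the singleton and instance level. `DemoPool` is a hypothetical stand-in class; only the aliasing technique reflects what the documentation describes, and the actual source may declare the alias differently.

```ruby
# Hypothetical stand-in class; only the aliasing technique mirrors the docs.
class DemoPool
  class << self
    def shutdown
      :pool_shut_down
    end

    # Singleton-level alias: .close now runs the same method body as .shutdown
    alias close shutdown
  end

  def shutdown
    :instance_shut_down
  end

  # Instance-level alias, matching "#shutdown (also: #close)"
  alias close shutdown
end

class_result = DemoPool.close
instance_result = DemoPool.new.close
```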
.reload ⇒ Object
Reload the global connection pool.
# File 'lib/waterdrop/connection_pool.rb', line 136

def reload
  return unless @default_pool

  @default_pool.reload

  # Emit global event for pool reload
  WaterDrop.instrumentation.instrument(
    "connection_pool.reload",
    pool: @default_pool
  )
end
.setup(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool
Sets up a global connection pool and returns it.
# File 'lib/waterdrop/connection_pool.rb', line 71

def setup(size: 5, timeout: 5000, &producer_config)
  ensure_connection_pool_gem!

  @default_pool = new(size: size, timeout: timeout, &producer_config)

  # Emit global event for pool setup
  WaterDrop.instrumentation.instrument(
    "connection_pool.setup",
    pool: @default_pool,
    size: size,
    timeout: timeout
  )

  @default_pool
end
.shutdown ⇒ Object
Shut down the global connection pool.
# File 'lib/waterdrop/connection_pool.rb', line 116

def shutdown
  return unless @default_pool

  pool = @default_pool
  @default_pool.shutdown
  @default_pool = nil

  # Emit global event for pool shutdown
  WaterDrop.instrumentation.instrument(
    "connection_pool.shutdown",
    pool: pool
  )
end
.stats ⇒ Hash?
Get statistics about the global pool. Returns nil when no global pool is configured.
# File 'lib/waterdrop/connection_pool.rb', line 106

def stats
  return nil unless @default_pool

  {
    size: @default_pool.size,
    available: @default_pool.available
  }
end
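One way to read the stats hash is to derive how many producers are currently checked out. The helper below is a hypothetical sketch: `in_use` is not part of WaterDrop, and it assumes `available` counts every slot not currently checked out, as in the connection_pool gem. Only the `{ size:, available: }` shape and the nil return come from the source for .stats.

```ruby
# Hypothetical helper; the stats hash shape ({ size:, available: }) comes from .stats.
def in_use(stats)
  # .stats returns nil when no global pool is configured
  return 0 if stats.nil?

  stats[:size] - stats[:available]
end

busy = in_use({ size: 5, available: 3 })
unconfigured = in_use(nil)
```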
.transaction {|producer| ... } ⇒ Object
Execute a transaction with a producer from the global connection pool. Only available when the connection pool is configured; otherwise a RuntimeError is raised.
# File 'lib/waterdrop/connection_pool.rb', line 167

def transaction(...)
  raise "No global connection pool configured. Call setup first." unless @default_pool

  @default_pool.transaction(...)
end
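The guard and the `(...)` argument forwarding can be tried in isolation. In this sketch `DemoGlobal` and `fake_pool` are hypothetical stand-ins; only the error message, the guard, and the forwarding syntax are taken from the source of .transaction.

```ruby
# Hypothetical stand-in; mirrors only the guard and `(...)` forwarding.
module DemoGlobal
  class << self
    attr_accessor :default_pool

    def transaction(...)
      raise "No global connection pool configured. Call setup first." unless @default_pool

      # `(...)` forwards positional, keyword, and block arguments unchanged
      @default_pool.transaction(...)
    end
  end
end

# A bare `raise "message"` produces a RuntimeError
error =
  begin
    DemoGlobal.transaction { |producer| producer }
  rescue RuntimeError => e
    e.message
  end

# Any object responding to #transaction can act as the pool in this sketch
fake_pool = Object.new
def fake_pool.transaction
  yield :producer
end

DemoGlobal.default_pool = fake_pool
forwarded = DemoGlobal.transaction { |producer| producer }
```

Callers that cannot guarantee setup has run may prefer to check .active? first rather than rescue the error.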
.with {|producer| ... } ⇒ Object
Executes a block with a producer from the global pool. Raises a RuntimeError when no global pool is configured.
# File 'lib/waterdrop/connection_pool.rb', line 97

def with(...)
  raise "No global connection pool configured. Call setup first." unless @default_pool

  @default_pool.with(...)
end
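The checkout-and-return behaviour behind a `with` block can be sketched with a small Queue-backed pool. `TinyPool` is a hypothetical stand-in and not the connection_pool gem, which also handles timeouts and lazy creation. It only shows that a producer is borrowed for the duration of the block and returned afterwards, even if the block raises.

```ruby
# Hypothetical TinyPool: a Queue-backed stand-in, not the connection_pool gem.
class TinyPool
  def initialize(size)
    @queue = Queue.new
    size.times { |i| @queue << "producer-#{i + 1}" }
  end

  def available
    @queue.size
  end

  def with
    producer = @queue.pop # blocks until a producer is free
    yield producer
  ensure
    # Return the producer even if the block raised
    @queue << producer if producer
  end
end

pool = TinyPool.new(2)
during = nil
result = pool.with do |producer|
  during = pool.available
  "sent via #{producer}"
end
after = pool.available
```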
Instance Method Details
#reload ⇒ Object
Reloads all connections in the pool, closing any active producers. Useful after configuration changes or for error recovery.
# File 'lib/waterdrop/connection_pool.rb', line 259

def reload
  @pool.reload do |producer|
    producer.close! if producer&.status&.active?
  end

  # Emit event after pool is reloaded
  WaterDrop.instrumentation.instrument(
    "connection_pool.reloaded",
    pool: self
  )
end
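The `producer&.status&.active?` guard closes only producers that exist and report an active status. The sketch below reproduces that check with hypothetical stand-ins (`DemoStatus`, `DemoProducer`, `close_if_active`); none of these are WaterDrop classes.

```ruby
# Hypothetical stand-ins for a producer and its status object.
DemoStatus = Struct.new(:active) do
  def active?
    active
  end
end

DemoProducer = Struct.new(:status, :closed) do
  def close!
    self.closed = true
  end
end

def close_if_active(producer)
  # `&.` short-circuits to nil when producer (or its status) is nil
  producer.close! if producer&.status&.active?
end

active = DemoProducer.new(DemoStatus.new(true), false)
idle = DemoProducer.new(DemoStatus.new(false), false)

close_if_active(active)
close_if_active(idle)
close_if_active(nil) # no NoMethodError is raised for a missing producer
```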
#shutdown ⇒ Object Also known as: close
Shuts down the connection pool, closing any active producers.
# File 'lib/waterdrop/connection_pool.rb', line 240

def shutdown
  @pool.shutdown do |producer|
    producer.close! if producer&.status&.active?
  end

  # Emit event after pool is shut down
  WaterDrop.instrumentation.instrument(
    "connection_pool.shutdown",
    pool: self
  )
end
#stats ⇒ Hash
Get pool statistics: the pool size and the number of available connections.
# File 'lib/waterdrop/connection_pool.rb', line 232

def stats
  {
    size: @pool.size,
    available: @pool.available
  }
end
#transaction {|producer| ... } ⇒ Object
Execute a transaction with a producer from this connection pool.
# File 'lib/waterdrop/connection_pool.rb', line 281

def transaction
  with do |producer|
    producer.transaction do
      yield(producer)
    end
  end
end
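The nesting of `with`, `producer.transaction`, and `yield` can be traced with recording stand-ins. `RecordingProducer` and `SingleProducerPool` are hypothetical, and the begin/commit/abort behaviour is an assumption of this sketch rather than a statement about WaterDrop internals. Only the shape of `#transaction` follows the source.

```ruby
# Hypothetical stand-ins; only the with/transaction/yield nesting mirrors the source.
class RecordingProducer
  attr_reader :events

  def initialize
    @events = []
  end

  # Assumed behaviour: record begin/commit, or abort and re-raise on error
  def transaction
    @events << :begin
    result = yield
    @events << :commit
    result
  rescue StandardError
    @events << :abort
    raise
  end
end

class SingleProducerPool
  def initialize(producer)
    @producer = producer
  end

  def with
    yield @producer
  end

  # Same shape as ConnectionPool#transaction
  def transaction
    with do |producer|
      producer.transaction do
        yield(producer)
      end
    end
  end
end

producer = RecordingProducer.new
pool = SingleProducerPool.new(producer)

value = pool.transaction do |p|
  p.events << :produce
  :done
end
```

In this sketch the caller's block runs between the begin and commit events, and its return value is passed back through both wrappers.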