Class: ConnectionPool
- Inherits:
- Object
- Includes:
- ForkTracker
- Defined in:
- lib/connection_pool.rb,
lib/connection_pool/fork.rb,
lib/connection_pool/version.rb,
lib/connection_pool/wrapper.rb
Overview
Generic connection pool class for sharing a limited number of objects or network connections among many threads. Note: pool elements are lazily created.
Example usage with block (faster):
@pool = ConnectionPool.new { Redis.new }
@pool.with do |redis|
  redis.lpop('my-list') if redis.llen('my-list') > 0
end
Using optional timeout override (for that single invocation):
@pool.with(timeout: 2.0) do |redis|
  redis.lpop('my-list') if redis.llen('my-list') > 0
end
Example usage replacing an existing connection (slower):
$redis = ConnectionPool.wrap { Redis.new }
def do_work
  $redis.lpop('my-list') if $redis.llen('my-list') > 0
end
Accepts the following options:
- :size - number of connections to pool, defaults to 5
- :timeout - amount of time to wait for a connection if none currently available, defaults to 5 seconds
- :auto_reload_after_fork - automatically drop all connections after fork, defaults to true
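To make the :size and :timeout options and the lazy-creation note more concrete, here is a minimal sketch of how a pool of this kind might behave. TinyPool and everything inside it are hypothetical names built only on Ruby's standard library; the gem's own TimedStack is not shown on this page and may work differently.

```ruby
# Illustration only: a simplified stand-in, not the gem's implementation.
class TinyPool
  class TimeoutError < StandardError; end

  attr_reader :created

  def initialize(size: 5, timeout: 5, &factory)
    raise ArgumentError, "TinyPool requires a block" unless factory

    @size = Integer(size)
    @timeout = Float(timeout)
    @factory = factory
    @idle = []      # connections built earlier and waiting for reuse
    @created = 0    # connections built so far (never more than @size)
    @mutex = Mutex.new
    @returned = ConditionVariable.new
  end

  # Hands out an idle connection, builds a new one if the pool is not yet
  # full, or waits up to `timeout` seconds for another thread to check in.
  def checkout(timeout: @timeout)
    deadline = now + timeout
    @mutex.synchronize do
      loop do
        return @idle.pop unless @idle.empty?

        if @created < @size
          conn = @factory.call # lazy: built on first demand
          @created += 1
          return conn
        end

        remaining = deadline - now
        raise TimeoutError, "waited #{timeout} sec for a connection" if remaining <= 0

        @returned.wait(@mutex, remaining)
      end
    end
  end

  def checkin(conn)
    @mutex.synchronize do
      @idle.push(conn)
      @returned.signal
    end
    nil
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

pool = TinyPool.new(size: 2, timeout: 0.1) { Object.new }
conn = pool.checkout # the first connection is built here, not in TinyPool.new
pool.checkin(conn)
```

In this sketch a pool that is never used never opens a connection, and a caller that finds the pool exhausted gets an error after the timeout rather than blocking forever.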
Defined Under Namespace
Modules: ForkTracker
Classes: Error, PoolShuttingDownError, TimedStack, TimeoutError, Wrapper
Constant Summary
- VERSION = "3.0.2"
Instance Attribute Summary
- #size ⇒ Object (readonly)
  Returns the value of attribute size.
Class Method Summary
- .after_fork ⇒ Object
- .wrap ⇒ Object
Instance Method Summary
- #available ⇒ Object
  Number of pool entries available for checkout at this instant.
- #checkin(force: false) ⇒ Object
- #checkout(timeout: @timeout) ⇒ Object
- #discard_current_connection {|conn| ... } ⇒ void
  Marks the current thread’s checked-out connection for discard.
- #idle ⇒ Object
  Number of pool entries created and idle in the pool.
- #initialize(timeout: 5, size: 5, auto_reload_after_fork: true, name: nil) ⇒ ConnectionPool (constructor)
  A new instance of ConnectionPool.
- #reap(idle_seconds: 60) ⇒ Object
  Reaps connections that have been idle for over idle_seconds.
- #reload ⇒ Object
  Reloads the ConnectionPool by passing each connection to block and then removing it from the pool.
- #shutdown ⇒ Object
  Shuts down the ConnectionPool by passing each connection to block and then removing it from the pool.
- #with ⇒ Object (also: #then)
Methods included from ForkTracker
Constructor Details
#initialize(timeout: 5, size: 5, auto_reload_after_fork: true, name: nil) ⇒ ConnectionPool
Returns a new instance of ConnectionPool.
# File 'lib/connection_pool.rb', line 48

def initialize(timeout: 5, size: 5, auto_reload_after_fork: true, name: nil, &)
  raise ArgumentError, "Connection pool requires a block" unless block_given?

  @size = Integer(size)
  @timeout = Float(timeout)
  @available = TimedStack.new(size: @size, &)
  @key = :"pool-#{@available.object_id}"
  @key_count = :"pool-#{@available.object_id}-count"
  @discard_key = :"pool-#{@available.object_id}-discard"
  INSTANCES[self] = self if auto_reload_after_fork && INSTANCES
end
Instance Attribute Details
#size ⇒ Object (readonly)
Returns the value of attribute size.
# File 'lib/connection_pool.rb', line 46

def size
  @size
end
Class Method Details
.after_fork ⇒ Object
# File 'lib/connection_pool/fork.rb', line 6

def self.after_fork
  INSTANCES.each_value do |pool|
    # We're in after_fork, so we know all other threads are dead.
    # All we need to do is ensure the main thread doesn't have a
    # checked out connection
    pool.checkin(force: true)
    pool.reload do |connection|
      # Unfortunately we don't know what method to call to close the connection,
      # so we try the most common one.
      connection.close if connection.respond_to?(:close)
    end
  end
  nil
end
.wrap ⇒ Object
# File 'lib/connection_pool.rb', line 42

def self.wrap(**, &)
  Wrapper.new(**, &)
end
Instance Method Details
#available ⇒ Object
Number of pool entries available for checkout at this instant.
# File 'lib/connection_pool.rb', line 173

def available
  @available.length
end
#checkin(force: false) ⇒ Object
# File 'lib/connection_pool.rb', line 123

def checkin(force: false)
  if ::Thread.current[@key]
    if ::Thread.current[@key_count] == 1 || force
      if ::Thread.current[@discard_key]
        begin
          @available.decrement_created
          ::Thread.current[@discard_key].call(::Thread.current[@key])
        rescue
          nil
        ensure
          ::Thread.current[@discard_key] = nil
        end
      else
        @available.push(::Thread.current[@key])
      end
      ::Thread.current[@key] = nil
      ::Thread.current[@key_count] = nil
    else
      ::Thread.current[@key_count] -= 1
    end
  elsif !force
    raise ConnectionPool::Error, "no connections are checked out"
  end

  nil
end
#checkout(timeout: @timeout) ⇒ Object
# File 'lib/connection_pool.rb', line 111

def checkout(timeout: @timeout, **)
  if ::Thread.current[@key]
    ::Thread.current[@key_count] += 1
    ::Thread.current[@key]
  else
    conn = @available.pop(timeout:, **)
    ::Thread.current[@key] = conn
    ::Thread.current[@key_count] = 1
    conn
  end
end
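The checkin and checkout listings above cooperate through two per-thread slots: one holding the connection and one counting nested checkouts. The sketch below isolates that bookkeeping. ReentrantSlot and its `returned` array are hypothetical names used only for illustration, and the `returned` array stands in for the real pool. Note that `Thread.current[]` is fiber-local in Ruby, so the same applies per fiber.

```ruby
# Illustration only: mirrors the counting logic, not the gem's full behaviour.
class ReentrantSlot
  attr_reader :returned

  def initialize(&factory)
    @factory = factory
    @returned = []                           # connections handed back so far
    @key = :"demo-#{object_id}"              # slot for the held connection
    @key_count = :"demo-#{object_id}-count"  # slot for the nesting depth
  end

  # A nested checkout on the same thread reuses the held connection and
  # only bumps the depth counter.
  def checkout
    if Thread.current[@key]
      Thread.current[@key_count] += 1
    else
      Thread.current[@key] = @factory.call
      Thread.current[@key_count] = 1
    end
    Thread.current[@key]
  end

  # Only the outermost checkin actually releases the connection.
  def checkin
    raise "no connections are checked out" unless Thread.current[@key]

    if Thread.current[@key_count] == 1
      @returned.push(Thread.current[@key])
      Thread.current[@key] = nil
      Thread.current[@key_count] = nil
    else
      Thread.current[@key_count] -= 1
    end
    nil
  end
end

slot = ReentrantSlot.new { Object.new }
outer = slot.checkout
inner = slot.checkout # same object as outer; depth is now 2
slot.checkin          # depth back to 1, connection still held
slot.checkin          # connection released
```

The practical consequence is that nested acquisition on one thread consumes a single pool entry, so a method that acquires a connection can safely call another method that does the same.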
#discard_current_connection {|conn| ... } ⇒ void
This method returns an undefined value.
Marks the current thread’s checked-out connection for discard.
When a connection is marked for discard, it will not be returned to the pool when checked in. Instead, the connection will be discarded. This is useful when a connection has become invalid or corrupted and should not be reused.
Takes an optional block that will be called with the connection to be discarded. The block should perform any necessary clean-up on the connection.
Note: This only affects the connection currently checked out by the calling thread. The connection will be discarded when checkin is called.
# File 'lib/connection_pool.rb', line 107

def discard_current_connection(&block)
  ::Thread.current[@discard_key] = block || proc { |conn| conn }
end
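The mark-then-discard flow described above can be sketched in isolation. DiscardingSlot is a hypothetical, single-threaded stand-in (the real pool keeps the marker in a per-thread slot); it only shows that a marked connection is handed to the clean-up block instead of being returned, and that an error in that block does not stop the discard.

```ruby
# Illustration only: single-threaded and simplified, not the gem's code.
class DiscardingSlot
  attr_reader :idle, :created

  def initialize(&factory)
    @factory = factory
    @idle = []      # connections available for reuse
    @created = 0    # connections currently counted against the pool
    @current = nil  # the checked-out connection, if any
    @discard = nil  # clean-up callable set by discard_current_connection
  end

  def checkout
    @current = @idle.pop || begin
      @created += 1
      @factory.call
    end
  end

  def discard_current_connection(&block)
    @discard = block || proc { |conn| conn }
  end

  def checkin
    if @discard
      begin
        @created -= 1 # free the slot so a replacement can be built later
        @discard.call(@current)
      rescue StandardError
        nil # a failing clean-up block must not prevent the discard
      ensure
        @discard = nil
      end
    else
      @idle.push(@current)
    end
    @current = nil
  end
end

slot = DiscardingSlot.new { Object.new }
broken = slot.checkout
slot.discard_current_connection { |conn| conn.freeze } # any clean-up goes here
slot.checkin        # broken is dropped rather than pushed back
fresh = slot.checkout # a new connection is built on demand
slot.checkin
```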
#idle ⇒ Object
Number of pool entries created and idle in the pool.
# File 'lib/connection_pool.rb', line 178

def idle
  @available.idle
end
#reap(idle_seconds: 60) ⇒ Object
Reaps connections that have been idle for over idle_seconds. idle_seconds defaults to 60.
# File 'lib/connection_pool.rb', line 168

def reap(idle_seconds: 60, &)
  @available.reap(idle_seconds:, &)
end
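Since reap delegates to TimedStack, whose source is not shown on this page, the following is only a guess at the general technique: record when each connection became idle and remove those older than the cutoff, yielding each one so the caller can close it. IdleList, Entry and the injectable clock are hypothetical.

```ruby
# Illustration only: one plausible way to reap by idle time.
class IdleList
  Entry = Struct.new(:conn, :idle_since)

  # The clock is injectable so the behaviour can be exercised without sleeping.
  def initialize(clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @clock = clock
    @entries = []
  end

  def push(conn)
    @entries.push(Entry.new(conn, @clock.call))
  end

  def length
    @entries.length
  end

  # Removes every entry idle for longer than idle_seconds, yields each
  # removed connection to the optional block, and returns how many were removed.
  def reap(idle_seconds: 60)
    cutoff = @clock.call - idle_seconds
    expired, @entries = @entries.partition { |entry| entry.idle_since < cutoff }
    expired.each { |entry| yield entry.conn if block_given? }
    expired.length
  end
end

list = IdleList.new
list.push(Object.new)
list.reap(idle_seconds: 60) { |conn| conn.close if conn.respond_to?(:close) }
```

A periodic call of this kind lets a pool shrink again after a burst of traffic instead of holding every connection it ever opened.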
#reload ⇒ Object
Reloads the ConnectionPool by passing each connection to block and then removing it from the pool. Subsequent checkouts will create new connections as needed.
# File 'lib/connection_pool.rb', line 162

def reload(&)
  @available.shutdown(reload: true, &)
end
#shutdown ⇒ Object
Shuts down the ConnectionPool by passing each connection to block and then removing it from the pool. Attempting to checkout a connection after shutdown will raise ConnectionPool::PoolShuttingDownError.
# File 'lib/connection_pool.rb', line 154

def shutdown(&)
  @available.shutdown(&)
end
#with ⇒ Object Also known as: then
# File 'lib/connection_pool.rb', line 60

def with(**)
  # We need to manage exception handling manually here in order
  # to work correctly with `Timeout.timeout` and `Thread#raise`.
  # Otherwise an interrupted Thread can leak connections.
  Thread.handle_interrupt(Exception => :never) do
    conn = checkout(**)
    begin
      Thread.handle_interrupt(Exception => :immediate) do
        yield conn
      end
    ensure
      checkin
    end
  end
end
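The interrupt masking in the listing above can be tried out without the gem. In this sketch, Ledger is a hypothetical stand-in that merely counts outstanding checkouts; its `with` copies the same `Thread.handle_interrupt` structure, so an asynchronous `Thread#raise` can only land while the caller's block runs, and the `ensure` clause still gets to check the connection back in.

```ruby
# Illustration only: a counter standing in for a real pool.
class Ledger
  attr_reader :checked_out

  def initialize
    @checked_out = 0
  end

  def checkout
    @checked_out += 1
    :conn
  end

  def checkin
    @checked_out -= 1
    nil
  end

  def with
    # Defer asynchronous interrupts while bookkeeping is in progress...
    Thread.handle_interrupt(Exception => :never) do
      conn = checkout
      begin
        # ...but let them through while the caller's block runs.
        Thread.handle_interrupt(Exception => :immediate) do
          yield conn
        end
      ensure
        checkin
      end
    end
  end
end

ledger = Ledger.new
ready = Queue.new
worker = Thread.new do
  Thread.current.report_on_exception = false
  ledger.with do |_conn|
    ready << true
    sleep # stand-in for slow work that gets interrupted
  end
end

ready.pop # wait until the worker is inside the block
worker.raise(RuntimeError, "interrupted")
begin
  worker.join
rescue RuntimeError
  # the interrupt surfaces here, after the worker has checked in
end
```

After the worker is interrupted, `ledger.checked_out` is back to zero, which is the leak the source comment says the manual handling is there to prevent.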