Class: ZK::Pool::Bounded
Overview
like a Simple pool but has high/low watermarks, and can grow dynamically as needed
Direct Known Subclasses
Constant Summary collapse
- DEFAULT_OPTIONS =
{ :timeout => 10, :min_clients => 1, :max_clients => 10, }.freeze
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
- #add_connection! ⇒ Object protected
-
#available_size ⇒ Object
clients available for checkout (at time of call).
- #checkin(connection) ⇒ Object
- #checkout(blocking = true) ⇒ Object
-
#count_waiters ⇒ Object
number of threads waiting for connections.
- #create_connection ⇒ Object protected
- #handle_checkin_on_connection(cnx) ⇒ Object protected
-
#initialize(host, opts = {}) ⇒ Bounded
constructor
opts: *
:timeout
: connection establishement timeout *:min_clients
: how many clients should be start out with *:max_clients
: the maximum number of clients we will create in response to demand. - #populate_pool!(num_cnx) ⇒ Object protected
-
#size ⇒ Object
returns the current number of allocated clients in the pool (not available clients).
Methods inherited from Base
#assert_open!, #close_all!, #closed?, #closing?, #force_close!, #forced?, #locker, #method_missing, #open?, #pool_state, #synchronize, #with_connection, #with_lock
Constructor Details
#initialize(host, opts = {}) ⇒ Bounded
opts:
-
:timeout
: connection establishement timeout -
:min_clients
: how many clients should be start out with -
:max_clients
: the maximum number of clients we will create in response to demand
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/z_k/pool.rb', line 138 def initialize(host, opts={}) super() @host = host @connection_args = opts opts = DEFAULT_OPTIONS.merge(opts) @min_clients = Integer(opts.delete(:min_clients)) @max_clients = Integer(opts.delete(:max_clients)) @connection_timeout = opts.delete(:timeout) @count_waiters = 0 @mutex.synchronize do populate_pool!(@min_clients) @state = :open end end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method in the class ZK::Pool::Base
Instance Method Details
#add_connection! ⇒ Object (protected)
228 229 230 231 232 233 234 235 |
# File 'lib/z_k/pool.rb', line 228 def add_connection! @mutex.synchronize do cnx = create_connection @connections << cnx handle_checkin_on_connection(cnx) end # synchronize end |
#available_size ⇒ Object
clients available for checkout (at time of call)
164 165 166 |
# File 'lib/z_k/pool.rb', line 164 def available_size @mutex.synchronize { @pool.length } end |
#checkin(connection) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/z_k/pool.rb', line 168 def checkin(connection) @mutex.synchronize do if @pool.include?(connection) logger.debug { "Pool already contains connection: #{connection.object_id}, @connections.include? #{@connections.include?(connection).inspect}" } return end @pool << connection @checkin_cond.signal end end |
#checkout(blocking = true) ⇒ Object
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/z_k/pool.rb', line 186 def checkout(blocking=true) raise ArgumentError, "checkout does not take a block, use .with_connection" if block_given? @mutex.synchronize do while true assert_open! if @pool.length > 0 cnx = @pool.shift # if the connection isn't connected, then set up an on_connection # handler and try the next one in the pool unless cnx.connected? logger.debug { "connection #{cnx.object_id} is not connected" } handle_checkin_on_connection(cnx) next end # otherwise we return the cnx return cnx elsif can_grow_pool? add_connection! next elsif blocking @checkin_cond.wait_while { @pool.empty? and open? } next else return false end end # while end end |
#count_waiters ⇒ Object
number of threads waiting for connections
182 183 184 |
# File 'lib/z_k/pool.rb', line 182 def count_waiters #:nodoc: @mutex.synchronize { @count_waiters } end |
#create_connection ⇒ Object (protected)
267 268 269 |
# File 'lib/z_k/pool.rb', line 267 def create_connection ZK.new(@host, @connection_timeout, @connection_args) end |
#handle_checkin_on_connection(cnx) ⇒ Object (protected)
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 |
# File 'lib/z_k/pool.rb', line 237 def handle_checkin_on_connection(cnx) @mutex.synchronize do do_checkin = lambda do checkin(cnx) end if cnx.connected? do_checkin.call return else @on_connected_subs.synchronize do sub = cnx.on_connected do # this synchronization is to prevent a race between setting up the subscription # and assigning it to the @on_connected_subs hash. It's possible that the callback # would fire before we had a chance to add the sub to the hash. @on_connected_subs.synchronize do if sub = @on_connected_subs.delete(cnx) sub.unsubscribe do_checkin.call end end end @on_connected_subs[cnx] = sub end end end end |
#populate_pool!(num_cnx) ⇒ Object (protected)
224 225 226 |
# File 'lib/z_k/pool.rb', line 224 def populate_pool!(num_cnx) num_cnx.times { add_connection! } end |
#size ⇒ Object
returns the current number of allocated clients in the pool (not available clients)
159 160 161 |
# File 'lib/z_k/pool.rb', line 159 def size @mutex.synchronize { @connections.length } end |