Class: ZK::Pool::Bounded

Inherits:
Base
  • Object
show all
Defined in:
lib/zk/pool.rb

Overview

Like a Simple pool, but has high/low watermarks and can grow dynamically as needed.

Direct Known Subclasses

Simple

Constant Summary collapse

# Default pool options. #initialize merges the caller-supplied opts on top of
# these, so any key given by the caller wins. Frozen so the shared defaults
# cannot be mutated in place.
DEFAULT_OPTIONS =
{
  :timeout      => 10,  # connection establishment timeout (presumably seconds -- TODO confirm unit)
  :min_clients  => 1,   # how many clients the pool starts out with
  :max_clients  => 10,  # the maximum number of clients created in response to demand
}.freeze

Instance Attribute Summary

Attributes inherited from Base

#connections

Instance Method Summary collapse

Methods inherited from Base

#close_all!, #closed?, #closing?, #forced?, #locker, #method_missing, #open?, #pool_state, #with_connection, #with_lock

Methods included from Logger

#logger, wrapped_logger, wrapped_logger=

Constructor Details

#initialize(host, opts = {}) ⇒ Bounded

opts:

  • :timeout: connection establishment timeout
  • :min_clients: how many clients we should start out with
  • :max_clients: the maximum number of clients we will create in response to demand


166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/zk/pool.rb', line 166

# Builds a bounded pool for +host+ and fills it with the initial set of clients.
#
# The caller's +opts+ hash is stored untouched in @connection_args; the pool
# settings are read from a separate copy that has DEFAULT_OPTIONS filled in
# underneath it.
#
# @param host the host (connection string) stored in @host
# @param opts [Hash] may contain :timeout, :min_clients and :max_clients;
#   missing keys fall back to DEFAULT_OPTIONS
# @raise [ArgumentError, TypeError] if :min_clients or :max_clients cannot be
#   converted by Kernel#Integer
def initialize(host, opts={})
  super()

  @host            = host
  @connection_args = opts

  # Work on a merged copy so the hash kept in @connection_args is not modified.
  settings = DEFAULT_OPTIONS.merge(opts)
  low_mark, high_mark, timeout = settings.values_at(:min_clients, :max_clients, :timeout)

  @min_clients        = Integer(low_mark)
  @max_clients        = Integer(high_mark)
  @connection_timeout = timeout

  @count_waiters = 0

  # Fill the pool and flip the state to :open under the pool lock.
  @mutex.synchronize do
    populate_pool!(@min_clients)
    @state = :open
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class ZK::Pool::Base

Instance Method Details

#available_size ⇒ Object

clients available for checkout (at time of call)



192
193
194
# File 'lib/zk/pool.rb', line 192

# Number of clients currently sitting in the pool and available for checkout.
# The value is a snapshot taken while holding @mutex, so it may be stale by
# the time the caller uses it.
#
# @return the length of @pool at the time of the call
def available_size
  @mutex.synchronize do
    @pool.length
  end
end

#checkin(connection) ⇒ Object



196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/zk/pool.rb', line 196

# Hands a connection back to the pool and wakes every thread waiting on
# @checkin_cond.
#
# Two kinds of checkin are ignored (nothing is added, nobody is woken):
# * +nil+ -- it is not a usable connection, and pooling it would inflate
#   #available_size and force #checkout to skip over it.
# * a connection that is already in @pool -- prevents duplicates.
#
# @param connection the connection being returned
# @return [nil] when the checkin was ignored; otherwise whatever
#   @checkin_cond.broadcast returns
def checkin(connection)
  # Never let nil into @pool: #checkout has to defensively discard nil entries
  # it shifts out, so stop them at the door instead.
  if connection.nil?
    logger.debug { "Ignoring checkin of nil connection" }
    return
  end

  @mutex.synchronize do
    if @pool.include?(connection)
      logger.debug { "Pool already contains connection: #{connection.object_id}, @connections.include? #{@connections.include?(connection).inspect}" }
      return
    end

    @pool << connection

    # broadcast (not signal): every waiter re-checks the pool after waking.
    @checkin_cond.broadcast
  end
end

#checkout(blocking = true) ⇒ Object

Raises:

  • (ArgumentError)


214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/zk/pool.rb', line 214

# Takes a connected client out of the pool.
#
# Holds @mutex for the whole call and loops until one of these happens:
# * a connected client is shifted off @pool and returned,
# * the pool is empty, cannot grow, and +blocking+ is false -> returns false,
# * assert_open! (defined elsewhere) aborts the loop -- presumably by raising
#   when the pool is not open; confirm against its definition.
#
# @param blocking [Boolean] when true, wait on @checkin_cond for a checkin
#   instead of returning false on an exhausted pool
# @return the checked-out connection, or false when non-blocking and nothing
#   is available
# @raise [ArgumentError] if a block is given (use with_connection for that)
#
# NOTE(review): nothing in this method updates @count_waiters, so
# #count_waiters will not reflect threads parked in the wait below unless it
# is maintained somewhere else -- confirm.
def checkout(blocking=true)
  raise ArgumentError, "checkout does not take a block, use .with_connection" if block_given?
  # Explicit lock/unlock (rather than synchronize); released in the ensure below.
  @mutex.lock
  begin
    while true
      # Re-checked on every pass, including after waking from the wait.
      assert_open!

      if @pool.length > 0
        cnx = @pool.shift

        # XXX(slyphon): not really sure how this case happens, but protect against it as we're
        # seeing an issue in production
        next if cnx.nil?

        # if the connection isn't connected, then set up an on_connection
        # handler and try the next one in the pool
        # (the disconnected cnx has already been shifted out of @pool at this point)
        unless cnx.connected?
          logger.debug { "connection #{cnx.object_id} is not connected" }
          handle_checkin_on_connection(cnx)
          next
        end

        # otherwise we return the cnx
        return cnx
      elsif can_grow_pool?
        # Pool is empty but may still grow: add a connection, then loop to pick it up.
        add_connection!
        next
      elsif blocking
        # Sleep until a checkin makes @pool non-empty or the pool stops being open.
        # NOTE(review): wait_while is a MonitorMixin::ConditionVariable method in the
        # stdlib -- confirm the type of @checkin_cond where it is created.
        @checkin_cond.wait_while { @pool.empty? and open? }
        next
      else
        # Non-blocking and nothing available.
        return false
      end
    end # while
  ensure
    # Best-effort release: any error raised by unlock is deliberately swallowed.
    @mutex.unlock rescue nil
  end
end

#count_waiters ⇒ Object

number of threads waiting for connections



210
211
212
# File 'lib/zk/pool.rb', line 210

# Reads the @count_waiters counter (number of threads waiting for
# connections) while holding @mutex.
#
# @return the current value of @count_waiters
def count_waiters #:nodoc:
  @mutex.synchronize do
    @count_waiters
  end
end

#size ⇒ Object

returns the current number of allocated clients in the pool (not available clients)



187
188
189
# File 'lib/zk/pool.rb', line 187

# Total number of clients the pool has allocated, whether or not they are
# currently available for checkout. Read while holding @mutex.
#
# @return the length of @connections at the time of the call
def size
  @mutex.synchronize do
    @connections.length
  end
end