Class: ZK::Pool::Bounded

Inherits:
Base
  • Object
show all
Defined in:
lib/z_k/pool.rb

Overview

like a Simple pool but has high/low watermarks, and can grow dynamically as needed

Direct Known Subclasses

Simple

Constant Summary collapse

# Fallback values merged underneath the caller-supplied options in #initialize.
# Frozen so the shared defaults cannot be mutated.
DEFAULT_OPTIONS = {
  :timeout     => 10, # connection establishment timeout handed to ZK.new
  :min_clients => 1,  # number of clients created when the pool is built
  :max_clients => 10  # documented ceiling on clients created in response to demand
}.freeze

Instance Attribute Summary

Attributes inherited from Base

#connections

Instance Method Summary collapse

Methods inherited from Base

#assert_open!, #close_all!, #closed?, #closing?, #force_close!, #forced?, #locker, #method_missing, #open?, #pool_state, #synchronize, #with_connection, #with_lock

Constructor Details

#initialize(host, opts = {}) ⇒ Bounded

opts:

  • :timeout: connection establishment timeout

  • :min_clients: how many clients we should start out with

  • :max_clients: the maximum number of clients we will create in response to demand



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/z_k/pool.rb', line 138

# Builds the pool and eagerly creates the initial set of clients.
#
# host - connection target, stored and later passed to ZK.new
# opts - Hash of options; missing keys fall back to DEFAULT_OPTIONS:
#        :timeout      connection establishment timeout
#        :min_clients  number of clients to create up front
#        :max_clients  maximum number of clients to create on demand
#
# Integer() raises if :min_clients or :max_clients cannot be converted.
def initialize(host, opts={})
  super()

  @host            = host
  # The caller's hash is kept as given (pool-specific keys included); it is
  # what create_connection later forwards to ZK.new.
  @connection_args = opts

  # Work on a merged copy so the caller's hash is never modified.
  settings = DEFAULT_OPTIONS.merge(opts)

  @min_clients        = Integer(settings[:min_clients])
  @max_clients        = Integer(settings[:max_clients])
  @connection_timeout = settings[:timeout]
  @count_waiters      = 0

  # Fill the pool first, then flip the state to :open, all under the lock.
  @mutex.synchronize do
    populate_pool!(@min_clients)
    @state = :open
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class ZK::Pool::Base

Instance Method Details

#add_connection!Object (protected)



228
229
230
231
232
233
234
235
# File 'lib/z_k/pool.rb', line 228

# Creates one new connection, records it in @connections, and arranges for
# it to be checked in (immediately or once it connects) via
# handle_checkin_on_connection. Everything happens while holding @mutex.
#
# Returns whatever handle_checkin_on_connection returns.
def add_connection!
  @mutex.synchronize do
    fresh = create_connection
    @connections << fresh
    handle_checkin_on_connection(fresh)
  end
end

#available_sizeObject

clients available for checkout (at time of call)



164
165
166
# File 'lib/z_k/pool.rb', line 164

# Number of clients currently sitting in the pool and available for
# checkout. The value is a snapshot taken under the lock, so it may be stale
# by the time the caller uses it.
def available_size
  @mutex.synchronize do
    @pool.length
  end
end

#checkin(connection) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/z_k/pool.rb', line 168

# Returns a connection to the pool and wakes one thread waiting on
# @checkin_cond.
#
# A connection that is already in the pool is not added a second time; the
# duplicate check-in is logged at debug level and nil is returned.
def checkin(connection)
  @mutex.synchronize do
    if @pool.include?(connection)
      logger.debug { "Pool already contains connection: #{connection.object_id}, @connections.include? #{@connections.include?(connection).inspect}" }
      nil
    else
      @pool << connection
      @checkin_cond.signal
    end
  end
end

#checkout(blocking = true) ⇒ Object

Raises:

  • (ArgumentError)


186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/z_k/pool.rb', line 186

# Hands out a connected client from the pool.
#
# blocking - when true (the default) and no client is available and the pool
#            cannot grow, wait on @checkin_cond until a client is checked in
#            or the pool is no longer open. When false, return false instead
#            of waiting.
#
# Returns a connection whose connected? is true, or false when called
# non-blocking and nothing is available.
#
# Raises ArgumentError if a block is given (use with_connection for that),
# and whatever assert_open! raises when the pool is not open.
def checkout(blocking=true)
  raise ArgumentError, "checkout does not take a block, use .with_connection" if block_given?

  @mutex.synchronize do
    while true
      assert_open!

      if @pool.length > 0
        cnx = @pool.shift

        # if the connection isn't connected, then set up an on_connection
        # handler and try the next one in the pool
        unless cnx.connected?
          logger.debug { "connection #{cnx.object_id} is not connected" }
          handle_checkin_on_connection(cnx)
          next
        end

        # otherwise we return the cnx
        return cnx
      elsif can_grow_pool?
        add_connection!
        next
      elsif blocking
        # Track this thread as a waiter so count_waiters reports real
        # numbers; previously the counter was never updated and stayed at 0.
        # The lock is held again by the time the ensure clause runs.
        @count_waiters += 1
        begin
          @checkin_cond.wait_while { @pool.empty? && open? }
        ensure
          @count_waiters -= 1
        end
        next
      else
        return false
      end
    end # while
  end
end

#count_waitersObject

number of threads waiting for connections



182
183
184
# File 'lib/z_k/pool.rb', line 182

# Reads the @count_waiters counter while holding the pool lock.
def count_waiters #:nodoc:
  @mutex.synchronize do
    @count_waiters
  end
end

#create_connectionObject (protected)



267
268
269
# File 'lib/z_k/pool.rb', line 267

# Instantiates a new ZK client for this pool's host, using the timeout and
# the option hash captured by the constructor.
def create_connection
  timeout = @connection_timeout
  extra   = @connection_args
  ZK.new(@host, timeout, extra)
end

#handle_checkin_on_connection(cnx) ⇒ Object (protected)



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/z_k/pool.rb', line 237

# Puts cnx into the pool as soon as it is usable.
#
# If cnx is already connected it is checked in right away. Otherwise an
# on_connected callback is registered on it; when that fires, the callback
# removes its own subscription from @on_connected_subs, unsubscribes, and
# checks the connection in.
#
# The whole method runs inside @mutex.synchronize and calls checkin, which
# synchronizes on @mutex again.
# NOTE(review): this nesting presumably relies on @mutex being reentrant
# (e.g. a Monitor) and on @on_connected_subs being a Hash that also responds
# to synchronize -- both are set up outside this method; confirm there.
#
# Returns nil on the already-connected path; otherwise the value of the
# @on_connected_subs[cnx] assignment (the subscription).
def handle_checkin_on_connection(cnx)
  @mutex.synchronize do
    # Shared by the immediate path and the deferred callback below.
    do_checkin = lambda do
      checkin(cnx)
    end

    if cnx.connected?
      do_checkin.call
      return
    else
      @on_connected_subs.synchronize do

        sub = cnx.on_connected do 
          # this synchronization is to prevent a race between setting up the subscription
          # and assigning it to the @on_connected_subs hash. It's possible that the callback
          # would fire before we had a chance to add the sub to the hash.
          @on_connected_subs.synchronize do
            # Intentional assignment in the condition: `sub` here is the same
            # local as the outer `sub`. Nothing is done when no subscription
            # is recorded for cnx (delete returns nil).
            if sub = @on_connected_subs.delete(cnx)
              sub.unsubscribe
              do_checkin.call
            end
          end
        end

        # Record the subscription so the callback can find and cancel it.
        @on_connected_subs[cnx] = sub
      end
    end
  end
end

#populate_pool!(num_cnx) ⇒ Object (protected)



224
225
226
# File 'lib/z_k/pool.rb', line 224

# Adds num_cnx connections to the pool, one add_connection! call each.
#
# Returns num_cnx (the result of Integer#times).
def populate_pool!(num_cnx)
  num_cnx.times do
    add_connection!
  end
end

#sizeObject

returns the current number of allocated clients in the pool (not available clients)



159
160
161
# File 'lib/z_k/pool.rb', line 159

# Total number of clients the pool has allocated (the length of
# @connections), regardless of whether they are currently checked out.
def size
  @mutex.synchronize do
    @connections.length
  end
end