Class: GenePool

Inherits:
Object
  • Object
show all
Defined in:
lib/gene_pool.rb

Overview

Generic connection pool class

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}, &connect_block) ⇒ GenePool

Creates a gene_pool. The passed block will be used to initialize a single instance of the item being pooled (i.e., socket connection or whatever) options -

name          - The name used in logging messages.
pool_size     - The maximum number of instances that will be created (Defaults to 1).
timeout       - Will raise a Timeout exception if waiting on a connection for this many seconds.
timeout_class - Exception class to raise if timeout error, defaults to Timeout::Error
warn_timeout  - Displays an error message if a checkout takes longer that the given time (used to give hints to increase the pool size).
idle_timeout  - If set, the connection will be renewed if it hasn't been used in this amount of time (seconds).
logger        - The logger used for log messages, defaults to STDERR.
close_proc    - The process or method used to close a pooled instance when it is removed.
  Defaults to :close.  Set to nil for no-op or a symbol for a method or a proc that takes an argument for the instance.


24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/gene_pool.rb', line 24

def initialize(options={}, &connect_block)
  @connect_block = connect_block

  @name          = options[:name]          || 'GenePool'
  @pool_size     = options[:pool_size]     || 1
  @timeout       = options[:timeout]
  @timeout_class = options[:timeout_class] || Timeout::Error
  @warn_timeout  = options[:warn_timeout]  || 5.0
  @idle_timeout  = options[:idle_timeout]
  @logger        = options[:logger]
  @close_proc    = options[:close_proc]    || (!options.has_key?(:close_proc) && :close)

  unless @logger
    @logger = Logger.new(STDERR)
    @logger.level = Logger::INFO
  end

  @connections = []
  @checked_out = []
  # Map the original connections object_id within the with_connection method to the final connection.
  # This could change if the connection is renew'ed.
  @with_map    = Concurrent::Hash.new

  setup_mutex
end

Instance Attribute Details

#loggerObject

Returns the value of attribute logger.



9
10
11
# File 'lib/gene_pool.rb', line 9

def logger
  @logger
end

#nameObject

Returns the value of attribute name.



9
10
11
# File 'lib/gene_pool.rb', line 9

def name
  @name
end

#pool_sizeObject

Returns the value of attribute pool_size.



10
11
12
# File 'lib/gene_pool.rb', line 10

def pool_size
  @pool_size
end

#timeout_classObject

Returns the value of attribute timeout_class.



9
10
11
# File 'lib/gene_pool.rb', line 9

def timeout_class
  @timeout_class
end

#warn_timeoutObject

Returns the value of attribute warn_timeout.



9
10
11
# File 'lib/gene_pool.rb', line 9

def warn_timeout
  @warn_timeout
end

Instance Method Details

#checkin(connection) ⇒ Object

Return a connection to the pool.



112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/gene_pool.rb', line 112

def checkin(connection)
  @mutex.synchronize do
    @checked_out.delete(connection)
    if @pool_size < @connections.size
      remove_and_close(connection)
      @logger.info "#{@name}: Checkin connection #{connection}(#{connection.object_id}) has been removed due to pool size reduction"
    else
      connection._last_used = Time.now
      @condition.signal
    end
  end
  @logger.debug {"#{@name}: Checkin connection #{connection}(#{connection.object_id}) self=#{self}"}
end

#checkoutObject

Check out a connection from the pool, creating it if necessary.



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/gene_pool.rb', line 71

def checkout
  start_time = Time.now
  connection = nil
  reserved_connection_placeholder = Thread.current
  begin
    @mutex.synchronize do
      raise "Can't perform checkout, #{@name} has been closed" if @pool_size == 0
      until connection do
        if @checked_out.size < @connections.size
          connection = (@connections - @checked_out).first
          @checked_out << connection
        elsif @connections.size < @pool_size
          # Perform the actual connection outside the mutex
          connection = reserved_connection_placeholder
          @connections << connection
          @checked_out << connection
          @logger.debug {"#{@name}: Created connection ##{@connections.size} #{connection}(#{connection.object_id}) for #{name}"}
        else
          @logger.info "#{@name}: Waiting for an available connection, all #{@pool_size} connections are checked out."
          wait_mutex(start_time)
        end
      end
    end
  ensure
    delta = Time.now - start_time
    if delta > @warn_timeout
      @logger.warn "#{@name}: It took #{delta} seconds to obtain a connection.  Consider raising the pool size which is " +
        "currently set to #{@pool_size}."
    end
  end
  if connection == reserved_connection_placeholder
    connection = renew(reserved_connection_placeholder)
  elsif @idle_timeout && (Time.now - connection._last_used) >= @idle_timeout
    connection = renew(connection)
  end

  @logger.debug {"#{@name}: Checkout connection #{connection}(#{connection.object_id}) self=#{self}"}
  return connection
end

#close(timeout = 10) ⇒ Object

Close all connections and wait for active connections to complete

Parameters:

timeout:
  Maximum time to wait for connections to close before returning


246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/gene_pool.rb', line 246

def close(timeout=10)
  # Prevent any new connections from being handed out
  self.pool_size = 0
  start_time = Time.now
  while (Time.now - start_time) < timeout
    sleep 1
    @mutex.synchronize do
      return if @connections.empty?
      @logger.info "#{@name}: Waiting to close, #{@connections.size} connections are still in use"
    end
  end
  @logger.warn "#{@name}: Timed out while waiting to close, #{@connections.size} connections are still in use"
end

#connectionsObject

Return a copy of all the current connections



234
235
236
237
238
239
# File 'lib/gene_pool.rb', line 234

def connections
  connections = @mutex.synchronize { connections = @connections.dup }
  connections.delete_if { |c| c.kind_of?(Thread) }
  connections.freeze
  connections
end

#eachObject

Perform the given block for each connection. Note that close should be used for safely closing all connections This should probably only ever be used to allow interrupt of a connection that is checked out?



226
227
228
229
230
231
# File 'lib/gene_pool.rb', line 226

def each
  @mutex.synchronize do
    # Don't include the ones in a reserved_placeholder state because that object is meaningless
    @connections.each { |connection| yield connection unless connection.kind_of?(Thread) }
  end
end

#remove(connection) ⇒ Object

Remove an existing connection from the pool



180
181
182
183
184
185
186
187
188
# File 'lib/gene_pool.rb', line 180

def remove(connection)
  @mutex.synchronize do
    @connections.delete(connection)
    @checked_out.delete(connection)
    @condition.signal
  end
  close_connection(connection)
  @logger.debug {"#{@name}: Removed connection #{connection}(#{connection.object_id}) self=#{self}"}
end

#remove_idle(idle_time = 60) ⇒ Object



260
261
262
263
264
265
266
267
268
269
# File 'lib/gene_pool.rb', line 260

def remove_idle(idle_time=60)
  @mutex.synchronize do
    (@connections - @checked_out).each do |idle_connection|
      if (Time.now - idle_connection._last_used) >= idle_time
        remove_and_close(idle_connection)
        @logger.debug {"#{@name}: Removed idle connection=#{idle_connection}(#{idle_connection.object_id})"}
      end
    end
  end
end

#renew(old_connection) ⇒ Object

If a connection needs to be renewed for some reason, reassign it here



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/gene_pool.rb', line 191

def renew(old_connection)
  new_connection =
    begin
      @connect_block.call
    rescue Exception
      remove old_connection
      raise
    end
  class << new_connection
    attr_accessor :_last_used
  end
  @mutex.synchronize do
    index = @checked_out.index(old_connection)
    raise Error.new("Can't reassign non-checked out connection for #{@name}") unless index
    @checked_out[index] = new_connection
    @connections[@connections.index(old_connection)] = new_connection
    # If this is part of a with_connection block, then track our new connection
    if @with_map.respond_to?(:key)
      with_key = @with_map.key(old_connection)
    else
      # 1.8 compatibility
      with_key = @with_map.index(old_connection)
    end

    @with_map[with_key] = new_connection if with_key
  end
  # Since connection has been removed, it can be closed outside the mutex
  close_connection(old_connection)

  @logger.debug {"#{@name}: Renewed connection old=#{old_connection.inspect} new=#{new_connection.inspect}"}
  return new_connection
end

#sizeObject



50
51
52
53
54
# File 'lib/gene_pool.rb', line 50

def size
  @mutex.synchronize do
    return @connections.size
  end
end

#to_sObject



271
272
273
274
275
276
277
278
279
# File 'lib/gene_pool.rb', line 271

def to_s
  conn = chk = with = nil
  @mutex.synchronize do
    conn = @connections.map{|c| c.object_id}.join(',')
    chk  = @checked_out.map{|c| c.object_id}.join(',')
    with = @with_map.keys.map{|k| "#{k}=#{@with_map[k].object_id}"}.join(',')
  end
  "connections=#{conn} checked_out=#{chk} with_map=#{with}"
end

#with_connectionObject

Create a scope for checking out a connection The client should handle cleanup on exception which should be something similar to the following:

rescue Exception => e
  @gene_pool.remove(connection)
  raise
end

Note that with_connection_auto_remove automatically does this



133
134
135
136
137
138
139
140
141
142
143
# File 'lib/gene_pool.rb', line 133

def with_connection
  connection = checkout
  @with_map[connection.object_id] = connection
  begin
    yield connection
  ensure
    # Update connection for any renew's that have occurred
    connection = @with_map.delete(connection.object_id)
    checkin(connection) if connection
  end
end

#with_connection_auto_removeObject

Create a scope for checking out a connection while automatically removing this connection on exception



146
147
148
149
150
151
152
153
154
155
# File 'lib/gene_pool.rb', line 146

def with_connection_auto_remove
  with_connection do |connection|
    begin
      yield connection
    rescue Exception
      remove(connection)
      raise
    end
  end
end

#with_connection_auto_retryObject

Create a scope for checking out a connection while automatically retrying on exception



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/gene_pool.rb', line 159

def with_connection_auto_retry
  with_connection do |connection|
    begin
      yield connection
    rescue Exception => e
      if e.kind_of?(Timeout::Error) || e.kind_of?(@timeout_class) || e.message =~ /expired/
        remove(connection)
        raise
      end
      connection = renew(connection)
      begin
        yield connection
      rescue Exception => e
        remove(connection)
        raise
      end
    end
  end
end