Class: ConnectionPool

Inherits:
Object
  • Object
show all
Includes:
ForkTracker
Defined in:
lib/connection_pool.rb,
lib/connection_pool.rb,
lib/connection_pool/fork.rb,
lib/connection_pool/version.rb,
lib/connection_pool/wrapper.rb

Overview

Generic connection pool class for sharing a limited number of objects or network connections among many threads. Note: pool elements are lazily created.

Example usage with block (faster):

@pool = ConnectionPool.new { Redis.new }
@pool.with do |redis|
  redis.lpop('my-list') if redis.llen('my-list') > 0
end

Using optional timeout override (for that single invocation)

@pool.with(timeout: 2.0) do |redis|
  redis.lpop('my-list') if redis.llen('my-list') > 0
end

Example usage replacing an existing connection (slower):

$redis = ConnectionPool.wrap { Redis.new }

def do_work
  $redis.lpop('my-list') if $redis.llen('my-list') > 0
end

Accepts the following options:

  • :size - number of connections to pool, defaults to 5

  • :timeout - amount of time to wait for a connection if none currently available, defaults to 5 seconds

  • :auto_reload_after_fork - automatically drop all connections after fork, defaults to true

Defined Under Namespace

Modules: ForkTracker Classes: Error, PoolShuttingDownError, TimedStack, TimeoutError, Wrapper

Constant Summary collapse

VERSION =
"3.0.2"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ForkTracker

#_fork

Constructor Details

#initialize(timeout: 5, size: 5, auto_reload_after_fork: true, name: nil) ⇒ ConnectionPool

Returns a new instance of ConnectionPool.

Raises:

  • (ArgumentError)


48
49
50
51
52
53
54
55
56
57
58
# File 'lib/connection_pool.rb', line 48

def initialize(timeout: 5, size: 5, auto_reload_after_fork: true, name: nil, &)
  raise ArgumentError, "Connection pool requires a block" unless block_given?

  @size = Integer(size)
  @timeout = Float(timeout)
  @available = TimedStack.new(size: @size, &)
  @key = :"pool-#{@available.object_id}"
  @key_count = :"pool-#{@available.object_id}-count"
  @discard_key = :"pool-#{@available.object_id}-discard"
  INSTANCES[self] = self if auto_reload_after_fork && INSTANCES
end

Instance Attribute Details

#sizeObject (readonly)

Returns the value of attribute size.



46
47
48
# File 'lib/connection_pool.rb', line 46

def size
  @size
end

Class Method Details

.after_forkObject



6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/connection_pool/fork.rb', line 6

def self.after_fork
  INSTANCES.each_value do |pool|
    # We're in after_fork, so we know all other threads are dead.
    # All we need to do is ensure the main thread doesn't have a
    # checked out connection
    pool.checkin(force: true)
    pool.reload do |connection|
      # Unfortunately we don't know what method to call to close the connection,
      # so we try the most common one.
      connection.close if connection.respond_to?(:close)
    end
  end
  nil
end

.wrapObject



42
43
44
# File 'lib/connection_pool.rb', line 42

def self.wrap(**, &)
  Wrapper.new(**, &)
end

Instance Method Details

#availableObject

Number of pool entries available for checkout at this instant.



173
174
175
# File 'lib/connection_pool.rb', line 173

def available
  @available.length
end

#checkin(force: false) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/connection_pool.rb', line 123

def checkin(force: false)
  if ::Thread.current[@key]
    if ::Thread.current[@key_count] == 1 || force
      if ::Thread.current[@discard_key]
        begin
          @available.decrement_created
          ::Thread.current[@discard_key].call(::Thread.current[@key])
        rescue
          nil
        ensure
          ::Thread.current[@discard_key] = nil
        end
      else
        @available.push(::Thread.current[@key])
      end
      ::Thread.current[@key] = nil
      ::Thread.current[@key_count] = nil
    else
      ::Thread.current[@key_count] -= 1
    end
  elsif !force
    raise ConnectionPool::Error, "no connections are checked out"
  end

  nil
end

#checkout(timeout: @timeout) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
# File 'lib/connection_pool.rb', line 111

def checkout(timeout: @timeout, **)
  if ::Thread.current[@key]
    ::Thread.current[@key_count] += 1
    ::Thread.current[@key]
  else
    conn = @available.pop(timeout:, **)
    ::Thread.current[@key] = conn
    ::Thread.current[@key_count] = 1
    conn
  end
end

#discard_current_connection {|conn| ... } ⇒ void

This method returns an undefined value.

Marks the current thread’s checked-out connection for discard.

When a connection is marked for discard, it will not be returned to the pool when checked in. Instead, the connection will be discarded. This is useful when a connection has become invalid or corrupted and should not be reused.

Takes an optional block that will be called with the connection to be discarded. The block should perform any necessary clean-up on the connection.

Note: This only affects the connection currently checked out by the calling thread. The connection will be discarded when checkin is called.

Examples:

pool.with do |conn|
  begin
    conn.execute("SELECT 1")
  rescue SomeConnectionError
    pool.discard_current_connection  # Mark connection as bad
    raise
  end
end

Yields:

  • (conn)

Yield Parameters:

  • conn (Object)

    The connection to be discarded.

Yield Returns:

  • (void)


107
108
109
# File 'lib/connection_pool.rb', line 107

def discard_current_connection(&block)
  ::Thread.current[@discard_key] = block || proc { |conn| conn }
end

#idleObject

Number of pool entries created and idle in the pool.



178
179
180
# File 'lib/connection_pool.rb', line 178

def idle
  @available.idle
end

#reap(idle_seconds: 60) ⇒ Object

Reaps idle connections that have been idle for over idle_seconds. idle_seconds defaults to 60.



168
169
170
# File 'lib/connection_pool.rb', line 168

def reap(idle_seconds: 60, &)
  @available.reap(idle_seconds:, &)
end

#reloadObject

Reloads the ConnectionPool by passing each connection to block and then removing it the pool. Subsequent checkouts will create new connections as needed.



162
163
164
# File 'lib/connection_pool.rb', line 162

def reload(&)
  @available.shutdown(reload: true, &)
end

#shutdownObject

Shuts down the ConnectionPool by passing each connection to block and then removing it from the pool. Attempting to checkout a connection after shutdown will raise ConnectionPool::PoolShuttingDownError.



154
155
156
# File 'lib/connection_pool.rb', line 154

def shutdown(&)
  @available.shutdown(&)
end

#withObject Also known as: then



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/connection_pool.rb', line 60

def with(**)
  # We need to manage exception handling manually here in order
  # to work correctly with `Timeout.timeout` and `Thread#raise`.
  # Otherwise an interrupted Thread can leak connections.
  Thread.handle_interrupt(Exception => :never) do
    conn = checkout(**)
    begin
      Thread.handle_interrupt(Exception => :immediate) do
        yield conn
      end
    ensure
      checkin
    end
  end
end