Class: EzPool

Inherits:
Object
Defined in:
lib/ezpool.rb,
lib/ezpool/version.rb,
lib/ezpool/monotonic_time.rb

Overview

Generic connection pool class for e.g. sharing a limited number of network connections among many threads. Note: Connections are lazily created.
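To make "lazily created" concrete, here is a minimal, self-contained sketch. `LazyToyPool` is a hypothetical toy written for this illustration and is not EzPool's implementation; it only shows the idea that the factory block runs on first checkout rather than at construction time.

```ruby
# Toy illustration only; not EzPool's implementation.
class LazyToyPool
  attr_reader :created

  def initialize(size, &factory)
    @size = size
    @factory = factory
    @idle = []      # connections that were created and later returned
    @created = 0    # how many connections exist so far
    @mutex = Mutex.new
  end

  # Reuse an idle connection if there is one, otherwise create one on demand.
  def checkout
    @mutex.synchronize do
      return @idle.pop unless @idle.empty?
      raise "pool exhausted" if @created >= @size
      @created += 1
      @factory.call
    end
  end

  def checkin(conn)
    @mutex.synchronize { @idle.push(conn) }
    nil
  end
end

pool = LazyToyPool.new(5) { Object.new }
pool.created           # nothing has been created yet
conn = pool.checkout   # the first checkout triggers the factory
pool.checkin(conn)     # later checkouts can reuse this connection
```

Unlike this toy, a real pool would wait (up to the timeout) for a connection to be returned instead of raising as soon as the limit is reached.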

Example usage with block (faster):

@pool = EzPool.new { Redis.new }

@pool.with do |redis|
  redis.lpop('my-list') if redis.llen('my-list') > 0
end

Using an optional timeout override (applies to that single invocation only):

@pool.with(timeout: 2.0) do |redis|
  redis.lpop('my-list') if redis.llen('my-list') > 0
end

Example usage replacing an existing connection (slower):

# .wrap takes a required options hash (see its signature below)
$redis = EzPool.wrap(size: 5) { Redis.new }

def do_work
  $redis.lpop('my-list') if $redis.llen('my-list') > 0
end

Note that there’s no way to pass a disconnection function to this usage, nor any way to guarantee that subsequent calls will go to the same connection (if your connection has any concept of sessions, this may be important). We strongly recommend against using wrapped connections in production environments.
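The session caveat can be illustrated with a small stand-alone sketch. `ToyWrapper` and `FakeConn` are hypothetical names invented here; the real `EzPool::Wrapper` is not shown on this page, and the round-robin choice below is an assumption made only to make the effect visible.

```ruby
# Hypothetical stand-in for a wrapped pool; not EzPool::Wrapper's real code.
class ToyWrapper
  def initialize(connections)
    @connections = connections
    @next = 0
  end

  # Every forwarded call picks a connection independently of the previous call.
  def method_missing(name, *args, &block)
    conn = @connections[@next % @connections.size]
    @next += 1
    conn.public_send(name, *args, &block)
  end

  def respond_to_missing?(name, include_private = false)
    @connections.first.respond_to?(name, include_private) || super
  end
end

# Fake connection that keeps per-connection session state.
class FakeConn
  def initialize
    @db = 0
  end

  def use_db(db)
    @db = db
  end

  def current_db
    @db
  end
end

wrapped = ToyWrapper.new([FakeConn.new, FakeConn.new])
wrapped.use_db(3)     # handled by the first connection
wrapped.current_db    # handled by the second connection, which never saw use_db(3)
```

With the block form (`with`), both calls would run on the one connection yielded to the block, so session state set by the first call is visible to the second.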

Accepts the following options:

  • :size - number of connections to pool, defaults to 5

  • :timeout - number of seconds to wait for a connection if none is currently available, defaults to 1 (see DEFAULTS)

  • :max_age - maximum number of seconds that a connection may be alive for (expired connections are recycled on checkin/checkout); must be greater than 0, defaults to Float::INFINITY

  • :connect_with - callable for creating a connection

  • :disconnect_with - callable for shutting down a connection
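As a rough sketch of how these options combine with the defaults, the helper below mirrors the first few lines of `#initialize` as listed under Constructor Details. `POOL_DEFAULTS` is a local copy of the `DEFAULTS` constant from this page, and `resolve_pool_options` is a hypothetical helper name, not part of EzPool.

```ruby
# Local copy of DEFAULTS from the Constant Summary on this page.
POOL_DEFAULTS = {size: 5, timeout: 1, max_age: Float::INFINITY}

# Hypothetical helper: caller-supplied options win over the defaults,
# and :max_age is validated the same way #initialize validates it.
def resolve_pool_options(options = {})
  merged = POOL_DEFAULTS.merge(options)
  max_age = merged.fetch(:max_age).to_f
  raise ArgumentError, ":max_age must be > 0" if max_age <= 0
  merged.merge(max_age: max_age)
end

resolve_pool_options(size: 10)      # :size overridden, other keys keep their defaults
resolve_pool_options(max_age: 30)   # :max_age is stored as a Float
```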

Defined Under Namespace

Classes: CheckedInUnCheckedOutConnectionError, ConnectCallableNeverConfigured, ConnectionManager, ConnectionWrapper, Error, PoolShuttingDownError, TimedStack, Wrapper

Constant Summary

DEFAULTS = {size: 5, timeout: 1, max_age: Float::INFINITY}
VERSION = "1.0.0"


Constructor Details

#initialize(options = {}, &block) ⇒ EzPool

Returns a new instance of EzPool.



# File 'lib/ezpool.rb', line 55

def initialize(options = {}, &block)
  options = DEFAULTS.merge(options)

  @size = options.fetch(:size)
  @timeout = options.fetch(:timeout)
  @max_age = options.fetch(:max_age).to_f

  if @max_age <= 0
    raise ArgumentError.new(":max_age must be > 0")
  end

  if block_given?
    if options.include?(:connect_with)
      raise ArgumentError.new("Block passed to EzPool *and* :connect_with in options")
    else
      options[:connect_with] = block
    end
  end

  @manager = EzPool::ConnectionManager.new(options[:connect_with], options[:disconnect_with])

  @available = TimedStack.new(@manager, @size)
  @key = :"current-#{@available.object_id}"

  @checked_out_connections = Hash.new
  @mutex = Mutex.new
end

Class Method Details

.monotonic_time ⇒ Float

Returns the current time as tracked by the application's monotonic clock.

Returns:

  • (Float)

    The current monotonic time when `since` is not given; otherwise the elapsed monotonic time between `since` and the current time



# File 'lib/ezpool/monotonic_time.rb', line 62

def monotonic_time
  GLOBAL_MONOTONIC_CLOCK.get_time
end
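The implementation behind `GLOBAL_MONOTONIC_CLOCK` is not shown on this page. As a general illustration of why a monotonic clock suits timeouts and connection ages, the sketch below reads Ruby's standard monotonic clock through `Process.clock_gettime`; the helper name `monotonic_now` is made up for this example.

```ruby
# Reads the system monotonic clock as a Float number of seconds.
# Unlike Time.now, this value does not jump when the wall clock is adjusted.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

started = monotonic_now
sleep 0.01
elapsed = monotonic_now - started   # elapsed seconds, never negative
```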

.wrap(options, &block) ⇒ Object



# File 'lib/ezpool.rb', line 48

def self.wrap(options, &block)
  if block_given?
    options[:connect_with] = block
  end
  Wrapper.new(options)
end

Instance Method Details

#checkin(conn) ⇒ Object



# File 'lib/ezpool.rb', line 134

def checkin(conn)
  conn_wrapper = @mutex.synchronize do
    @checked_out_connections.delete(conn.object_id)
  end
  if conn_wrapper.nil?
    raise EzPool::CheckedInUnCheckedOutConnectionError
  end
  if expired? conn_wrapper
    @available.abandon(conn_wrapper)
  else
    @available.push(conn_wrapper)
  end
  nil
end
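`#checkin` relies on an `expired?` helper whose source is not listed here. One plausible shape for such a check, assuming each wrapper records its creation time on a monotonic clock, is sketched below. `ToyConnWrapper` and `toy_expired?` are hypothetical names, not EzPool's API.

```ruby
# Hypothetical wrapper record; the real ConnectionWrapper is not shown on this page.
ToyConnWrapper = Struct.new(:raw_conn, :created_at)

# A connection counts as expired once it has been alive longer than max_age seconds.
# `now` is injectable so the check can be exercised without sleeping.
def toy_expired?(wrapper, max_age, now = Process.clock_gettime(Process::CLOCK_MONOTONIC))
  now - wrapper.created_at > max_age
end

wrapper = ToyConnWrapper.new(:fake_connection, 100.0)
toy_expired?(wrapper, 30.0, 131.0)             # alive for 31 s with a 30 s limit
toy_expired?(wrapper, Float::INFINITY, 131.0)  # the default max_age never expires
```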

#checkout(options = {}) ⇒ Object



# File 'lib/ezpool.rb', line 117

def checkout(options = {})
  conn_wrapper = nil
  while conn_wrapper.nil? do
    timeout = options[:timeout] || @timeout
    conn_wrapper = @available.pop(timeout: timeout)
    if expired? conn_wrapper
      @available.abandon(conn_wrapper)
      conn_wrapper = nil
    end
  end

  @mutex.synchronize do
    @checked_out_connections[conn_wrapper.raw_conn.object_id] = conn_wrapper
  end
  conn_wrapper.raw_conn
end
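The loop in `#checkout` keeps popping until it finds a connection that has not expired, abandoning stale ones along the way. The stand-alone sketch below reproduces that control flow with a plain Array in place of `TimedStack` and a boolean flag in place of `expired?`; all names in it are hypothetical.

```ruby
# Hypothetical stand-in for a pooled entry; `expired` replaces the real expired? check.
ToyEntry = Struct.new(:raw_conn, :expired)

def toy_checkout(stack, abandoned)
  entry = nil
  while entry.nil?
    entry = stack.pop
    # Stand-in for the timeout behaviour of the real stack, which is not shown here.
    raise "no connection available" if entry.nil?
    if entry.expired
      abandoned << entry   # discard the stale connection and try again
      entry = nil
    end
  end
  entry.raw_conn
end

stack = [ToyEntry.new(:fresh, false), ToyEntry.new(:stale, true)]
abandoned = []
toy_checkout(stack, abandoned)   # skips :stale and hands out :fresh
```

Note that in the real method the per-call `:timeout` option takes precedence over the pool-wide timeout (`options[:timeout] || @timeout`).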

#connect_with(&block) ⇒ Object



# File 'lib/ezpool.rb', line 83

def connect_with(&block)
  @manager.connect_with(&block)
end

#disconnect_with(&block) ⇒ Object



# File 'lib/ezpool.rb', line 87

def disconnect_with(&block)
  @manager.disconnect_with(&block)
end

#shutdown ⇒ Object



# File 'lib/ezpool.rb', line 149

def shutdown
  if block_given?
    raise ArgumentError.new("shutdown no longer accepts a block; call #disconnect_with to set the disconnect method, or pass the disconnect: option to the EzPool initializer")
  end
  @available.shutdown
end

#with(options = {}) ⇒ Object

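Checks out a connection, yields it to the block, and checks it back in when the block exits, even if the block raises.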



# File 'lib/ezpool.rb', line 93

def with(options = {})
  Thread.handle_interrupt(Exception => :never) do
    conn = checkout(options)
    begin
      Thread.handle_interrupt(Exception => :immediate) do
        yield conn
      end
    ensure
      checkin conn
    end
  end
end
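The practical effect of the `begin`/`ensure` structure is that a connection is always checked back in, even when the block raises. The sketch below copies the body of `#with` into a hypothetical `RecordingPool` whose `checkout` and `checkin` are stubs that only log events, so the ordering can be observed without a real pool.

```ruby
# Hypothetical stub pool: checkout/checkin only record that they were called.
class RecordingPool
  attr_reader :events

  def initialize
    @events = []
  end

  def checkout(options = {})
    @events << :checkout
    :fake_connection
  end

  def checkin(conn)
    @events << :checkin
    nil
  end

  # Same shape as EzPool#with in the listing above.
  def with(options = {})
    Thread.handle_interrupt(Exception => :never) do
      conn = checkout(options)
      begin
        Thread.handle_interrupt(Exception => :immediate) do
          yield conn
        end
      ensure
        checkin conn
      end
    end
  end
end

pool = RecordingPool.new
begin
  pool.with { |conn| raise IOError, "connection reset" }
rescue IOError
  # The error still propagates to the caller.
end
pool.events   # both :checkout and :checkin were recorded, in that order
```

The outer `handle_interrupt(Exception => :never)` defers asynchronous interrupts (for example from `Thread#raise`) during checkout and checkin, while the inner `:immediate` block lets them through only while the caller's block is running.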