Class: PG::EM::ConnectionPool
- Inherits:
-
Object
- Object
- PG::EM::ConnectionPool
- Defined in:
- lib/pg/em/connection_pool.rb
Overview
Connection pool for PG::EM::Client
- Author
-
Rafal Michalski
The ConnectionPool allocates new connections asynchronously when there are no free connections left up to the #max_size number.
If PG::EM::Client#async_autoreconnect option is not set or the re-connect fails the failed connection is dropped from the pool.
The list of Client command methods that are available in ConnectionPool:
Fiber synchronized methods:
The asynchronous command methods:
The pool will only allow for #max_size commands (both deferred and fiber synchronized) to be performed concurrently. The pending requests will be queued and executed when connections become available.
Please keep in mind, that the above methods may send commands to different clients from the pool each time they are called. You can’t assume anything about which connection is acquired even if the #max_size of the pool is set to one. This is because no connection will be shared between two concurrent requests and the connections maight occasionally fail and they will be dropped from the pool.
This prevents the ‘*_defer` commands to execute transactions.
For transactions use #transaction and fiber synchronized methods.
Defined Under Namespace
Classes: DeferredOptions
Constant Summary collapse
- DEFAULT_SIZE =
4
Instance Attribute Summary collapse
-
#allocated ⇒ Object
readonly
Returns the value of attribute allocated.
-
#async_autoreconnect ⇒ Boolean
Set PG::EM::Client#async_autoreconnect on all present and future connections in this pool or read value from options.
-
#available ⇒ Object
readonly
Returns the value of attribute available.
-
#connect_timeout ⇒ Float
Set PG::EM::Client#connect_timeout on all present and future connections in this pool or read value from options.
-
#max_size ⇒ Integer
readonly
Maximum number of connections in the connection pool.
-
#on_autoreconnect ⇒ Proc<Client, Error>
Set PG::EM::Client#on_autoreconnect on all present and future connections in this pool or read value from options.
-
#query_timeout ⇒ Float
Set PG::EM::Client#query_timeout on all present and future connections in this pool or read value from options.
Class Method Summary collapse
-
.connect_defer(options = {}) {|pg| ... } ⇒ FeaturedDeferrable
(also: async_connect)
Creates and initializes new connection pool.
Instance Method Summary collapse
-
#finish ⇒ Object
(also: #close)
Finishes all available connections and clears the available pool.
-
#hold {|pg| ... } ⇒ Object
(also: #execute)
Acquires Client connection and passes it to the given block.
-
#initialize(options = {}) ⇒ ConnectionPool
constructor
Creates and initializes a new connection pool.
- #method_missing(*a, &b) ⇒ Object
- #respond_to_missing?(m, priv = false) ⇒ Boolean
-
#size ⇒ Integer
Current number of connections in the connection pool.
-
#transaction(&blk) ⇒ Object
Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs.
Constructor Details
#initialize(options = {}) ⇒ ConnectionPool
Creates and initializes a new connection pool.
The connection pool allocates its first connection upon initialization unless lazy: true option is given.
Pass PG::EM::Client options
together with ConnectionPool options
:
-
:size
=4
- the maximum number of concurrent connections -
:lazy
= false - should lazy allocate first connection -
:connection_class
= PG::EM::Client
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/pg/em/connection_pool.rb', line 83 def initialize( = {}) @available = [] @pending = [] @allocated = {} @connection_class = Client lazy = false @options = .reject do |key, value| case key.to_sym when :size, :max_size @max_size = value.to_i true when :connection_class @connection_class = value true when :lazy lazy = value true end end @max_size ||= DEFAULT_SIZE raise ArgumentError, "#{self.class}.new: pool size must be > 1" if @max_size < 1 # allocate first connection, unless we are lazy hold unless lazy end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(*a, &b) ⇒ Object
309 310 311 |
# File 'lib/pg/em/connection_pool.rb', line 309 def method_missing(*a, &b) hold { |c| c.__send__(*a, &b) } end |
Instance Attribute Details
#allocated ⇒ Object (readonly)
Returns the value of attribute allocated.
68 69 70 |
# File 'lib/pg/em/connection_pool.rb', line 68 def allocated @allocated end |
#async_autoreconnect ⇒ Boolean
Set PG::EM::Client#async_autoreconnect on all present and future connections in this pool or read value from options
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/pg/em/connection_pool.rb', line 186 %w[connect_timeout query_timeout async_autoreconnect on_autoreconnect].each do |name| class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) @options[:#{name}] = value b = proc { |c| c.#{name} = value } @available.each(&b) @allocated.each_value(&b) end def #{name} @options[:#{name}] || @options['#{name}'] end EOD DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) self[:#{name}=] = value end EOD end |
#available ⇒ Object (readonly)
Returns the value of attribute available.
68 69 70 |
# File 'lib/pg/em/connection_pool.rb', line 68 def available @available end |
#connect_timeout ⇒ Float
Set PG::EM::Client#connect_timeout on all present and future connections in this pool or read value from options
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/pg/em/connection_pool.rb', line 186 %w[connect_timeout query_timeout async_autoreconnect on_autoreconnect].each do |name| class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) @options[:#{name}] = value b = proc { |c| c.#{name} = value } @available.each(&b) @allocated.each_value(&b) end def #{name} @options[:#{name}] || @options['#{name}'] end EOD DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) self[:#{name}=] = value end EOD end |
#max_size ⇒ Integer (readonly)
Maximum number of connections in the connection pool
66 67 68 |
# File 'lib/pg/em/connection_pool.rb', line 66 def max_size @max_size end |
#on_autoreconnect ⇒ Proc<Client, Error>
Set PG::EM::Client#on_autoreconnect on all present and future connections in this pool or read value from options
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/pg/em/connection_pool.rb', line 186 %w[connect_timeout query_timeout async_autoreconnect on_autoreconnect].each do |name| class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) @options[:#{name}] = value b = proc { |c| c.#{name} = value } @available.each(&b) @allocated.each_value(&b) end def #{name} @options[:#{name}] || @options['#{name}'] end EOD DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) self[:#{name}=] = value end EOD end |
#query_timeout ⇒ Float
Set PG::EM::Client#query_timeout on all present and future connections in this pool or read value from options
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/pg/em/connection_pool.rb', line 186 %w[connect_timeout query_timeout async_autoreconnect on_autoreconnect].each do |name| class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) @options[:#{name}] = value b = proc { |c| c.#{name} = value } @available.each(&b) @allocated.each_value(&b) end def #{name} @options[:#{name}] || @options['#{name}'] end EOD DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__ def #{name}=(value) self[:#{name}=] = value end EOD end |
Class Method Details
.connect_defer(options = {}) {|pg| ... } ⇒ FeaturedDeferrable Also known as: async_connect
Creates and initializes new connection pool.
Attempts to establish the first connection asynchronously.
Use the returned deferrable’s callback
hook to obtain newly created PG::EM::ConnectionPool. In case of a connection error errback
hook is called with a raised error object as its argument.
If the block is provided it’s bound to both callback
and errback
hooks of the returned deferrable.
Pass PG::EM::Client options
together with ConnectionPool options
:
-
:size
=4
- the maximum number of concurrent connections -
:connection_class
= PG::EM::Client
134 135 136 137 138 139 |
# File 'lib/pg/em/connection_pool.rb', line 134 def self.connect_defer( = {}, &blk) pool = new .merge(lazy: true) pool.__send__(:hold_deferred, blk) do ::EM::DefaultDeferrable.new.tap { |df| df.succeed pool } end end |
Instance Method Details
#finish ⇒ Object Also known as: close
Finishes all available connections and clears the available pool.
After call to this method the pool is still usable and will try to allocate new client connections on subsequent query commands.
157 158 159 160 161 |
# File 'lib/pg/em/connection_pool.rb', line 157 def finish @available.each { |c| c.finish } @available.clear self end |
#hold {|pg| ... } ⇒ Object Also known as: execute
Acquires PG::EM::Client connection and passes it to the given block.
The connection is allocated to the current fiber and ensures that any subsequent query from the same fiber will be performed on the connection.
It is possible to nest hold calls from the same fiber, so each time the block will be given the same PG::EM::Client instance. This feature is needed e.g. for nesting transaction calls.
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 |
# File 'lib/pg/em/connection_pool.rb', line 282 def hold fiber = Fiber.current id = fiber.object_id if conn = @allocated[id] skip_release = true else conn = acquire(fiber) until conn end begin yield conn if block_given? rescue PG::Error if conn.status != PG::CONNECTION_OK conn.finish unless conn.finished? drop_failed(id) skip_release = true end raise ensure release(id) unless skip_release end end |
#respond_to_missing?(m, priv = false) ⇒ Boolean
313 314 315 |
# File 'lib/pg/em/connection_pool.rb', line 313 def respond_to_missing?(m, priv = false) hold { |c| c.respond_to?(m, priv) } end |
#size ⇒ Integer
Current number of connections in the connection pool
149 150 151 |
# File 'lib/pg/em/connection_pool.rb', line 149 def size @available.length + @allocated.length end |
#transaction(&blk) ⇒ Object
Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs. Calls to transaction may be nested, however without sub-transactions (save points).
266 267 268 269 270 |
# File 'lib/pg/em/connection_pool.rb', line 266 def transaction(&blk) hold do |pg| pg.transaction(&blk) end end |