Class: PG::EM::ConnectionPool

Inherits:
Object
Defined in:
lib/pg/em/connection_pool.rb

Overview

Connection pool for PG::EM::Client

Author

Rafal Michalski

When no free connections are left, the ConnectionPool allocates new connections asynchronously, up to #max_size connections in total.

If the PG::EM::Client#async_autoreconnect option is not set, or the re-connect fails, the failed connection is dropped from the pool.

The list of Client command methods that are available in ConnectionPool:

Fiber synchronized methods:

The asynchronous command methods:

The pool allows at most #max_size commands (both deferred and fiber synchronized) to run concurrently. Pending requests are queued and executed as connections become available.
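The queuing behaviour described above can be modelled without a database. The following is a minimal sketch in plain Ruby, not the library's implementation: ToyPool and its submit and complete methods are hypothetical names used only for illustration, and jobs are plain strings.

```ruby
# Toy model of a pool that runs at most max_size jobs at once and queues the rest.
# ToyPool and its methods are hypothetical; they are not part of PG::EM.
class ToyPool
  attr_reader :running, :pending, :log

  def initialize(max_size)
    @max_size = max_size
    @running = []   # jobs currently holding a connection
    @pending = []   # jobs waiting for a connection
    @log = []
  end

  # Start the job if a connection slot is free, otherwise queue it.
  def submit(job)
    if @running.size < @max_size
      @running << job
      @log << "start #{job}"
    else
      @pending << job
      @log << "queue #{job}"
    end
  end

  # Mark a running job as done and hand its slot to the oldest pending job.
  def complete(job)
    @running.delete(job)
    @log << "done #{job}"
    next_job = @pending.shift
    submit(next_job) if next_job
  end
end

pool = ToyPool.new(2)
%w[a b c].each { |job| pool.submit(job) }
pool.complete('a')
puts pool.log
```

With a limit of two, the third job waits in the queue and starts only once one of the first two has completed.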

Keep in mind that the command methods listed above may send commands to different clients from the pool each time they are called. You can’t assume anything about which connection is acquired, even if the #max_size of the pool is set to one. This is because no connection is shared between two concurrent requests, and connections might occasionally fail, in which case they are dropped from the pool.

This prevents the *_defer commands from being used to execute transactions.

For transactions use #transaction and fiber synchronized methods.
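To see why independently dispatched commands cannot form a transaction, consider the following toy model. It is only a sketch under simplifying assumptions: ToyConn and ToyDispatcher are hypothetical stand-ins, and each command simply takes whichever connection is free at the moment it is issued.

```ruby
# Toy model: every command takes whichever connection is free when it is issued.
# ToyConn and ToyDispatcher are hypothetical stand-ins, not PG::EM classes.
ToyConn = Struct.new(:name, :commands)

class ToyDispatcher
  def initialize(conns)
    @free = conns.dup
  end

  # Issue a command without waiting for earlier ones to finish.
  # Returns the connection so the caller can release it later.
  def issue(sql)
    conn = @free.shift
    raise 'no free connection in this toy model' unless conn
    conn.commands << sql
    conn
  end

  def release(conn)
    @free.push(conn)
  end
end

c1 = ToyConn.new('c1', [])
c2 = ToyConn.new('c2', [])
dispatcher = ToyDispatcher.new([c1, c2])

# Two commands issued back to back, before either has completed.
first  = dispatcher.issue('BEGIN')
second = dispatcher.issue('INSERT INTO bar VALUES (1)')

puts "BEGIN went to #{first.name}, INSERT went to #{second.name}"
```

In this model the BEGIN and the INSERT end up on different connections, so the INSERT runs outside the transaction that BEGIN opened.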

Examples:

Basic usage

pg = PG::EM::ConnectionPool.new size: 10, dbname: 'foo'
res = pg.query 'select * from bar'

Defined Under Namespace

Classes: DeferredOptions

Constant Summary

DEFAULT_SIZE = 4


Constructor Details

#initialize(options = {}) ⇒ ConnectionPool

Creates and initializes a new connection pool.

The connection pool allocates its first connection upon initialization unless the lazy: true option is given.

Pass PG::EM::Client options together with ConnectionPool options:

  • :size = 4 - the maximum number of concurrent connections

  • :lazy = false - whether to allocate the first connection lazily (on first use) instead of during initialization

  • :connection_class = PG::EM::Client

Raises:

  • (PG::Error)
  • (ArgumentError)


# File 'lib/pg/em/connection_pool.rb', line 83

def initialize(options = {})
  @available = []
  @pending = []
  @allocated = {}
  @connection_class = Client

  lazy = false
  @options = options.reject do |key, value|
    case key.to_sym
    when :size, :max_size
      @max_size = value.to_i
      true
    when :connection_class
      @connection_class = value
      true
    when :lazy
      lazy = value
      true
    end
  end

  @max_size ||= DEFAULT_SIZE

  raise ArgumentError, "#{self.class}.new: pool size must be >= 1" if @max_size < 1

  # allocate first connection, unless we are lazy
  hold unless lazy
end
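The option handling in the constructor relies on Hash#reject dropping a key whenever the block returns a truthy value, while an unmatched case expression returns nil and so keeps the key. The standalone sketch below isolates that idiom; split_pool_options and DEFAULT_POOL_SIZE are hypothetical names, and the :connection_class option is left out for brevity.

```ruby
# Standalone sketch of the option-splitting idiom used in the constructor.
# split_pool_options and DEFAULT_POOL_SIZE are hypothetical names.
DEFAULT_POOL_SIZE = 4

def split_pool_options(options)
  pool = { size: nil, lazy: false }
  client = options.reject do |key, value|
    case key.to_sym
    when :size, :max_size
      pool[:size] = value.to_i
      true                      # truthy: remove the key from the client options
    when :lazy
      pool[:lazy] = value
      true
    end                         # no match: case returns nil, so the key is kept
  end
  pool[:size] ||= DEFAULT_POOL_SIZE
  [pool, client]
end

pool_opts, client_opts = split_pool_options({ 'size' => '10', lazy: true, dbname: 'foo' })
p pool_opts    # pool-level settings
p client_opts  # what would be passed on to each client
```

Because keys are normalized with to_sym, string and symbol keys are treated alike, and everything the pool does not recognize is passed through to the client untouched.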

Dynamic Method Handling

This class handles dynamic methods through the method_missing method: any call the pool does not define itself is forwarded to a connection acquired with #hold.

#method_missing(*a, &b) ⇒ Object



# File 'lib/pg/em/connection_pool.rb', line 309

def method_missing(*a, &b)
  hold { |c| c.__send__(*a, &b) }
end
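The delegation pattern above can be tried in isolation. This is a minimal sketch, assuming a plain String in place of a client connection; ToyHolder is a hypothetical class, and its hold method simply yields the one resource it owns.

```ruby
# Toy illustration of delegating unknown methods to a held resource.
# ToyHolder is hypothetical; a String stands in for a client connection.
class ToyHolder
  def initialize(resource)
    @resource = resource
  end

  # Yield the held resource to the block and return the block's result.
  def hold
    yield @resource
  end

  # Forward any method the holder does not define to the resource.
  def method_missing(name, *args, &blk)
    hold { |r| r.__send__(name, *args, &blk) }
  end

  # Keep respond_to? consistent with method_missing.
  def respond_to_missing?(name, include_private = false)
    hold { |r| r.respond_to?(name, include_private) }
  end
end

holder = ToyHolder.new('select 1')
puts holder.upcase
puts holder.respond_to?(:upcase)
```

Defining respond_to_missing? alongside method_missing keeps respond_to? truthful about the forwarded methods.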

Instance Attribute Details

#allocated ⇒ Object (readonly)

Returns the value of attribute allocated.



# File 'lib/pg/em/connection_pool.rb', line 68

def allocated
  @allocated
end

#async_autoreconnect ⇒ Boolean

Sets PG::EM::Client#async_autoreconnect on all present and future connections in this pool, or reads the value from the pool options.

Returns:

  • (Boolean)

    asynchronous auto re-connect status



# File 'lib/pg/em/connection_pool.rb', line 186

%w[connect_timeout
   query_timeout
   async_autoreconnect
   on_autoreconnect].each do |name|
  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}=(value)
      @options[:#{name}] = value
      b = proc { |c| c.#{name} = value }
      @available.each(&b)
      @allocated.each_value(&b)
    end

    def #{name}
      @options[:#{name}] || @options['#{name}']
    end
  EOD
  DeferredOptions.class_eval <<-EOD, __FILE__, __LINE__
    def #{name}=(value)
      self[:#{name}=] = value
    end
  EOD
end
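The generated accessors follow a simple pattern: the setter stores the value for future connections and pushes it to every existing one, and the getter reads the stored value. The sketch below shows the same pattern in a self-contained form. ToySettings and ToyMember are hypothetical, and define_method is swapped in for the string class_eval used in the listing above.

```ruby
# Toy version of generating setter/getter pairs from a list of names.
# ToySettings and ToyMember are hypothetical; define_method is used here
# in place of the string class_eval shown in the listing.
ToyMember = Struct.new(:query_timeout, :connect_timeout)

class ToySettings
  def initialize(members)
    @options = {}
    @members = members
  end

  %w[query_timeout connect_timeout].each do |name|
    # Setter: remember the value for future members and push it to current ones.
    define_method("#{name}=") do |value|
      @options[name.to_sym] = value
      @members.each { |m| m.public_send("#{name}=", value) }
    end

    # Getter: read the remembered value.
    define_method(name) { @options[name.to_sym] }
  end
end

members = [ToyMember.new, ToyMember.new]
settings = ToySettings.new(members)
settings.query_timeout = 5.0
p members.map(&:query_timeout)
p settings.connect_timeout
```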

#available ⇒ Object (readonly)

Returns the value of attribute available.



# File 'lib/pg/em/connection_pool.rb', line 68

def available
  @available
end

#connect_timeout ⇒ Float

Sets PG::EM::Client#connect_timeout on all present and future connections in this pool, or reads the value from the pool options.

Returns:

  • (Float)

    connection timeout in seconds




#max_size ⇒ Integer (readonly)

Maximum number of connections in the connection pool

Returns:

  • (Integer)


# File 'lib/pg/em/connection_pool.rb', line 66

def max_size
  @max_size
end

#on_autoreconnect ⇒ Proc<Client, Error>

Sets PG::EM::Client#on_autoreconnect on all present and future connections in this pool, or reads the value from the pool options.

Returns:

  • (Proc<Client, Error>)

    auto re-connect hook




#query_timeout ⇒ Float

Sets PG::EM::Client#query_timeout on all present and future connections in this pool, or reads the value from the pool options.

Returns:

  • (Float)

    query timeout in seconds




Class Method Details

.connect_defer(options = {}) {|pg| ... } ⇒ FeaturedDeferrable Also known as: async_connect

Creates and initializes a new connection pool.

Attempts to establish the first connection asynchronously.

Use the returned deferrable’s callback hook to obtain the newly created PG::EM::ConnectionPool. In case of a connection error, the errback hook is called with the raised error object as its argument.

If a block is provided, it is bound to both the callback and errback hooks of the returned deferrable.

Pass PG::EM::Client options together with ConnectionPool options:

  • :size = 4 - the maximum number of concurrent connections

  • :connection_class = PG::EM::Client

Yield Parameters:

  • pg (ConnectionPool|PG::Error)

    the new pool, with its first connection established, on success, or the raised PG::Error

Returns:

  • (FeaturedDeferrable)

Raises:

  • (ArgumentError)


# File 'lib/pg/em/connection_pool.rb', line 134

def self.connect_defer(options = {}, &blk)
  pool = new options.merge(lazy: true)
  pool.__send__(:hold_deferred, blk) do
    ::EM::DefaultDeferrable.new.tap { |df| df.succeed pool }
  end
end

Instance Method Details

#finish ⇒ Object Also known as: close

Finishes all available connections and clears the available pool.

After a call to this method, the pool is still usable and will try to allocate new client connections on subsequent query commands.



# File 'lib/pg/em/connection_pool.rb', line 157

def finish
  @available.each { |c| c.finish }
  @available.clear
  self
end

#hold {|pg| ... } ⇒ Object Also known as: execute

Acquires PG::EM::Client connection and passes it to the given block.

The connection is allocated to the current fiber, which ensures that any subsequent query from the same fiber is performed on the same connection.

It is possible to nest hold calls from the same fiber, so each time the block will be given the same PG::EM::Client instance. This feature is needed e.g. for nesting transaction calls.

Yield Parameters:

  • pg (Client)



# File 'lib/pg/em/connection_pool.rb', line 282

def hold
  fiber = Fiber.current
  id = fiber.object_id

  if conn = @allocated[id]
    skip_release = true
  else
    conn = acquire(fiber) until conn
  end

  begin
    yield conn if block_given?

  rescue PG::Error
    if conn.status != PG::CONNECTION_OK
      conn.finish unless conn.finished?
      drop_failed(id)
      skip_release = true
    end
    raise
  ensure
    release(id) unless skip_release
  end
end
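The per-fiber bookkeeping in hold can be illustrated with a stripped-down model. This is a sketch only: ToyFiberPool is hypothetical, symbols stand in for client connections, and waiting for a free connection and error handling are deliberately left out.

```ruby
require 'fiber'

# Toy model of per-fiber connection ownership with re-entrant hold.
# ToyFiberPool is hypothetical; symbols stand in for client connections.
class ToyFiberPool
  attr_reader :available

  def initialize(conns)
    @available = conns.dup
    @allocated = {}   # fiber object_id => connection
  end

  def hold
    id = Fiber.current.object_id
    nested = @allocated.key?(id)    # this fiber already owns a connection
    unless nested
      conn = @available.pop
      raise 'pool exhausted in this toy model' unless conn
      @allocated[id] = conn
    end
    begin
      yield @allocated[id]
    ensure
      # Only the outermost hold returns the connection to the pool.
      @available.push(@allocated.delete(id)) unless nested
    end
  end
end

pool = ToyFiberPool.new(%i[c1 c2])
seen = []
pool.hold do |outer|
  pool.hold { |inner| seen << [outer, inner] }   # nested call, same fiber
  Fiber.new { pool.hold { |other| seen << [:other_fiber, other] } }.resume
end
p seen
```

The nested call receives the connection its fiber already owns, while the second fiber is given a different one. Both connections are back in the pool once the outermost block returns.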

#respond_to_missing?(m, priv = false) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/pg/em/connection_pool.rb', line 313

def respond_to_missing?(m, priv = false)
  hold { |c| c.respond_to?(m, priv) }
end

#size ⇒ Integer

Current number of connections in the connection pool

Returns:

  • (Integer)


# File 'lib/pg/em/connection_pool.rb', line 149

def size
  @available.length + @allocated.length
end

#transaction(&blk) ⇒ Object

Executes a BEGIN at the start of the block and a COMMIT at the end of the block, or a ROLLBACK if any exception occurs. Calls to transaction may be nested, but nested calls do not create sub-transactions (savepoints).

Examples:

Transactions

pg = PG::EM::ConnectionPool.new size: 10
pg.transaction do
  pg.exec('insert into animals (family, species) values ($1,$2)',
          [family, species])
  num = pg.query('select count(*) from people where family=$1',
          [family]).get_value(0,0)
  pg.exec('update stats set count = $1 where family=$2',
          [num, family])
end




# File 'lib/pg/em/connection_pool.rb', line 266

def transaction(&blk)
  hold do |pg|
    pg.transaction(&blk)
  end
end
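One way to picture nesting without savepoints is a depth counter, where only the outermost call sends BEGIN and COMMIT or ROLLBACK. The sketch below models that idea. ToyTxClient is hypothetical, it records SQL instead of sending it, and the depth counter is an assumed mechanism for illustration rather than a description of how PG::EM::Client#transaction is implemented.

```ruby
# Toy model of nested transaction calls without savepoints.
# ToyTxClient is hypothetical; it records SQL instead of sending it, and the
# depth counter is an assumed mechanism, not taken from PG::EM::Client.
class ToyTxClient
  attr_reader :sent

  def initialize
    @sent = []
    @depth = 0
  end

  def exec(sql)
    @sent << sql
  end

  def transaction
    exec('BEGIN') if @depth.zero?   # only the outermost call opens a transaction
    @depth += 1
    begin
      result = yield
    rescue StandardError
      @depth -= 1
      exec('ROLLBACK') if @depth.zero?
      raise
    end
    @depth -= 1
    exec('COMMIT') if @depth.zero?
    result
  end
end

pg = ToyTxClient.new
pg.transaction do
  pg.exec('insert 1')
  pg.transaction { pg.exec('insert 2') }   # nested: no second BEGIN
end
p pg.sent
```

In this model an error raised in an inner block propagates outward, and the single ROLLBACK is issued by the outermost call.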