Author: Rafał Michalski (rafal at yeondir dot com)


em-pg-client is the Ruby EventMachine driver interface to the PostgreSQL RDBMS. It is based on ruby-pg.

em-pg-client provides the PG::EM::Client class, which inherits from PG::Connection. You can work with PG::EM::Client almost the same way you would work with PG::Connection.

The real difference begins when you turn the EventMachine reactor on.

  require 'pg/em'

  pg = PG::EM::Client.new dbname: 'test'

  # no async
  pg.query('select * from foo') do |result|
    puts Array(result).inspect
  end

  # asynchronous
  EM.run do
    Fiber.new do
      pg.query('select * from foo') do |result|
        puts Array(result).inspect
      end
      EM.stop
    end.resume
  end

  # asynchronous + deferrable
  EM.run do
    df = pg.query_defer('select * from foo')
    df.callback { |result|
      puts Array(result).inspect
      EM.stop
    }
    df.errback {|ex|
      raise ex
    }
    puts "sent"
  end


Features

  • Non-blocking / fully asynchronous processing with EventMachine.
  • Event reactor auto-detecting, asynchronous fiber-synchronized command methods (the same code can be used regardless of the EventMachine reactor state).
  • Asynchronous EM-style (deferrable returning) command methods.
  • Fully asynchronous automatic re-connects on connection failures (e.g. RDBMS restarts, network failures).
  • Minimal changes to the PG::Connection API.
  • Configurable timeouts (connect or execute) of asynchronous processing.
  • Dedicated connection pool with dynamic size, supporting asynchronous processing and transactions.
  • Sequel Adapter by Peter Yanovich.
  • Works on Windows (requires Ruby 2.0) (issue #7).



Installation

  $ [sudo] gem install em-pg-client

Gemfile

  gem "em-pg-client", "~> 0.3.0"

Github

  git clone git://github.com/royaltm/ruby-em-pg-client.git


PG::Connection commands adapted to the EventMachine

Asynchronous, the EventMachine style:

  • Client.connect_defer (singleton method)
  • reset_defer
  • exec_defer (alias: query_defer)
  • prepare_defer
  • exec_prepared_defer
  • describe_prepared_defer
  • describe_portal_defer

For arguments of these methods consult their original (without the _defer suffix) counterparts in the PG::Connection manual.

Use callback with a block on the returned deferrable object to receive the result. In the case of connect_defer and reset_defer the result is an instance of PG::EM::Client; the received client is connected and ready for queries. Otherwise an instance of PG::Result is received. You may clear the obtained result object or leave it to the GC.

To detect an error in the executed command, call errback on the deferrable with a block. Expect an instance of the raised Exception (usually PG::Error) as the block argument.
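
As a rough illustration of the callback/errback contract described above, here is a minimal sketch built on a hypothetical ToyDeferrable class. It is a stand-in written only for this example, not EM::Deferrable and not part of em-pg-client, and it needs neither a reactor nor a database. With the real client the deferrable would come from one of the *_defer methods, and the blocks would receive PG::Result or PG::Error objects instead of the plain values used here.

```ruby
# ToyDeferrable is a hypothetical stand-in used for illustration only.
class ToyDeferrable
  def initialize
    @callbacks = []
    @errbacks  = []
  end

  # Register a block that receives the result on success.
  def callback(&block)
    @callbacks << block
    self
  end

  # Register a block that receives the exception on failure.
  def errback(&block)
    @errbacks << block
    self
  end

  # Completes the deferrable successfully and runs the callbacks.
  def succeed(result)
    @callbacks.each { |cb| cb.call(result) }
  end

  # Completes the deferrable with an error and runs the errbacks.
  def fail(error)
    @errbacks.each { |eb| eb.call(error) }
  end
end

log = []

# A command that succeeds: only the callback fires.
ok = ToyDeferrable.new
ok.callback { |result| log << [:result, result] }
ok.errback  { |ex| log << [:error, ex.message] }
ok.succeed(['row 1'])

# A command that fails: only the errback fires.
bad = ToyDeferrable.new
bad.callback { |result| log << [:result, result] }
bad.errback  { |ex| log << [:error, ex.message] }
bad.fail(RuntimeError.new('syntax error'))

log.each { |entry| puts entry.inspect }
```

The point of the sketch is that exactly one of the two blocks runs per command, so result handling and error handling can be kept apart.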

Reactor sensing methods, EM-Synchrony style:

  • Client.new (singleton, alias: connect, open, setdb, setdblogin)
  • reset
  • exec (alias: query, async_exec, async_query)
  • prepare
  • exec_prepared
  • describe_prepared
  • describe_portal

The above methods call their *_defer counterparts and yield from the current fiber while awaiting the result. The PG::Result instance (or PG::EM::Client for new) is then returned to the caller. If a code block is given, it is passed the result as an argument. In that case the value of the block is returned instead, and the result is cleared (or, in the case of new, the client is closed) after the block terminates.

These methods take the asynchronous path only if EventMachine's reactor is running and the current fiber is not the root fiber. Otherwise the parent (thread-blocking) PG::Connection methods are called.
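
The suspend/resume mechanism described above can be sketched with plain Ruby fibers. This is only an illustration under stated assumptions: ToyClient, its tick method and the result strings are all hypothetical, the pending queue stands in for the EventMachine reactor, and the real client also checks whether the reactor is running, which this sketch leaves out.

```ruby
require 'fiber' # needed for Fiber.current on Ruby older than 3.1

# Hypothetical toy client: illustrates the suspend/resume pattern only.
class ToyClient
  ROOT_FIBER = Fiber.current # the fiber that loaded this code

  def initialize
    @pending = [] # completions waiting for the next "reactor tick"
  end

  # Deferred style: returns at once, the block fires on a later tick.
  def query_defer(sql, &on_result)
    @pending << -> { on_result.call("result of #{sql}") }
  end

  # Fiber-synchronized style: looks like a blocking call to the caller.
  def query(sql)
    fiber = Fiber.current
    # In the root fiber there is nothing to suspend, so take the blocking path.
    return "blocking result of #{sql}" if fiber.equal?(ROOT_FIBER)

    query_defer(sql) { |result| fiber.resume(result) }
    Fiber.yield # suspend here; the value passed to resume is returned
  end

  # Stand-in for the reactor loop: runs the queued completions.
  def tick
    @pending.shift.call until @pending.empty?
  end
end

client = ToyClient.new
events = []

events << client.query('select 1') # root fiber: blocking path

Fiber.new do
  events << :before
  events << client.query('select 2') # suspends this fiber
  events << :after
end.resume

events << :reactor_free # runs while the fiber is suspended
client.tick             # the completion resumes the fiber

puts events.inspect
```

The :reactor_free entry lands between :before and the second result, which shows that the calling code regains control while the fiber waits for its query.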

You can call asynchronous, fiber-aware and blocking methods without finishing the connection. You only need to start/stop EventMachine in between the asynchronous calls.

Although em-synchrony provides a very nice set of tools for untangled EventMachine code, you don't need it to fully benefit from PG::EM::Client. Just wrap your asynchronous code in a fiber:

  Fiber.new { ... }.resume

Special options

There are four special connection options; one of them is a standard pg option used by the async methods. You may pass them among the hash options to PG::EM::Client.new or PG::EM::Client.connect_defer, or use the accessor methods to change them on the fly.

The options are:

  • connect_timeout
  • query_timeout
  • async_autoreconnect
  • on_autoreconnect

Only connect_timeout is a standard libpq option, although changing it with the accessor method affects asynchronous functions only. See PG::EM::Client for more details.

Handling errors

Exactly like in pg:

  EM.synchrony do
    begin
      pg.query('smellect 1')
    rescue => e
      puts "error: #{e.inspect}"
    end
    EM.stop
  end

With *_defer methods:

  EM.run do
    pg.query_defer('smellect 1') do |ret|
      if ret.is_a?(Exception)
        puts "PSQL error: #{ret.inspect}"
      end
      EM.stop
    end
  end

or

  EM.run do
    pg.query_defer('smellect 1').callback do |ret|
      puts "do something with #{ret}"
      EM.stop
    end.errback do |err|
      puts "PSQL error: #{err.inspect}"
      EM.stop
    end
  end

Auto re-connecting in asynchronous mode

Connection reset is done in a non-blocking manner using reset_defer internally.

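  EM.run do
    Fiber.new do
      pg = PG::EM::Client.new async_autoreconnect: true

      try_query = lambda do
        pg.query('select * from foo') do |result|
          puts Array(result).inspect
        end
      end

      try_query.call
      # restart the server while the client is connected
      system 'pg_ctl stop -m fast'
      system 'pg_ctl start -w'
      try_query.call

      EM.stop
    end.resume
  end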


To enable this feature, call:

  pg.async_autoreconnect = true

Additionally, the on_autoreconnect callback may be set on the connection. It is invoked after a successful connection restart, just before the pending command is sent to the server again.

Connection Pool

Forever alone? Not anymore! There is a dedicated PG::EM::ConnectionPool class with a dynamically sized pool for both types of asynchronous commands (deferrable and fiber-synchronized).

It also provides a #transaction method, which locks the in-transaction connection to the calling fiber and allows executing commands on the same connection within a transaction block. Transactions may be nested. See also the docs for the PG::EM::Client#transaction method.

Parallel async queries

  require 'pg/em/connection_pool'
  require 'em-synchrony'

  EM.synchrony do
    pg = PG::EM::ConnectionPool.new(size: 2, dbname: 'test')

    multi = EM::Synchrony::Multi.new
    multi.add :foo, pg.query_defer('select pg_sleep(1)')
    multi.add :bar, pg.query_defer('select pg_sleep(1)')

    start = Time.now
    res = multi.perform
    # around 1 sec.
    puts Time.now - start

    EM.stop
  end


Fiber Concurrency

  require 'pg/em/connection_pool'
  require 'em-synchrony'
  require "em-synchrony/fiber_iterator"

  EM.synchrony do
    concurrency = 5
    queries = (1..10).map {|i| "select pg_sleep(1); select #{i}" }

    pg = PG::EM::ConnectionPool.new(size: concurrency, dbname: 'test')

    start = Time.now
    EM::Synchrony::FiberIterator.new(queries, concurrency).each do |query|
      pg.query(query) do |result|
        puts "recv: #{result.getvalue(0,0)}"
      end
    end
    # around 2 secs.
    puts Time.now - start

    EM.stop
  end
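
The "around 1 sec." and "around 2 secs." comments in the two examples above follow from simple arithmetic: queries run in batches of at most `concurrency` at a time, and each batch takes about as long as one query. A small sketch of that estimate follows. The estimated_seconds helper is hypothetical, and it assumes that every query takes the same time and that connection and scheduling overhead are negligible.

```ruby
# Hypothetical helper: rough wall-clock estimate for equally long queries
# run with a fixed concurrency limit, ignoring all overhead.
def estimated_seconds(query_count, concurrency, seconds_each)
  batches = (query_count + concurrency - 1) / concurrency # ceiling division
  batches * seconds_each
end

puts estimated_seconds(2, 2, 1)  # two parallel queries, pool of 2  => 1
puts estimated_seconds(10, 5, 1) # ten queries, five at a time      => 2
puts estimated_seconds(10, 1, 1) # same queries run one by one      => 10
```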


API Changes

0.2.x -> 0.3.x

There is a substantial difference in the API between this and the previous releases. The idea behind it was to make this implementation as compatible as possible with the threaded pg interface. E.g. #async_exec is now an alias for #exec.

The other reason was to get rid of the ugly em / em-synchrony duality.

  • There is no separate em-synchrony client version anymore.
  • The methods returning a Deferrable now have the *_defer suffix.
  • #async_exec and #async_query (in <= 0.2 they were deferrable methods) are now aliases for #exec.
  • The command methods #exec, #query, #exec_*, #describe_* are now em-synchrony style methods (fiber-synchronized).
  • The following methods were removed:

    • #async_prepare,
    • #async_exec_prepared,
    • #async_describe_prepared,
    • #async_describe_portal

    as their names were confusing due to the unfortunate #async_exec.

  • The async_connect and #async_reset are renamed to connect_defer and #reset_defer respectively.

0.1.x -> 0.2.x

  • on_reconnect renamed to more accurate on_autoreconnect (well, it's not used by PG::EM::Client#reset call).
  • async_autoreconnect is false by default if on_autoreconnect is not specified as an initialization option.


Bugs/Limitations

  • no async support for: COPY commands (get_copy_data, put_copy_data), wait_for_notify
  • currently no ActiveRecord support (you are welcome to contribute).


TODO

  • implement streaming results (Postgres >= 9.2)
  • implement EM adapted version of get_copy_data, put_copy_data, wait_for_notify and transaction
  • ORM (ActiveRecord and maybe Datamapper) support as separate projects
  • present more benchmarks

More Info

This implementation makes use of the non-blocking PGConn#is_busy and PGConn#consume_input methods. Depending on the size of the queried results and the concurrency level, the gain in overall speed and responsiveness of your application might actually be quite large. See BENCHMARKING.


The greetz go to: