Class: PG::EM::Client

Inherits:
Connection
  • Object
show all
Defined in:
lib/pg/em.rb,
lib/pg/em/client/watcher.rb,
lib/pg/em/client/connect_watcher.rb

Overview

PostgreSQL EventMachine client

Author

Rafal Michalski

Client is a PG::Connection wrapper designed for EventMachine.

The following new methods:

are added to execute queries asynchronously, returning Deferrable object.

The following methods of PG::Connection are overloaded:

and are now auto-detecting if EventMachine is running and performing commands asynchronously (blocking only current fiber) or calling parent thread-blocking methods.

If #async_autoreconnect option is set to true, all of the above methods (in asynchronous mode) try to re-connect after a connection error occurs. It’s performed behind the scenes, so no error is raised, except if there was a transaction in progress. In such instance the error is raised after establishing connection to signal that the transaction was aborted.

If you want to detect auto re-connect event use #on_autoreconnect property/option.

To enable auto-reconnecting set:

client.async_autoreconnect = true

or pass as Client.new hash argument:

PG::EM::Client.new dbname: 'bar', async_autoreconnect: true

There are also new methods:

which are asynchronous versions of PG::Connection.new and PG:Connection#reset.

Additionally the following methods are overloaded:

providing auto-detecting asynchronous (fiber-synchronized) or thread-blocking methods for (re)connecting.

Otherwise nothing changes in PG::Connection API. See PG::Connection docs for explanation of arguments to the above methods.

Warning:

#describe_prepared and #exec_prepared after #prepare should only be invoked on the same connection. If you are using a connection pool, make sure to acquire a single connection first.

Defined Under Namespace

Modules: ConnectWatcher, Watcher

Constant Summary collapse

ROOT_FIBER =
Fiber.current
DEFAULT_ASYNC_VARS =
{
  :@async_autoreconnect => nil,
  :@connect_timeout => nil,
  :@query_timeout => 0,
  :@on_autoreconnect => nil,
  :@async_command_aborted => false,
}.freeze
TRAN_BEGIN_QUERY =
'BEGIN'
TRAN_ROLLBACK_QUERY =
'ROLLBACK'
TRAN_COMMIT_QUERY =
'COMMIT'
@@connect_timeout_envvar =

environment variable name for connect_timeout fallback value

conndefaults.find{|d| d[:keyword] == "connect_timeout" }[:envvar]

Instance Attribute Summary collapse

Deferrable connection methods collapse

Auto-sensing fiber-synchronized connection methods collapse

Deferrable command methods collapse

Auto-sensing thread or fiber blocking command methods collapse

Instance Method Summary collapse

Instance Attribute Details

#async_autoreconnectBoolean

Enable/disable auto re-connect feature (true/false). Defaults to false unless #on_autoreconnect is specified as an initialization option.

Changing #on_autoreconnect with accessor method doesn’t change the state of #async_autoreconnect.

You can also specify this as an option to new or connect_defer.

Returns:

  • (Boolean)

    asynchronous auto re-connect status



135
136
137
# File 'lib/pg/em.rb', line 135

def async_autoreconnect
  @async_autoreconnect
end

#connect_timeoutFloat

Connection timeout. Affects #reset and #reset_defer.

Changing this property does not affect thread-blocking #reset.

However if passed as initialization option, it also affects blocking #reset.

To enable it set to some positive value. To disable it: set to 0. You can also specify this as an option to new or connect_defer.

Returns:

  • (Float)

    connection timeout in seconds



113
114
115
# File 'lib/pg/em.rb', line 113

def connect_timeout
  @connect_timeout
end

#on_autoreconnectProc<Client, Error>

Proc that is called after a connection with the server has been automatically re-established. It’s being invoked just before the pending command is sent to the server.

The first argument it receives is the connection instance. The second is the original exception that caused the reconnecting process.

The proc can control the later action with its return value:

  • false (explicitly, nil is ignored) - the original exception is raised/passed back and the pending query command is not sent again to the server.

  • true (explicitly, truish values are ignored), the pending command is called regardless of the connection’s last transaction status.

  • Exception object - is raised/passed back and the pending command is not sent.

  • Deferrable object - the chosen action will depend on the deferred status.

  • Other values are ignored and the pending query command is immediately sent to the server unless there was a pending transaction before the connection was reset.

It’s possible to execute queries from inside of the proc.

You may pass this proc as an option to new or connect_defer.

Examples:

How to use prepare in on_autoreconnect hook

pg.on_autoreconnect = proc do |conn, ex|
  conn.prepare("species_by_name", 
   "select id, name from animals where species=$1 order by name")
end

Returns:

  • (Proc<Client, Error>)

    auto re-connect hook



172
173
174
# File 'lib/pg/em.rb', line 172

def on_autoreconnect
  @on_autoreconnect
end

#query_timeoutFloat

Aborts async command processing if server response time exceedes query_timeout seconds. This does not apply to #reset and #reset_defer.

To enable it set to some positive value. To disable it: set to 0. You can also specify this as an option to new or connect_defer.

Returns:

  • (Float)

    query timeout in seconds



123
124
125
# File 'lib/pg/em.rb', line 123

def query_timeout
  @query_timeout
end

Class Method Details

.connect_defer(*args) {|pg| ... } ⇒ FeaturedDeferrable Also known as: async_connect

Attempts to establish the connection asynchronously.

Pass the block to the returned deferrable’s callback to obtain newly created and already connected PG::EM::Client object. In case of connection error errback hook receives an error object as an argument. If the block is provided it’s bound to both callback and errback hooks of the returned deferrable.

Special PG::EM::Client options (e.g.: #async_autoreconnect) must be provided as connection_hash argument variant. They will be ignored if passed as a connection_string.

client_encoding will be set according to Encoding.default_internal.

Yield Parameters:

  • pg (Client|PG::Error)

    new and connected client instance on success or an instance of raised PG::Error

Returns:

See Also:



243
244
245
246
247
248
249
250
251
252
253
# File 'lib/pg/em.rb', line 243

def self.connect_defer(*args, &blk)
  df = PG::EM::FeaturedDeferrable.new(&blk)
  async_args = parse_async_options(args)
  conn = df.protect { connect_start(*args) }
  if conn
    async_args.each {|k, v| conn.instance_variable_set(k, v) }
    ::EM.watch(conn.socket_io, ConnectWatcher, conn, df, false).
      poll_connection_and_check
  end
  df
end

.new(*args, &blk) ⇒ Object Also known as: connect, open, setdb, setdblogin

Creates new instance of PG::EM::Client and attempts to establish connection.

Performs command asynchronously yielding from current fiber if EventMachine reactor is running and current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a thread-blocking call to the parent method.

Special PG::EM::Client options (e.g.: #async_autoreconnect) must be provided as connection_hash argument variant. They will be ignored if passed as a connection_string.

client_encoding will be set according to Encoding.default_internal.

Raises:

  • (PG::Error)

See Also:



339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
# File 'lib/pg/em.rb', line 339

def self.new(*args, &blk)
  if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    connect_defer(*args) {|r| f.resume(r) }

    conn = Fiber.yield
    raise conn if conn.is_a?(::Exception)
    if block_given?
      begin
        yield conn
      ensure
        conn.finish
      end
    else
      conn
    end
  else
    super(*args)
  end
end

Instance Method Details

#describe_portal(portal_name) {|result| ... } ⇒ PG::Result, Object

Retrieve information about the portal portal_name,

Performs command asynchronously yielding current fiber if EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to parent method.

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

Raises:

  • (PG::Error)

See Also:



631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
# File 'lib/pg/em.rb', line 631

%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
  ).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        #{defer_name}(*args) do |res|
          f.resume(res)
        end

        result = Fiber.yield
        raise result if result.is_a?(::Exception)
        if block_given?
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end

#describe_portal_defer(portal_name) {|result| ... } ⇒ FeaturedDeferrable

Asynchronously sends command to retrieve information about the portal portal_name, and immediately returns with deferrable.

Use the returned deferrable’s callback and errback method to get the result. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.

Yield Parameters:

  • result (PG::Result|Error)

    command result on success or a PG::Error instance on error.

Returns:

See Also:



522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
# File 'lib/pg/em.rb', line 522

%w(
  exec_defer              send_query
  prepare_defer           send_prepare
  exec_prepared_defer     send_query_prepared
  describe_prepared_defer send_describe_prepared
  describe_portal_defer   send_describe_portal
).each_slice(2) do |defer_name, send_name|

  class_eval <<-EOD, __FILE__, __LINE__
  def #{defer_name}(*args, &blk)
    df = PG::EM::FeaturedDeferrable.new(&blk)
    send_proc = proc do
      #{send_name}(*args)
      if @watcher && @watcher.watching?
        @watcher.watch_query(df, send_proc)
      else
        @watcher = ::EM.watch(self.socket_io, Watcher, self).
                      watch_query(df, send_proc)
      end
    end
    begin
      if @async_command_aborted
        error = ConnectionBad.new("previous query expired, need connection reset")
        error.instance_variable_set(:@connection, self)
        raise error
      end
      @last_transaction_status = transaction_status
      send_proc.call
    rescue PG::Error => e
      ::EM.next_tick { async_autoreconnect!(df, e, &send_proc) }
    rescue Exception => e
      ::EM.next_tick { df.fail(e) }
    end
    df
  end
  EOD

end

#describe_prepared(statement_name) {|result| ... } ⇒ PG::Result, Object

Retrieve information about the prepared statement statement_name,

Performs command asynchronously yielding current fiber if EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to parent method.

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

Raises:

  • (PG::Error)

See Also:



631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
# File 'lib/pg/em.rb', line 631

%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
  ).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        #{defer_name}(*args) do |res|
          f.resume(res)
        end

        result = Fiber.yield
        raise result if result.is_a?(::Exception)
        if block_given?
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end

#describe_prepared_defer(statement_name) {|result| ... } ⇒ FeaturedDeferrable

Asynchronously sends command to retrieve information about the prepared statement statement_name, and immediately returns with deferrable.

Use the returned deferrable’s callback and errback method to get the result. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.

Yield Parameters:

  • result (PG::Result|Error)

    command result on success or a PG::Error instance on error.

Returns:

See Also:



522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
# File 'lib/pg/em.rb', line 522

%w(
  exec_defer              send_query
  prepare_defer           send_prepare
  exec_prepared_defer     send_query_prepared
  describe_prepared_defer send_describe_prepared
  describe_portal_defer   send_describe_portal
).each_slice(2) do |defer_name, send_name|

  class_eval <<-EOD, __FILE__, __LINE__
  def #{defer_name}(*args, &blk)
    df = PG::EM::FeaturedDeferrable.new(&blk)
    send_proc = proc do
      #{send_name}(*args)
      if @watcher && @watcher.watching?
        @watcher.watch_query(df, send_proc)
      else
        @watcher = ::EM.watch(self.socket_io, Watcher, self).
                      watch_query(df, send_proc)
      end
    end
    begin
      if @async_command_aborted
        error = ConnectionBad.new("previous query expired, need connection reset")
        error.instance_variable_set(:@connection, self)
        raise error
      end
      @last_transaction_status = transaction_status
      send_proc.call
    rescue PG::Error => e
      ::EM.next_tick { async_autoreconnect!(df, e, &send_proc) }
    rescue Exception => e
      ::EM.next_tick { df.fail(e) }
    end
    df
  end
  EOD

end

#exec(sql) {|result| ... } ⇒ PG::Result, Object Also known as: query, async_query, async_exec

Sends SQL query request specified by sql to PostgreSQL.

Performs command asynchronously yielding current fiber if EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to parent method.

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

Raises:

  • (PG::Error)

See Also:



631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
# File 'lib/pg/em.rb', line 631

%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
  ).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        #{defer_name}(*args) do |res|
          f.resume(res)
        end

        result = Fiber.yield
        raise result if result.is_a?(::Exception)
        if block_given?
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end

#exec_defer(sql, params = nil, result_format = nil) {|result| ... } ⇒ FeaturedDeferrable Also known as: query_defer, async_query_defer, async_exec_defer, exec_params_defer

Sends SQL query request specified by sql to PostgreSQL for asynchronous processing, and immediately returns with deferrable.

Use the returned deferrable’s callback and errback method to get the result. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.

Yield Parameters:

  • result (PG::Result|Error)

    command result on success or a PG::Error instance on error.

Returns:

See Also:



522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
# File 'lib/pg/em.rb', line 522

%w(
  exec_defer              send_query
  prepare_defer           send_prepare
  exec_prepared_defer     send_query_prepared
  describe_prepared_defer send_describe_prepared
  describe_portal_defer   send_describe_portal
).each_slice(2) do |defer_name, send_name|

  class_eval <<-EOD, __FILE__, __LINE__
  def #{defer_name}(*args, &blk)
    df = PG::EM::FeaturedDeferrable.new(&blk)
    send_proc = proc do
      #{send_name}(*args)
      if @watcher && @watcher.watching?
        @watcher.watch_query(df, send_proc)
      else
        @watcher = ::EM.watch(self.socket_io, Watcher, self).
                      watch_query(df, send_proc)
      end
    end
    begin
      if @async_command_aborted
        error = ConnectionBad.new("previous query expired, need connection reset")
        error.instance_variable_set(:@connection, self)
        raise error
      end
      @last_transaction_status = transaction_status
      send_proc.call
    rescue PG::Error => e
      ::EM.next_tick { async_autoreconnect!(df, e, &send_proc) }
    rescue Exception => e
      ::EM.next_tick { df.fail(e) }
    end
    df
  end
  EOD

end

#exec_params(sql, params = nil, result_format = nil) {|result| ... } ⇒ PG::Result, Object

Sends SQL query request specified by sql with optional params and result_format to PostgreSQL.

Performs command asynchronously yielding current fiber if EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to parent method.

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

Raises:

  • (PG::Error)

See Also:



631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
# File 'lib/pg/em.rb', line 631

%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
  ).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        #{defer_name}(*args) do |res|
          f.resume(res)
        end

        result = Fiber.yield
        raise result if result.is_a?(::Exception)
        if block_given?
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end

#exec_prepared(statement_name, params = nil, result_format = nil) {|result| ... } ⇒ PG::Result, Object

Execute prepared named statement specified by statement_name.

Performs command asynchronously yielding current fiber if EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to parent method.

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

Raises:

  • (PG::Error)

See Also:



631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
# File 'lib/pg/em.rb', line 631

%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
  ).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        #{defer_name}(*args) do |res|
          f.resume(res)
        end

        result = Fiber.yield
        raise result if result.is_a?(::Exception)
        if block_given?
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end

#exec_prepared_defer(statement_name, params = nil, result_format = nil) {|result| ... } ⇒ FeaturedDeferrable

Execute prepared named statement specified by statement_name asynchronously, and immediately returns with deferrable.

Use the returned deferrable’s callback and errback method to get the result. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.

Yield Parameters:

  • result (PG::Result|Error)

    command result on success or a PG::Error instance on error.

Returns:

See Also:



522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
# File 'lib/pg/em.rb', line 522

%w(
  exec_defer              send_query
  prepare_defer           send_prepare
  exec_prepared_defer     send_query_prepared
  describe_prepared_defer send_describe_prepared
  describe_portal_defer   send_describe_portal
).each_slice(2) do |defer_name, send_name|

  class_eval <<-EOD, __FILE__, __LINE__
  def #{defer_name}(*args, &blk)
    df = PG::EM::FeaturedDeferrable.new(&blk)
    send_proc = proc do
      #{send_name}(*args)
      if @watcher && @watcher.watching?
        @watcher.watch_query(df, send_proc)
      else
        @watcher = ::EM.watch(self.socket_io, Watcher, self).
                      watch_query(df, send_proc)
      end
    end
    begin
      if @async_command_aborted
        error = ConnectionBad.new("previous query expired, need connection reset")
        error.instance_variable_set(:@connection, self)
        raise error
      end
      @last_transaction_status = transaction_status
      send_proc.call
    rescue PG::Error => e
      ::EM.next_tick { async_autoreconnect!(df, e, &send_proc) }
    rescue Exception => e
      ::EM.next_tick { df.fail(e) }
    end
    df
  end
  EOD

end

#finishObject Also known as: close

Closes the backend connection.

Detaches watch handler to prevent memory leak then calls parent PG::Connection#finish.



379
380
381
382
383
384
385
# File 'lib/pg/em.rb', line 379

def finish
  super
  if @watcher
    @watcher.detach if @watcher.watching?
    @watcher = nil
  end
end

#prepare(stmt_name, sql, param_types = nil) {|result| ... } ⇒ PG::Result, Object

Prepares statement sql with name stmt_name to be executed later.

Performs command asynchronously yielding current fiber if EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a blocking call to parent method.

Yield Parameters:

  • result (PG::Result)

    command result on success

Returns:

  • (PG::Result)

    if block wasn’t given

  • (Object)

    result of the given block

Raises:

  • (PG::Error)

See Also:



631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
# File 'lib/pg/em.rb', line 631

%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
  ).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        #{defer_name}(*args) do |res|
          f.resume(res)
        end

        result = Fiber.yield
        raise result if result.is_a?(::Exception)
        if block_given?
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end

#prepare_defer(stmt_name, sql, param_types = nil) {|result| ... } ⇒ FeaturedDeferrable

Prepares statement sql with name stmt_name to be executed later asynchronously, and immediately returns with deferrable.

Use the returned deferrable’s callback and errback method to get the result. If the block is provided it’s bound to both the callback and errback hooks of the returned deferrable.

Yield Parameters:

  • result (PG::Result|Error)

    command result on success or a PG::Error instance on error.

Returns:

See Also:



522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
# File 'lib/pg/em.rb', line 522

%w(
  exec_defer              send_query
  prepare_defer           send_prepare
  exec_prepared_defer     send_query_prepared
  describe_prepared_defer send_describe_prepared
  describe_portal_defer   send_describe_portal
).each_slice(2) do |defer_name, send_name|

  class_eval <<-EOD, __FILE__, __LINE__
  def #{defer_name}(*args, &blk)
    df = PG::EM::FeaturedDeferrable.new(&blk)
    send_proc = proc do
      #{send_name}(*args)
      if @watcher && @watcher.watching?
        @watcher.watch_query(df, send_proc)
      else
        @watcher = ::EM.watch(self.socket_io, Watcher, self).
                      watch_query(df, send_proc)
      end
    end
    begin
      if @async_command_aborted
        error = ConnectionBad.new("previous query expired, need connection reset")
        error.instance_variable_set(:@connection, self)
        raise error
      end
      @last_transaction_status = transaction_status
      send_proc.call
    rescue PG::Error => e
      ::EM.next_tick { async_autoreconnect!(df, e, &send_proc) }
    rescue Exception => e
      ::EM.next_tick { df.fail(e) }
    end
    df
  end
  EOD

end

#resetObject

Attempts to reset the connection.

Performs command asynchronously yielding from current fiber if EventMachine reactor is running and current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.

Otherwise performs a thread-blocking call to the parent method.

Raises:

  • (PG::Error)

See Also:



307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/pg/em.rb', line 307

def reset
  @async_command_aborted = false
  if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    reset_defer {|r| f.resume(r) }

    conn = Fiber.yield
    raise conn if conn.is_a?(::Exception)
    conn
  else
    super
  end
end

#reset_defer {|pg| ... } ⇒ FeaturedDeferrable Also known as: async_reset

Attempts to reset the connection asynchronously.

Pass the block to the returned deferrable’s callback to execute after successfull reset. If the block is provided it’s bound to callback and errback hooks of the returned deferrable.

Yield Parameters:

  • pg (Client|PG::Error)

    reconnected client instance on success or an instance of raised PG::Error

Returns:



270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/pg/em.rb', line 270

def reset_defer(&blk)
  @async_command_aborted = false
  df = PG::EM::FeaturedDeferrable.new(&blk)
  # there can be only one watch handler over the socket
  # apparently eventmachine has hard time dealing with more than one
  # for blocking reset this is not needed
  if @watcher
    @watcher.detach if @watcher.watching?
    @watcher = nil
  end
  ret = df.protect(:fail) { reset_start }
  unless ret == :fail
    ::EM.watch(self.socket_io, ConnectWatcher, self, df, true).
      poll_connection_and_check
  end
  df
end

#statusNumber

Returns status of connection: PG::CONNECTION_OK or PG::CONNECTION_BAD.

Returns PG::CONNECTION_BAD for connections with async_command_aborted flag set by expired query timeout. Otherwise return whatever PG::Connection#status returns.

Returns:

  • (Number)

See Also:



395
396
397
398
399
400
401
# File 'lib/pg/em.rb', line 395

def status
  if @async_command_aborted
    PG::CONNECTION_BAD
  else
    super
  end
end

#transaction {|client| ... } ⇒ Object

Note:

Avoid using PG::EM::Client#*_defer calls inside the block or make sure all queries are completed before the provided block terminates.

Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs.

Calls to #transaction may be nested, however without sub-transactions (save points). If the innermost transaction block raises an error the transaction is rolled back to the state before the outermost transaction began.

This is an extension to the PG::Connection#transaction method as it does not support nesting in this way.

The method is sensitive to the transaction status and will safely rollback on any sql error even when it was catched by some rescue block. But consider that rescuing any sql error within an utility method is a bad idea.

This method works in both blocking/async modes (regardles of the reactor state) and is considered as a generic extension to the PG::Connection#transaction method.

Examples:

Nested transaction example

def add_comment(user_id, text)
  db.transaction do
    cmt_id = db.query(
      'insert into comments (text) where user_id=$1 values ($2) returning id',
      [user_id, text]).getvalue(0,0)
    db.query(
      'update users set last_comment_id=$2 where id=$1', [user_id, cmt_id])
    cmt_id
  end
end

def update_comment_count(page_id)
  db.transaction do
    count = db.query('select count(*) from comments where page_id=$1', [page_id]).getvalue(0,0)
    db.query('update pages set comment_count=$2 where id=$1', [page_id, count])
  end
end

# to run add_comment and update_comment_count within the same transaction
db.transaction do
  add_comment(user_id, some_text)
  update_comment_count(page_id)
end

Yield Parameters:

  • client (self)

Returns:

  • (Object)

    result of the block

Raises:

  • (ArgumentError)

See Also:



725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
# File 'lib/pg/em.rb', line 725

def transaction
  raise ArgumentError, 'Must supply block for PG::EM::Client#transaction' unless block_given?
  tcount = @client_tran_count.to_i

  case transaction_status
  when PG::PQTRANS_IDLE
    # there is no transaction yet, so let's begin
    exec(TRAN_BEGIN_QUERY)
    # reset transaction count in case user code rolled it back before
    tcount = 0 if tcount != 0
  when PG::PQTRANS_INTRANS
    # transaction in progress, leave it be
  else
    # transaction failed, is in unknown state or command is active
    # in any case calling begin will raise server transaction error
    exec(TRAN_BEGIN_QUERY) # raises PG::InFailedSqlTransaction
  end
  # memoize nested count
  @client_tran_count = tcount + 1
  begin

    result = yield self

  rescue
    # error was raised
    case transaction_status
    when PG::PQTRANS_INTRANS, PG::PQTRANS_INERROR
      # do not rollback if transaction was rolled back before
      # or is in unknown state, which means connection reset is needed
      # and rollback only from the outermost transaction block
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    end
    # raise again
    raise
  else
    # we are good (but not out of woods yet)
    case transaction_status
    when PG::PQTRANS_INTRANS
      # commit only from the outermost transaction block
      exec(TRAN_COMMIT_QUERY) if tcount.zero?
    when PG::PQTRANS_INERROR
      # no ruby error was raised (or an error was rescued in code block)
      # but there was an sql error anyway
      # so rollback after the outermost block
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    when PG::PQTRANS_IDLE
      # the code block has terminated the transaction on its own
      # so just reset the counter
      tcount = 0
    else
      # something isn't right, so provoke an error just in case
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    end
    result
  ensure
    @client_tran_count = tcount
  end
end