Class: PG::EM::Client
- Inherits: Connection
- Inheritance chain: Object, Connection, PG::EM::Client
- Defined in:
- lib/pg/em.rb,
lib/pg/em/client/watcher.rb,
lib/pg/em/client/connect_watcher.rb
Overview
PostgreSQL EventMachine client
- Author: Rafal Michalski
Client is a PG::Connection wrapper designed for EventMachine.
The following new methods:
- #exec_defer (alias: query_defer)
are added to execute queries asynchronously, returning a Deferrable object.
The following methods of PG::Connection are overloaded:
- #exec (alias: query, async_exec, async_query)
They now detect whether EventMachine is running and either perform commands asynchronously (blocking only the current fiber) or call the parent thread-blocking methods.
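The detection rule can be illustrated without EventMachine at all. The following is a minimal sketch, assuming a plain boolean in place of `::EM.reactor_running?`; the `AutoSense` module and its `mode` method are hypothetical names used only for illustration, while the root-fiber comparison mirrors the check visible in the source listings on this page.

```ruby
require 'fiber'

# Hypothetical illustration module, not part of PG::EM::Client.
module AutoSense
  # Captured at load time, in the same way the class captures ROOT_FIBER.
  ROOT_FIBER = Fiber.current

  # reactor_running is an assumed stand-in for ::EM.reactor_running?.
  def self.mode(reactor_running)
    if reactor_running && !Fiber.current.equal?(ROOT_FIBER)
      :fiber_async      # the real client would suspend only the current fiber
    else
      :thread_blocking  # the real client would call the parent method
    end
  end
end

AutoSense.mode(false)                      # no reactor: blocking path
Fiber.new { AutoSense.mode(true) }.resume  # reactor and non-root fiber: async path
```

Both conditions have to hold for the asynchronous path: a running reactor alone is not enough when the call is made from the root fiber.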
If the #async_autoreconnect option is set to true, all of the above methods (in asynchronous mode) try to re-connect after a connection error occurs. This happens behind the scenes, so no error is raised unless a transaction was in progress. In that case the error is raised after the connection is re-established, to signal that the transaction was aborted.
To detect an auto re-connect event, use the #on_autoreconnect property/option.
To enable auto-reconnecting set:
client.async_autoreconnect = true
or pass it as a Client.new hash argument:
PG::EM::Client.new dbname: 'bar', async_autoreconnect: true
There are also new methods:
- Client.connect_defer (alias: async_connect)
- #reset_defer (alias: #async_reset)
which are asynchronous versions of PG::Connection.new and PG::Connection#reset.
Additionally the following methods are overloaded:
- Client.new (alias: connect, open, setdb, setdblogin)
providing auto-detecting asynchronous (fiber-synchronized) or thread-blocking methods for (re)connecting.
Otherwise nothing changes in the PG::Connection API. See the PG::Connection docs for an explanation of the arguments to the above methods.
Warning:
#describe_prepared and #exec_prepared after #prepare should only be invoked on the same connection. If you are using a connection pool, make sure to acquire a single connection first.
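The reason behind this warning is that prepared statements live on the server side of one particular connection. The sketch below models that with two hypothetical stand-in classes, `SketchConnection` and `SketchPool`; they are assumptions made for illustration and do not reproduce any real pool API.

```ruby
# Hypothetical stand-in: each connection remembers only its own statements.
class SketchConnection
  def initialize
    @statements = {}
  end

  def prepare(name, sql)
    @statements[name] = sql
  end

  def exec_prepared(name)
    @statements.fetch(name) do
      raise ArgumentError, "prepared statement \"#{name}\" does not exist"
    end
  end
end

# Hypothetical round-robin pool: consecutive checkouts may differ.
class SketchPool
  def initialize(size)
    @conns = Array.new(size) { SketchConnection.new }
    @next = 0
  end

  def hold
    conn = @conns[@next % @conns.size]
    @next += 1
    yield conn
  end
end

pool = SketchPool.new(2)

# Risky: the statement is prepared on one connection and run on another.
pool.hold { |c| c.prepare('by_id', 'SELECT 1') }
begin
  pool.hold { |c| c.exec_prepared('by_id') }
rescue ArgumentError => e
  warn e.message
end

# Safe: acquire a single connection and use it for both steps.
pool.hold do |c|
  c.prepare('by_name', 'SELECT 2')
  c.exec_prepared('by_name')
end
```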
Defined Under Namespace
Modules: ConnectWatcher, Watcher
Constant Summary
- ROOT_FIBER = Fiber.current
- DEFAULT_ASYNC_VARS = { :@async_autoreconnect => nil, :@connect_timeout => nil, :@query_timeout => 0, :@on_autoreconnect => nil, :@async_command_aborted => false, }.freeze
- TRAN_BEGIN_QUERY = 'BEGIN'
- TRAN_ROLLBACK_QUERY = 'ROLLBACK'
- TRAN_COMMIT_QUERY = 'COMMIT'
- @@connect_timeout_envvar = conndefaults.find{|d| d[:keyword] == "connect_timeout" }[:envvar]
  environment variable name for connect_timeout fallback value
Instance Attribute Summary
- #async_autoreconnect ⇒ Boolean
  Enable/disable auto re-connect feature (true/false).
- #connect_timeout ⇒ Float
  Connection timeout.
- #on_autoreconnect ⇒ Proc<Client, Error>
  Proc that is called after a connection with the server has been automatically re-established.
- #query_timeout ⇒ Float
  Aborts async command processing if server response time exceeds query_timeout seconds.
Deferrable connection methods
- .connect_defer(*args) {|pg| ... } ⇒ FeaturedDeferrable (also: async_connect)
  Attempts to establish the connection asynchronously.
- #reset_defer {|pg| ... } ⇒ FeaturedDeferrable (also: #async_reset)
  Attempts to reset the connection asynchronously.
Auto-sensing fiber-synchronized connection methods
- .new(*args, &blk) ⇒ Object (also: connect, open, setdb, setdblogin)
  Creates new instance of PG::EM::Client and attempts to establish connection.
- #reset ⇒ Object
  Attempts to reset the connection.
Deferrable command methods
- #describe_portal_defer(portal_name) {|result| ... } ⇒ FeaturedDeferrable
  Asynchronously sends a command to retrieve information about the portal portal_name and immediately returns a deferrable.
- #describe_prepared_defer(statement_name) {|result| ... } ⇒ FeaturedDeferrable
  Asynchronously sends a command to retrieve information about the prepared statement statement_name and immediately returns a deferrable.
- #exec_defer(sql, params = nil, result_format = nil) {|result| ... } ⇒ FeaturedDeferrable (also: #query_defer, #async_query_defer, #async_exec_defer, #exec_params_defer)
  Sends the SQL query request specified by sql to PostgreSQL for asynchronous processing and immediately returns a deferrable.
- #exec_prepared_defer(statement_name, params = nil, result_format = nil) {|result| ... } ⇒ FeaturedDeferrable
  Executes the prepared named statement specified by statement_name asynchronously and immediately returns a deferrable.
- #prepare_defer(stmt_name, sql, param_types = nil) {|result| ... } ⇒ FeaturedDeferrable
  Prepares statement sql with name stmt_name to be executed later asynchronously and immediately returns a deferrable.
Auto-sensing thread or fiber blocking command methods
- #describe_portal(portal_name) {|result| ... } ⇒ PG::Result, Object
  Retrieves information about the portal portal_name.
- #describe_prepared(statement_name) {|result| ... } ⇒ PG::Result, Object
  Retrieves information about the prepared statement statement_name.
- #exec(sql) {|result| ... } ⇒ PG::Result, Object (also: #query, #async_query, #async_exec)
  Sends the SQL query request specified by sql to PostgreSQL.
- #exec_params(sql, params = nil, result_format = nil) {|result| ... } ⇒ PG::Result, Object
  Sends the SQL query request specified by sql with optional params and result_format to PostgreSQL.
- #exec_prepared(statement_name, params = nil, result_format = nil) {|result| ... } ⇒ PG::Result, Object
  Executes the prepared named statement specified by statement_name.
- #prepare(stmt_name, sql, param_types = nil) {|result| ... } ⇒ PG::Result, Object
  Prepares statement sql with name stmt_name to be executed later.
Instance Method Summary
- #finish ⇒ Object (also: #close)
  Closes the backend connection.
- #status ⇒ Number
  Returns status of connection: PG::CONNECTION_OK or PG::CONNECTION_BAD.
- #transaction {|client| ... } ⇒ Object
  Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs.
Instance Attribute Details
#async_autoreconnect ⇒ Boolean
Enable/disable auto re-connect feature (true/false). Defaults to false unless #on_autoreconnect is specified as an initialization option.
Changing #on_autoreconnect with accessor method doesn’t change the state of #async_autoreconnect.
You can also specify this as an option to new or connect_defer.
# File 'lib/pg/em.rb', line 135
def async_autoreconnect
  @async_autoreconnect
end
#connect_timeout ⇒ Float
Connection timeout. Affects #reset and #reset_defer.
Changing this property does not affect thread-blocking #reset.
However if passed as initialization option, it also affects blocking #reset.
To enable it set to some positive value. To disable it: set to 0. You can also specify this as an option to new or connect_defer.
# File 'lib/pg/em.rb', line 113
def connect_timeout
  @connect_timeout
end
#on_autoreconnect ⇒ Proc<Client, Error>
Proc that is called after a connection with the server has been automatically re-established. It is invoked just before the pending command is sent to the server.
The first argument it receives is the connection instance. The second is the original exception that caused the reconnecting process.
The proc can control the later action with its return value:
- false (explicitly, nil is ignored): the original exception is raised/passed back and the pending query command is not sent again to the server.
- true (explicitly, truish values are ignored): the pending command is called regardless of the connection’s last transaction status.
- Exception object: it is raised/passed back and the pending command is not sent.
- Deferrable object: the chosen action will depend on the deferred status.
- Other values are ignored and the pending query command is immediately sent to the server unless there was a pending transaction before the connection was reset.
It’s possible to execute queries from inside of the proc.
You may pass this proc as an option to new or connect_defer.
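The return-value rules listed above can be condensed into a small decision function. This is a sketch only: `autoreconnect_action` and the symbols it returns are hypothetical names, the Deferrable case is left out because its outcome depends on the deferred status, and the handling of "other values" inside a transaction follows the overview statement that an error is raised when a transaction was in progress.

```ruby
# Hypothetical helper, not part of PG::EM::Client: maps the value returned
# by an on_autoreconnect proc to the action described in the list above.
def autoreconnect_action(ret, was_in_transaction)
  case ret
  when false     then :raise_original_error  # only an explicit false
  when true      then :resend_command        # only an explicit true
  when Exception then :raise_returned_error
  else
    # nil and any other value: resend unless a transaction was pending
    was_in_transaction ? :raise_original_error : :resend_command
  end
end

autoreconnect_action(nil, false)   # resend the pending command
autoreconnect_action(nil, true)    # the transaction was aborted, raise
autoreconnect_action(true, true)   # resend regardless of transaction status
```

Note that `1` or `"yes"` are not treated like `true`; they fall through to the last branch, which matches the "explicitly" wording in the list.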
# File 'lib/pg/em.rb', line 172
def on_autoreconnect
  @on_autoreconnect
end
#query_timeout ⇒ Float
Aborts async command processing if server response time exceeds query_timeout seconds. This does not apply to #reset and #reset_defer.
To enable it set to some positive value. To disable it: set to 0. You can also specify this as an option to new or connect_defer.
# File 'lib/pg/em.rb', line 123
def query_timeout
  @query_timeout
end
Class Method Details
.connect_defer(*args) {|pg| ... } ⇒ FeaturedDeferrable Also known as: async_connect
Attempts to establish the connection asynchronously.
Pass the block to the returned deferrable’s callback to obtain the newly created and already connected PG::EM::Client object. In case of a connection error the errback hook receives an error object as an argument. If a block is provided, it is bound to both the callback and errback hooks of the returned deferrable.
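Because a block given directly to the method is bound to both hooks, it has to tell a result from an error by itself. The sketch below shows the idea with a hypothetical `SketchDeferrable` class standing in for PG::EM::FeaturedDeferrable; its internals are an assumption made for illustration, not the library's implementation.

```ruby
# Hypothetical stand-in for a deferrable whose constructor block is
# registered as both the callback and the errback.
class SketchDeferrable
  def initialize(&blk)
    @callbacks = []
    @errbacks = []
    if blk
      callback(&blk)
      errback(&blk)
    end
  end

  def callback(&blk)
    @callbacks << blk
    self
  end

  def errback(&blk)
    @errbacks << blk
    self
  end

  def succeed(value)
    @callbacks.each { |cb| cb.call(value) }
  end

  def fail(error)
    @errbacks.each { |eb| eb.call(error) }
  end
end

# The same block sees either a connection or an exception.
handler = proc do |pg|
  if pg.is_a?(::Exception)
    puts "connect failed: #{pg.message}"
  else
    puts "connected: #{pg}"
  end
end

SketchDeferrable.new(&handler).succeed(:client)
SketchDeferrable.new(&handler).fail(IOError.new('refused'))
```

Registering separate `callback` and `errback` blocks on the returned deferrable avoids the type check, at the cost of two blocks.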
Special PG::EM::Client options (e.g.: #async_autoreconnect) must be provided as connection_hash argument variant. They will be ignored if passed as a connection_string.
client_encoding will be set according to Encoding.default_internal.
# File 'lib/pg/em.rb', line 243
def self.connect_defer(*args, &blk)
  df = PG::EM::FeaturedDeferrable.new(&blk)
  async_args = (args)
  conn = df.protect { connect_start(*args) }
  if conn
    async_args.each {|k, v| conn.instance_variable_set(k, v) }
    ::EM.watch(conn.socket_io, ConnectWatcher, conn, df, false).
      poll_connection_and_check
  end
  df
end
.new(*args, &blk) ⇒ Object Also known as: connect, open, setdb, setdblogin
Creates new instance of PG::EM::Client and attempts to establish connection.
Performs command asynchronously yielding from current fiber if EventMachine reactor is running and current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.
Otherwise performs a thread-blocking call to the parent method.
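The fiber-synchronized path follows a start, yield, resume pattern: the deferred operation is started, the calling fiber suspends itself, and the deferrable's block resumes it with either a result or an exception. A minimal sketch of that pattern, assuming a hypothetical `TinyReactor` queue in place of EventMachine and a hypothetical `fiber_sync` helper:

```ruby
require 'fiber'

# Hypothetical stand-in for a reactor: queued blocks run later, on `run`.
class TinyReactor
  def initialize
    @queue = []
  end

  def next_tick(&blk)
    @queue << blk
  end

  def run
    @queue.shift.call until @queue.empty?
  end
end

# Start deferred work, suspend the current fiber, continue when resumed.
def fiber_sync(reactor, outcome)
  f = Fiber.current
  reactor.next_tick { f.resume(outcome) }  # plays the role of the deferrable's block
  result = Fiber.yield                     # other fibers may run meanwhile
  raise result if result.is_a?(::Exception)
  result
end

reactor = TinyReactor.new
log = []
Fiber.new do
  log << fiber_sync(reactor, 42)
  begin
    fiber_sync(reactor, RuntimeError.new('boom'))
  rescue => e
    log << e.message
  end
end.resume
log << :other_work  # runs while the fiber above is suspended
reactor.run
p log
```

The exception check after `Fiber.yield` is what turns an errback into an ordinary Ruby exception in the calling fiber.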
Special PG::EM::Client options (e.g.: #async_autoreconnect) must be provided as connection_hash argument variant. They will be ignored if passed as a connection_string.
client_encoding will be set according to Encoding.default_internal.
# File 'lib/pg/em.rb', line 339
def self.new(*args, &blk)
  if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    connect_defer(*args) {|r| f.resume(r) }

    conn = Fiber.yield
    raise conn if conn.is_a?(::Exception)
    if block_given?
      begin
        yield conn
      ensure
        conn.finish
      end
    else
      conn
    end
  else
    super(*args)
  end
end
Instance Method Details
#describe_portal(portal_name) {|result| ... } ⇒ PG::Result, Object
Retrieves information about the portal portal_name.
Performs command asynchronously yielding current fiber if EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.
Otherwise performs a blocking call to parent method.
# File 'lib/pg/em.rb', line 631
%w(
  exec              exec_defer
  exec_params       exec_defer
  exec_prepared     exec_prepared_defer
  prepare           prepare_defer
  describe_prepared describe_prepared_defer
  describe_portal   describe_portal_defer
).each_slice(2) do |name, defer_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{name}(*args, &blk)
      if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
        #{defer_name}(*args) do |res|
          f.resume(res)
        end

        result = Fiber.yield
        raise result if result.is_a?(::Exception)
        if block_given?
          begin
            yield result
          ensure
            result.clear
          end
        else
          result
        end
      else
        super
      end
    end
  EOD
end
#describe_portal_defer(portal_name) {|result| ... } ⇒ FeaturedDeferrable
Asynchronously sends a command to retrieve information about the portal portal_name and immediately returns a deferrable.
Use the returned deferrable’s callback and errback methods to get the result. If a block is provided, it is bound to both the callback and errback hooks of the returned deferrable.
# File 'lib/pg/em.rb', line 522
%w(
  exec_defer              send_query
  prepare_defer           send_prepare
  exec_prepared_defer     send_query_prepared
  describe_prepared_defer send_describe_prepared
  describe_portal_defer   send_describe_portal
).each_slice(2) do |defer_name, send_name|

  class_eval <<-EOD, __FILE__, __LINE__
    def #{defer_name}(*args, &blk)
      df = PG::EM::FeaturedDeferrable.new(&blk)
      send_proc = proc do
        #{send_name}(*args)
        if @watcher && @watcher.watching?
          @watcher.watch_query(df, send_proc)
        else
          @watcher = ::EM.watch(self.socket_io, Watcher, self).
                      watch_query(df, send_proc)
        end
      end
      begin
        if @async_command_aborted
          error = ConnectionBad.new("previous query expired, need connection reset")
          error.instance_variable_set(:@connection, self)
          raise error
        end
        @last_transaction_status = transaction_status
        send_proc.call
      rescue PG::Error => e
        ::EM.next_tick { async_autoreconnect!(df, e, &send_proc) }
      rescue Exception => e
        ::EM.next_tick { df.fail(e) }
      end
      df
    end
  EOD
end
#describe_prepared(statement_name) {|result| ... } ⇒ PG::Result, Object
Retrieves information about the prepared statement statement_name.
Performs command asynchronously yielding current fiber if EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.
Otherwise performs a blocking call to parent method.
# File 'lib/pg/em.rb', line 631: defined by the same class_eval loop listed under #describe_portal.
#describe_prepared_defer(statement_name) {|result| ... } ⇒ FeaturedDeferrable
Asynchronously sends a command to retrieve information about the prepared statement statement_name and immediately returns a deferrable.
Use the returned deferrable’s callback and errback methods to get the result. If a block is provided, it is bound to both the callback and errback hooks of the returned deferrable.
# File 'lib/pg/em.rb', line 522: defined by the same class_eval loop listed under #describe_portal_defer.
#exec(sql) {|result| ... } ⇒ PG::Result, Object Also known as: query, async_query, async_exec
Sends the SQL query request specified by sql to PostgreSQL.
Performs command asynchronously yielding current fiber if EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.
Otherwise performs a blocking call to parent method.
# File 'lib/pg/em.rb', line 631: defined by the same class_eval loop listed under #describe_portal.
#exec_defer(sql, params = nil, result_format = nil) {|result| ... } ⇒ FeaturedDeferrable Also known as: query_defer, async_query_defer, async_exec_defer, exec_params_defer
Sends the SQL query request specified by sql to PostgreSQL for asynchronous processing and immediately returns a deferrable.
Use the returned deferrable’s callback and errback methods to get the result. If a block is provided, it is bound to both the callback and errback hooks of the returned deferrable.
# File 'lib/pg/em.rb', line 522: defined by the same class_eval loop listed under #describe_portal_defer.
#exec_params(sql, params = nil, result_format = nil) {|result| ... } ⇒ PG::Result, Object
Sends the SQL query request specified by sql with optional params and result_format to PostgreSQL.
Performs command asynchronously yielding current fiber if EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.
Otherwise performs a blocking call to parent method.
# File 'lib/pg/em.rb', line 631: defined by the same class_eval loop listed under #describe_portal.
#exec_prepared(statement_name, params = nil, result_format = nil) {|result| ... } ⇒ PG::Result, Object
Executes the prepared named statement specified by statement_name.
Performs command asynchronously yielding current fiber if EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.
Otherwise performs a blocking call to parent method.
# File 'lib/pg/em.rb', line 631: defined by the same class_eval loop listed under #describe_portal.
#exec_prepared_defer(statement_name, params = nil, result_format = nil) {|result| ... } ⇒ FeaturedDeferrable
Executes the prepared named statement specified by statement_name asynchronously and immediately returns a deferrable.
Use the returned deferrable’s callback and errback methods to get the result. If a block is provided, it is bound to both the callback and errback hooks of the returned deferrable.
# File 'lib/pg/em.rb', line 522: defined by the same class_eval loop listed under #describe_portal_defer.
#finish ⇒ Object Also known as: close
Closes the backend connection.
Calls the parent PG::Connection#finish and detaches the watch handler to prevent a memory leak.
# File 'lib/pg/em.rb', line 379
def finish
  super
  if @watcher
    @watcher.detach if @watcher.watching?
    @watcher = nil
  end
end
#prepare(stmt_name, sql, param_types = nil) {|result| ... } ⇒ PG::Result, Object
Prepares statement sql with name stmt_name to be executed later.
Performs command asynchronously yielding current fiber if EventMachine reactor is running and the current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.
Otherwise performs a blocking call to parent method.
# File 'lib/pg/em.rb', line 631: defined by the same class_eval loop listed under #describe_portal.
#prepare_defer(stmt_name, sql, param_types = nil) {|result| ... } ⇒ FeaturedDeferrable
Prepares statement sql with name stmt_name to be executed later asynchronously and immediately returns a deferrable.
Use the returned deferrable’s callback and errback methods to get the result. If a block is provided, it is bound to both the callback and errback hooks of the returned deferrable.
# File 'lib/pg/em.rb', line 522: defined by the same class_eval loop listed under #describe_portal_defer.
#reset ⇒ Object
Attempts to reset the connection.
Performs command asynchronously yielding from current fiber if EventMachine reactor is running and current fiber isn’t the root fiber. Other fibers can process while waiting for the server to complete the request.
Otherwise performs a thread-blocking call to the parent method.
# File 'lib/pg/em.rb', line 307
def reset
  @async_command_aborted = false
  if ::EM.reactor_running? && !(f = Fiber.current).equal?(ROOT_FIBER)
    reset_defer {|r| f.resume(r) }

    conn = Fiber.yield
    raise conn if conn.is_a?(::Exception)
    conn
  else
    super
  end
end
#reset_defer {|pg| ... } ⇒ FeaturedDeferrable Also known as: async_reset
Attempts to reset the connection asynchronously.
Pass the block to the returned deferrable’s callback to execute after a successful reset. If a block is provided, it is bound to both the callback and errback hooks of the returned deferrable.
# File 'lib/pg/em.rb', line 270
def reset_defer(&blk)
  @async_command_aborted = false
  df = PG::EM::FeaturedDeferrable.new(&blk)
  # there can be only one watch handler over the socket
  # apparently eventmachine has hard time dealing with more than one
  # for blocking reset this is not needed
  if @watcher
    @watcher.detach if @watcher.watching?
    @watcher = nil
  end
  ret = df.protect(:fail) { reset_start }
  unless ret == :fail
    ::EM.watch(self.socket_io, ConnectWatcher, self, df, true).
      poll_connection_and_check
  end
  df
end
#status ⇒ Number
Returns status of connection: PG::CONNECTION_OK or PG::CONNECTION_BAD.
Returns PG::CONNECTION_BAD for connections with the async_command_aborted flag set by an expired query timeout. Otherwise returns whatever PG::Connection#status returns.
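Judging by the source listings on this page, the aborted flag ties three things together: #status reports a bad connection, further deferred commands are refused, and #reset clears the flag. The sketch below models that life cycle with a hypothetical `SketchTimeoutClient`; the `expire_query!` method and the symbol statuses are assumptions standing in for the real timeout handling and the PG constants.

```ruby
# Hypothetical model of the async_command_aborted life cycle.
class SketchTimeoutClient
  class ConnectionBad < StandardError; end

  def initialize
    @async_command_aborted = false
  end

  # Assumed stand-in for a query timeout expiring inside the watcher.
  def expire_query!
    @async_command_aborted = true
  end

  def status
    @async_command_aborted ? :connection_bad : :connection_ok
  end

  def exec(sql)
    if @async_command_aborted
      raise ConnectionBad, 'previous query expired, need connection reset'
    end
    "ran: #{sql}"
  end

  def reset
    @async_command_aborted = false
    self
  end
end

client = SketchTimeoutClient.new
client.expire_query!
client.status                  # reports the bad state
client.reset.exec('SELECT 1')  # usable again after a reset
```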
# File 'lib/pg/em.rb', line 395
def status
  if @async_command_aborted
    PG::CONNECTION_BAD
  else
    super
  end
end
#transaction {|client| ... } ⇒ Object
Avoid using PG::EM::Client#*_defer calls inside the block or make sure all queries are completed before the provided block terminates.
Executes a BEGIN at the start of the block and a COMMIT at the end of the block or ROLLBACK if any exception occurs.
Calls to #transaction may be nested, however without sub-transactions (save points). If the innermost transaction block raises an error the transaction is rolled back to the state before the outermost transaction began.
This is an extension to the PG::Connection#transaction method, as the parent method does not support nesting in this way.
The method is sensitive to the transaction status and will safely roll back on any SQL error, even when it was caught by some rescue block. But consider that rescuing any SQL error within a utility method is a bad idea.
This method works in both blocking and async modes (regardless of the reactor state) and is considered a generic extension to the PG::Connection#transaction method.
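The nesting behaviour can be tried out without a database. The following is a simplified sketch, assuming a hypothetical `SketchTxConnection` whose `exec` only records SQL and tracks an idle or in-transaction status; it keeps the "only the outermost block issues COMMIT or ROLLBACK" rule but leaves out the error-status handling of the real method.

```ruby
# Hypothetical stand-in connection that logs statements instead of sending them.
class SketchTxConnection
  attr_reader :log

  def initialize
    @log = []
    @status = :idle
    @depth = 0
  end

  def exec(sql)
    @log << sql
    case sql
    when 'BEGIN' then @status = :in_transaction
    when 'COMMIT', 'ROLLBACK' then @status = :idle
    end
  end

  # Simplified nesting rule: inner blocks join the outer transaction.
  def transaction
    outermost = @depth.zero?
    exec('BEGIN') if @status == :idle
    @depth += 1
    begin
      result = yield self
    rescue
      exec('ROLLBACK') if outermost && @status != :idle
      raise
    else
      exec('COMMIT') if outermost && @status == :in_transaction
      result
    ensure
      @depth -= 1
    end
  end
end

conn = SketchTxConnection.new
conn.transaction do |c|
  c.exec('INSERT 1')
  c.transaction { |inner| inner.exec('INSERT 2') }
end
p conn.log

failing = SketchTxConnection.new
begin
  failing.transaction { |c| c.transaction { raise 'inner failure' } }
rescue RuntimeError
  p failing.log
end
```

In the second run the inner block raises, but the single ROLLBACK is issued by the outermost block, which is why the whole transaction is undone rather than just the inner part.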
# File 'lib/pg/em.rb', line 725
def transaction
  raise ArgumentError, 'Must supply block for PG::EM::Client#transaction' unless block_given?
  tcount = @client_tran_count.to_i

  case transaction_status
  when PG::PQTRANS_IDLE
    # there is no transaction yet, so let's begin
    exec(TRAN_BEGIN_QUERY)
    # reset transaction count in case user code rolled it back before
    tcount = 0 if tcount != 0
  when PG::PQTRANS_INTRANS
    # transaction in progress, leave it be
  else
    # transaction failed, is in unknown state or command is active
    # in any case calling begin will raise server transaction error
    exec(TRAN_BEGIN_QUERY) # raises PG::InFailedSqlTransaction
  end
  # memoize nested count
  @client_tran_count = tcount + 1
  begin
    result = yield self
  rescue
    # error was raised
    case transaction_status
    when PG::PQTRANS_INTRANS, PG::PQTRANS_INERROR
      # do not rollback if transaction was rolled back before
      # or is in unknown state, which means connection reset is needed
      # and rollback only from the outermost transaction block
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    end
    # raise again
    raise
  else
    # we are good (but not out of woods yet)
    case transaction_status
    when PG::PQTRANS_INTRANS
      # commit only from the outermost transaction block
      exec(TRAN_COMMIT_QUERY) if tcount.zero?
    when PG::PQTRANS_INERROR
      # no ruby error was raised (or an error was rescued in code block)
      # but there was an sql error anyway
      # so rollback after the outermost block
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    when PG::PQTRANS_IDLE
      # the code block has terminated the transaction on its own
      # so just reset the counter
      tcount = 0
    else
      # something isn't right, so provoke an error just in case
      exec(TRAN_ROLLBACK_QUERY) if tcount.zero?
    end
    result
  ensure
    @client_tran_count = tcount
  end
end