Class: PG::EM::Client
- Inherits:
-
Connection
- Object
- Connection
- PG::EM::Client
- Defined in:
- lib/pg/em.rb,
lib/em-synchrony/pg.rb
Overview
PostgreSQL EventMachine client
- Author
-
Rafal Michalski ([email protected])
- Licence
-
MIT License
PG::EM::Client is a wrapper for PG::Connection which (re)defines methods:
-
async_exec
(alias:async_query
) -
async_prepare
-
async_exec_prepared
-
async_describe_prepared
-
async_describe_portal
which are suitable to run in EM event loop (they return Deferrable
)
and following:
-
exec
(alias:query
) -
prepare
-
exec_prepared
-
describe_prepared
-
describe_portal
autodetecting if EventMachine is running and using the appropriate (async or sync) method version.
Additionally to the above, there are asynchronous methods defined for establishing connection and re-connecting:
-
Client.async_connect
-
async_reset
They are async equivalents of PG::Connection.connect (which is also aliased by PG::Connection as new
, open
, setdb
, setdblogin
) and reset
.
When #async_autoreconnect is true
, async methods might try to re-connect after a connection error. You won’t even notice that (except for warning message from PG). If you want to detect such event use #on_autoreconnect property.
To enable auto-reconnecting set:
client.async_autoreconnect = true
or pass as new() hash argument:
::new database: 'bar', async_autoreconnect: true
Otherwise nothing changes in PG::Connection API. See PG::Connection docs for arguments to above methods.
Warning:
describe_prepared
and exec_prepared
after prepare
should only be invoked on the same connection. If you are using a connection pool, make sure to acquire single connection first.
Defined Under Namespace
Modules: ConnectWatcher, Watcher
Instance Attribute Summary collapse
-
#async_autoreconnect ⇒ Object
Enable/disable auto-reconnect feature (
true
/false
). -
#async_command_aborted ⇒ Object
Used internally for marking connection as aborted on query timeout.
-
#connect_timeout ⇒ Object
Connection timeout.
-
#on_autoreconnect ⇒ Object
on_autoreconnect
is a user defined Proc that is called after a connection with the server has been re-established. -
#query_timeout ⇒ Object
Aborts async command processing if waiting for response from server exceedes
query_timeout
seconds.
Class Method Summary collapse
-
.async_connect(*args, &blk) ⇒ Object
Attempts to establish the connection asynchronously.
- .parse_async_args(*args) ⇒ Object
Instance Method Summary collapse
-
#async_autoreconnect!(deferrable, error, &send_proc) ⇒ Object
Perform autoreconnect.
-
#async_reset(&blk) ⇒ Object
Attempts to reset the connection asynchronously.
-
#initialize(*args) ⇒ Client
constructor
Creates new instance of PG::EM::Client and attempts to establish connection.
-
#reset ⇒ Object
Uncheck #async_command_aborted on blocking reset.
-
#status ⇒ Object
Return
CONNECTION_BAD
for connections withasync_command_aborted
flag set by expired query timeout.
Constructor Details
#initialize(*args) ⇒ Client
Creates new instance of PG::EM::Client and attempts to establish connection. See PG::Connection.new.
Special PG::EM::Client options (e.g.: async_autoreconnect
) must be provided as connection_hash
argument variant. They will be ignored in connection_string
.
em-synchrony
version will do set client_encoding
for you according to Encoding.default_internal.
395 396 397 398 |
# File 'lib/pg/em.rb', line 395 def initialize(*args) Client.parse_async_args(*args).each {|k, v| self.instance_variable_set(k, v) } super(*args) end |
Instance Attribute Details
#async_autoreconnect ⇒ Object
Enable/disable auto-reconnect feature (true
/false
). Defaults to false
. However it is implicitly set to true
if #on_autoreconnect is specified as initialization option. Changing #on_autoreconnect with accessor method doesn’t change #async_autoreconnect.
152 153 154 |
# File 'lib/pg/em.rb', line 152 def async_autoreconnect @async_autoreconnect end |
#async_command_aborted ⇒ Object
Used internally for marking connection as aborted on query timeout.
183 184 185 |
# File 'lib/pg/em.rb', line 183 def async_command_aborted @async_command_aborted end |
#connect_timeout ⇒ Object
Connection timeout. Changing this property only affects ::async_connect and #async_reset. However if passed as initialization option, it also affects blocking ::new and #reset.
136 137 138 |
# File 'lib/pg/em.rb', line 136 def connect_timeout @connect_timeout end |
#on_autoreconnect ⇒ Object
on_autoreconnect
is a user defined Proc that is called after a connection with the server has been re-established. It’s invoked with two arguments. First one is the connection
. The second is the original exception
that caused the reconnecting process.
Certain rules should apply to #on_autoreconnect proc:
-
If proc returns
false
(explicitly,nil
is ignored), the originalexception
is passed to Defferable’serrback
and the send query command is not invoked at all. -
If return value is an instance of exception, it is passed to Defferable’s
errback
and the send query command is not invoked at all. -
If return value responds to
callback
anderrback
methods, the send query command will be bound to value’s successcallback
and the original Defferable’serrback
or value’serrback
. -
Other return values are ignored and the send query command is called immediately after #on_autoreconnect proc is executed.
You may pass this proc as :on_autoreconnect
option to ::new.
Example:
pg.on_autoreconnect = proc do |conn, ex|
conn.prepare("species_by_name",
"select id, name from animals where species=$1 order by name")
end
180 181 182 |
# File 'lib/pg/em.rb', line 180 def on_autoreconnect @on_autoreconnect end |
#query_timeout ⇒ Object
Aborts async command processing if waiting for response from server exceedes query_timeout
seconds. This does not apply to ::async_connect and #async_reset. For them use connect_timeout
instead.
To enable it set to seconds (> 0). To disable: set to 0. You can also specify this as initialization option.
145 146 147 |
# File 'lib/pg/em.rb', line 145 def query_timeout @query_timeout end |
Class Method Details
.async_connect(*args, &blk) ⇒ Object
Attempts to establish the connection asynchronously. For args see PG::Connection.new. Returns Deferrable
. Use its callback
to obtain newly created and already connected PG::EM::Client object. If block is provided, it’s bound to callback
and errback
of returned Deferrable
.
Special PG::EM::Client options (e.g.: async_autoreconnect
) must be provided as connection_hash
argument variant. They will be ignored in connection_string
.
client_encoding
will be set for you according to Encoding.default_internal.
354 355 356 357 358 359 360 361 362 363 |
# File 'lib/pg/em.rb', line 354 def self.async_connect(*args, &blk) df = PG::EM::FeaturedDeferrable.new(&blk) async_args = parse_async_args(*args) conn = df.protect { connect_start(*args) } if conn async_args.each {|k, v| conn.instance_variable_set(k, v) } ::EM.watch(conn.socket, ConnectWatcher, conn, df, :connect).poll_connection_and_check end df end |
.parse_async_args(*args) ⇒ Object
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 |
# File 'lib/pg/em.rb', line 308 def self.parse_async_args(*args) async_args = { :@async_autoreconnect => nil, :@connect_timeout => 0, :@query_timeout => 0, :@on_autoreconnect => nil, :@async_command_aborted => false, } if args.last.is_a? Hash args.last.reject! do |key, value| case key.to_s when 'async_autoreconnect' async_args[:@async_autoreconnect] = !!value true when 'on_reconnect' raise ArgumentError.new("on_reconnect is no longer supported, use on_autoreconnect") when 'on_autoreconnect' if value.respond_to? :call async_args[:@on_autoreconnect] = value async_args[:@async_autoreconnect] = true if async_args[:@async_autoreconnect].nil? end true when 'connect_timeout' async_args[:@connect_timeout] = value.to_f false when 'query_timeout' async_args[:@query_timeout] = value.to_f true end end end async_args[:@async_autoreconnect] = false if async_args[:@async_autoreconnect].nil? async_args end |
Instance Method Details
#async_autoreconnect!(deferrable, error, &send_proc) ⇒ Object
Perform autoreconnect. Used internally.
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 |
# File 'lib/pg/em.rb', line 412 def async_autoreconnect!(deferrable, error, &send_proc) if async_autoreconnect && self.status != PG::CONNECTION_OK reset_df = async_reset reset_df.errback { |ex| deferrable.fail(ex) } reset_df.callback do if on_autoreconnect returned_df = on_autoreconnect.call(self, error) if returned_df == false ::EM.next_tick { deferrable.fail(error) } elsif returned_df.respond_to?(:callback) && returned_df.respond_to?(:errback) returned_df.callback { deferrable.protect(&send_proc) } returned_df.errback { |ex| deferrable.fail(ex) } elsif returned_df.is_a?(Exception) ::EM.next_tick { deferrable.fail(returned_df) } else deferrable.protect(&send_proc) end else deferrable.protect(&send_proc) end end else ::EM.next_tick { deferrable.fail(error) } end end |
#async_reset(&blk) ⇒ Object
Attempts to reset the connection asynchronously. There are no arguments, except block argument.
Returns Deferrable
. Use it’s callback
to handle success. If block is provided, it’s bound to callback
and errback
of returned Deferrable
.
371 372 373 374 375 376 377 378 379 |
# File 'lib/pg/em.rb', line 371 def async_reset(&blk) @async_command_aborted = false df = PG::EM::FeaturedDeferrable.new(&blk) ret = df.protect(:fail) { reset_start } unless ret == :fail ::EM.watch(self.socket, ConnectWatcher, self, df, :reset).poll_connection_and_check end df end |
#reset ⇒ Object
Uncheck #async_command_aborted on blocking reset.
382 383 384 385 |
# File 'lib/pg/em.rb', line 382 def reset @async_command_aborted = false super end |
#status ⇒ Object
Return CONNECTION_BAD
for connections with async_command_aborted
flag set by expired query timeout. Otherwise return whatever PG::Connection#status provides.
403 404 405 406 407 408 409 |
# File 'lib/pg/em.rb', line 403 def status if @async_command_aborted PG::CONNECTION_BAD else super end end |