Class: PG::EM::Client

Inherits:
Connection
  • Object
Defined in:
lib/pg/em.rb,
lib/em-synchrony/pg.rb

Overview

PostgreSQL EventMachine client

Author

Rafal Michalski ([email protected])

Licence

MIT License

PG::EM::Client is a wrapper for PG::Connection which (re)defines methods:

  • async_exec (alias: async_query)

  • async_prepare

  • async_exec_prepared

  • async_describe_prepared

  • async_describe_portal

which are suitable for running in the EM event loop (they return a Deferrable)

and the following:

  • exec (alias: query)

  • prepare

  • exec_prepared

  • describe_prepared

  • describe_portal

autodetecting if EventMachine is running and using the appropriate (async or sync) method version.

In addition to the above, there are asynchronous methods for establishing a connection and for re-connecting:

  • Client.async_connect

  • async_reset

They are async equivalents of PG::Connection.connect (which is also aliased by PG::Connection as new, open, setdb, setdblogin) and reset.

When #async_autoreconnect is true, async methods may try to re-connect after a connection error. This happens transparently, apart from a warning message from PG. To detect such an event, use the #on_autoreconnect property.

To enable auto-reconnecting set:

client.async_autoreconnect = true

or pass it in the options hash given to new():

PG::EM::Client.new database: 'bar', async_autoreconnect: true

Otherwise nothing changes in the PG::Connection API. See the PG::Connection docs for the arguments to the methods above.

Warning:

describe_prepared and exec_prepared should only be invoked on the same connection that prepare was called on. If you are using a connection pool, make sure to acquire a single connection first.
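
The warning can be illustrated without a database. The sketch below uses hypothetical stand-ins (FakeConnection, TinyPool and its hold method are not part of PG::EM) to show why a statement prepared on one pooled connection is unknown to another, and how holding one connection for the whole sequence avoids the problem.

```ruby
# Hypothetical stand-in for a connection: prepared statements are
# remembered per connection, as they are per PostgreSQL session.
class FakeConnection
  def initialize
    @prepared = {}
  end

  def prepare(name, sql)
    @prepared[name] = sql
    true
  end

  def exec_prepared(name, params = [])
    sql = @prepared.fetch(name) do
      raise ArgumentError, "prepared statement \"#{name}\" does not exist"
    end
    [sql, params]
  end
end

# Hypothetical round-robin pool; `hold` yields one connection for the
# duration of the block.
class TinyPool
  def initialize(size)
    @connections = Array.new(size) { FakeConnection.new }
    @next = 0
  end

  def hold
    conn = @connections[@next % @connections.size]
    @next += 1
    yield conn
  end
end

SQL = "select id, name from animals where species=$1 order by name"
pool = TinyPool.new(2)

# Wrong: prepare and exec_prepared land on different connections.
pool.hold { |c| c.prepare("species_by_name", SQL) }
split_result =
  begin
    pool.hold { |c| c.exec_prepared("species_by_name", ["dog"]) }
  rescue ArgumentError => e
    e.message
  end

# Right: both calls run on the same held connection.
held_result = pool.hold do |c|
  c.prepare("species_by_name", SQL)
  c.exec_prepared("species_by_name", ["dog"])
end
```

With a real pool the method for acquiring a connection will have a different name; the point is only that prepare and the calls depending on it share one block.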

Defined Under Namespace

Modules: ConnectWatcher, Watcher

Constructor Details

#initialize(*args) ⇒ Client

Creates a new instance of PG::EM::Client and attempts to establish a connection. See PG::Connection.new.

Special PG::EM::Client options (e.g. async_autoreconnect) must be provided in the connection_hash argument form. They are ignored in a connection_string.

The em-synchrony version sets client_encoding for you according to Encoding.default_internal.



# File 'lib/pg/em.rb', line 395

def initialize(*args)
  Client.parse_async_args(*args).each {|k, v| self.instance_variable_set(k, v) }
  super(*args)
end

Instance Attribute Details

#async_autoreconnect ⇒ Object

Enables or disables the auto-reconnect feature (true/false). Defaults to false. However, it is implicitly set to true if #on_autoreconnect is specified as an initialization option. Changing #on_autoreconnect with the accessor method doesn’t change #async_autoreconnect.



# File 'lib/pg/em.rb', line 152

def async_autoreconnect
  @async_autoreconnect
end

#async_command_aborted ⇒ Object

Used internally for marking connection as aborted on query timeout.



# File 'lib/pg/em.rb', line 183

def async_command_aborted
  @async_command_aborted
end

#connect_timeout ⇒ Object

Connection timeout. Changing this property only affects ::async_connect and #async_reset. However, if passed as an initialization option, it also affects the blocking ::new and #reset.



# File 'lib/pg/em.rb', line 136

def connect_timeout
  @connect_timeout
end

#on_autoreconnect ⇒ Object

on_autoreconnect is a user-defined Proc that is called after a connection with the server has been re-established. It is invoked with two arguments: the first is the connection, the second is the original exception that caused the reconnecting process.

The following rules apply to the #on_autoreconnect proc:

  • If the proc returns false (explicitly; nil is ignored), the original exception is passed to the Deferrable’s errback and the send query command is not invoked at all.

  • If the return value is an instance of Exception, it is passed to the Deferrable’s errback and the send query command is not invoked at all.

  • If the return value responds to the callback and errback methods, the send query command is bound to the value’s success callback, and the value’s errback is forwarded to the original Deferrable’s errback.

  • Other return values are ignored and the send query command is called immediately after the #on_autoreconnect proc is executed.

You may pass this proc as the :on_autoreconnect option to ::new.

Example:

pg.on_autoreconnect = proc do |conn, ex|
  conn.prepare("species_by_name", 
   "select id, name from animals where species=$1 order by name")
end
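
The rules for the proc's return value can be summarized as a small decision function. This is a sketch only: classify_autoreconnect_result and the symbols it returns are hypothetical names, and the branch order follows the #async_autoreconnect! listing on this page, where the check for callback/errback comes before the check for Exception.

```ruby
# Hypothetical helper: maps the value returned by an on_autoreconnect
# proc to the action taken for the pending query.
def classify_autoreconnect_result(value)
  if value == false
    :fail_with_original_error   # only an explicit false; nil falls through
  elsif value.respond_to?(:callback) && value.respond_to?(:errback)
    :wait_for_deferrable        # query is sent from the value's callback
  elsif value.is_a?(Exception)
    :fail_with_returned_error
  else
    :send_immediately
  end
end

# Minimal object that quacks like a Deferrable, for illustration only.
FakeDeferrable = Struct.new(:state) do
  def callback; self; end
  def errback; self; end
end

results = [
  false, nil, true, RuntimeError.new("boom"), FakeDeferrable.new(:pending)
].map { |value| classify_autoreconnect_result(value) }
```

Note that the prepare call in the example above is the last expression of the proc, so whatever it returns is the value being classified.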


# File 'lib/pg/em.rb', line 180

def on_autoreconnect
  @on_autoreconnect
end

#query_timeout ⇒ Object

Aborts async command processing if waiting for a response from the server exceeds query_timeout seconds. This does not apply to ::async_connect and #async_reset; for those, use connect_timeout instead.

To enable it, set it to a number of seconds (> 0). To disable it, set it to 0. You can also specify this as an initialization option.



# File 'lib/pg/em.rb', line 145

def query_timeout
  @query_timeout
end

Class Method Details

.async_connect(*args, &blk) ⇒ Object

Attempts to establish the connection asynchronously. For args see PG::Connection.new. Returns a Deferrable. Use its callback to obtain the newly created and already connected PG::EM::Client object. If a block is provided, it’s bound to both the callback and the errback of the returned Deferrable.

Special PG::EM::Client options (e.g. async_autoreconnect) must be provided in the connection_hash argument form. They are ignored in a connection_string.

client_encoding will be set for you according to Encoding.default_internal.



# File 'lib/pg/em.rb', line 354

def self.async_connect(*args, &blk)
  df = PG::EM::FeaturedDeferrable.new(&blk)
  async_args = parse_async_args(*args)
  conn = df.protect { connect_start(*args) }
  if conn
    async_args.each {|k, v| conn.instance_variable_set(k, v) }
    ::EM.watch(conn.socket, ConnectWatcher, conn, df, :connect).poll_connection_and_check
  end
  df
end
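
Because a block given to async_connect is bound to both outcomes, it receives either the connected client or an exception and has to tell them apart itself. The sketch below models only that binding with a hypothetical TinyDeferrable; it is not EventMachine's Deferrable nor PG::EM::FeaturedDeferrable, and it fires handlers synchronously for simplicity.

```ruby
# Hypothetical, much simplified deferrable. It only models
# "one block bound to both callback and errback".
class TinyDeferrable
  def initialize(&blk)
    @callbacks = []
    @errbacks = []
    if blk
      callback(&blk)
      errback(&blk)
    end
  end

  def callback(&blk)
    @callbacks << blk
    self
  end

  def errback(&blk)
    @errbacks << blk
    self
  end

  def succeed(*args)
    @callbacks.each { |cb| cb.call(*args) }
  end

  def fail(*args)
    @errbacks.each { |eb| eb.call(*args) }
  end
end

log = []

# The same block handles success and failure, so it checks the type
# of its argument.
on_done = proc do |result|
  log << if result.is_a?(Exception)
           "failed: #{result.message}"
         else
           "connected: #{result}"
         end
end

TinyDeferrable.new(&on_done).succeed(:client)
TinyDeferrable.new(&on_done).fail(RuntimeError.new("timeout"))
```

If separate handlers read better, omit the block and attach callback and errback to the returned Deferrable instead.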

.parse_async_args(*args) ⇒ Object



# File 'lib/pg/em.rb', line 308

def self.parse_async_args(*args)
  async_args = {
    :@async_autoreconnect => nil,
    :@connect_timeout => 0,
    :@query_timeout => 0,
    :@on_autoreconnect => nil,
    :@async_command_aborted => false,
  }
  if args.last.is_a? Hash
    args.last.reject! do |key, value|
      case key.to_s
      when 'async_autoreconnect'
        async_args[:@async_autoreconnect] = !!value
        true
      when 'on_reconnect'
        raise ArgumentError.new("on_reconnect is no longer supported, use on_autoreconnect")
      when 'on_autoreconnect'
        if value.respond_to? :call
          async_args[:@on_autoreconnect] = value
          async_args[:@async_autoreconnect] = true if async_args[:@async_autoreconnect].nil?
        end
        true
      when 'connect_timeout'
        async_args[:@connect_timeout] = value.to_f
        false
      when 'query_timeout'
        async_args[:@query_timeout] = value.to_f
        true
      end
    end
  end
  async_args[:@async_autoreconnect] = false if async_args[:@async_autoreconnect].nil?
  async_args
end
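
Two consequences of this listing are easy to miss: the trailing options hash is modified in place by reject!, and connect_timeout is the one special option that stays in it. The sketch below reproduces just the splitting step as a standalone function so it can be run without the gem; split_async_options is a hypothetical name, plain symbol keys replace the instance variable names, and the on_reconnect check is left out.

```ruby
# Hypothetical, standalone rendition of the option-splitting step.
def split_async_options(options)
  async = { async_autoreconnect: nil, on_autoreconnect: nil,
            connect_timeout: 0, query_timeout: 0 }
  options.reject! do |key, value|
    case key.to_s
    when 'async_autoreconnect'
      async[:async_autoreconnect] = !!value
      true
    when 'on_autoreconnect'
      if value.respond_to?(:call)
        async[:on_autoreconnect] = value
        # A handler implies auto-reconnect unless it was set explicitly.
        async[:async_autoreconnect] = true if async[:async_autoreconnect].nil?
      end
      true
    when 'connect_timeout'
      async[:connect_timeout] = value.to_f
      false   # kept in the hash, so the underlying connection sees it too
    when 'query_timeout'
      async[:query_timeout] = value.to_f
      true
    end
  end
  async[:async_autoreconnect] = false if async[:async_autoreconnect].nil?
  async
end

options = { database: 'bar', connect_timeout: 5, query_timeout: '2.5',
            on_autoreconnect: proc { |conn, ex| } }
async = split_async_options(options)
remaining_keys = options.keys
```

Since the caller's hash is mutated, pass a copy (for example options.dup) if the same hash is reused for several connections.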

Instance Method Details

#async_autoreconnect!(deferrable, error, &send_proc) ⇒ Object

Perform autoreconnect. Used internally.



# File 'lib/pg/em.rb', line 412

def async_autoreconnect!(deferrable, error, &send_proc)
  if async_autoreconnect && self.status != PG::CONNECTION_OK
    reset_df = async_reset
    reset_df.errback { |ex| deferrable.fail(ex) }
    reset_df.callback do
      if on_autoreconnect
        returned_df = on_autoreconnect.call(self, error)
        if returned_df == false
          ::EM.next_tick { deferrable.fail(error) }
        elsif returned_df.respond_to?(:callback) && returned_df.respond_to?(:errback)
          returned_df.callback { deferrable.protect(&send_proc) }
          returned_df.errback { |ex| deferrable.fail(ex) }
        elsif returned_df.is_a?(Exception)
          ::EM.next_tick { deferrable.fail(returned_df) }
        else
          deferrable.protect(&send_proc)
        end
      else
        deferrable.protect(&send_proc)
      end
    end
  else
    ::EM.next_tick { deferrable.fail(error) }
  end
end

#async_reset(&blk) ⇒ Object

Attempts to reset the connection asynchronously. It takes no arguments other than an optional block.

Returns a Deferrable. Use its callback to handle success. If a block is provided, it’s bound to both the callback and the errback of the returned Deferrable.



# File 'lib/pg/em.rb', line 371

def async_reset(&blk)
  @async_command_aborted = false
  df = PG::EM::FeaturedDeferrable.new(&blk)
  ret = df.protect(:fail) { reset_start }
  unless ret == :fail
    ::EM.watch(self.socket, ConnectWatcher, self, df, :reset).poll_connection_and_check
  end
  df
end

#reset ⇒ Object

Clears the #async_command_aborted flag on a blocking reset.



# File 'lib/pg/em.rb', line 382

def reset
  @async_command_aborted = false
  super
end

#status ⇒ Object

Returns CONNECTION_BAD for connections whose async_command_aborted flag was set by an expired query timeout. Otherwise returns whatever PG::Connection#status provides.



# File 'lib/pg/em.rb', line 403

def status
  if @async_command_aborted
    PG::CONNECTION_BAD
  else
    super
  end
end
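
The interplay between #status, #reset and the aborted flag can be tried in isolation. The sketch below uses hypothetical stand-ins (BaseConnection, TimeoutAwareConnection and the two constants defined here are illustrative, not the PG classes). It shows why a connection that timed out is reported as bad even though the underlying status is fine, which is what lets the status check in #async_autoreconnect! trigger a reset.

```ruby
# Hypothetical stand-ins; the real constants live in the PG module.
CONNECTION_OK  = 0
CONNECTION_BAD = 1

class BaseConnection
  def status
    CONNECTION_OK   # pretend the socket itself is healthy
  end

  def reset
    :reset_done
  end
end

class TimeoutAwareConnection < BaseConnection
  attr_accessor :async_command_aborted

  # Report a bad connection while the aborted flag is set.
  def status
    @async_command_aborted ? CONNECTION_BAD : super
  end

  # A reset clears the flag before delegating.
  def reset
    @async_command_aborted = false
    super
  end
end

conn = TimeoutAwareConnection.new
before = conn.status
conn.async_command_aborted = true   # what an expired query timeout would do
aborted = conn.status
conn.reset
after = conn.status
```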