Class: PgNotifier::Manager
- Inherits:
-
Object
- Object
- PgNotifier::Manager
- Defined in:
- lib/pg_notifier/manager.rb
Instance Attribute Summary
-
#db_config ⇒ Object
Returns the value of attribute db_config.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
Instance Method Summary
- #channels ⇒ Object
- #connection ⇒ Object
-
#initialize(attrs = {}) ⇒ Manager
constructor
A new instance of Manager.
- #notify(channel, options = {}, &block) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #subscriptions_by_channels ⇒ Object
Constructor Details
#initialize(attrs = {}) ⇒ Manager
Returns a new instance of Manager.
9 10 11 12 13 14 15 16 17 18 |
# File 'lib/pg_notifier/manager.rb', line 9
# Builds a manager from an optional settings hash.
#
# @param attrs [Hash] optional settings
# @option attrs [Object] :logger    logger to use; a Logger writing to STDOUT
#   is created only when this key is absent
# @option attrs [Object] :db_config stored as-is, defaults to {}
# @option attrs [Object] :timeout   stored as-is, defaults to 0.1
# @return [Manager]
def initialize(attrs = {})
  # Block form of fetch: the fallback Logger is only allocated when the
  # caller did not provide one.
  @logger    = attrs.fetch(:logger) { Logger.new(STDOUT) }
  @db_config = attrs.fetch(:db_config, {})
  @timeout   = attrs.fetch(:timeout, 0.1)
  @finish    = false

  # NOTE: this is a process-wide switch, not a per-instance one. After it is
  # set, an unhandled exception in any thread aborts the whole process.
  Thread.abort_on_exception = true

  @connection_mutex = Mutex.new
end
Instance Attribute Details
#db_config ⇒ Object
Returns the value of attribute db_config.
7 8 9 |
# File 'lib/pg_notifier/manager.rb', line 7
# Reader for the db_config attribute.
#
# @return [Object] whatever is currently stored in @db_config
def db_config
  @db_config
end
#logger ⇒ Object
Returns the value of attribute logger.
7 8 9 |
# File 'lib/pg_notifier/manager.rb', line 7
# Reader for the logger attribute.
#
# @return [Object] whatever is currently stored in @logger
def logger
  @logger
end
#timeout ⇒ Object
Returns the value of attribute timeout.
7 8 9 |
# File 'lib/pg_notifier/manager.rb', line 7
# Reader for the timeout attribute.
#
# @return [Object] whatever is currently stored in @timeout
def timeout
  @timeout
end
Instance Method Details
#channels ⇒ Object
29 30 31 |
# File 'lib/pg_notifier/manager.rb', line 29
# Lists every channel that has a subscription list registered.
#
# @return [Array] the keys of #subscriptions_by_channels
def channels
  subscriptions_by_channels.each_key.to_a
end
#connection ⇒ Object
33 34 35 |
# File 'lib/pg_notifier/manager.rb', line 33
# Returns the memoized database connection, opening it on first use.
#
# @return [Object] the result of PG::Connection.open called with #db_config
def connection
  return @connection if @connection

  @connection = PG::Connection.open(db_config)
end
#notify(channel, options = {}, &block) ⇒ Object
20 21 22 23 |
# File 'lib/pg_notifier/manager.rb', line 20
# Registers a new subscription for +channel+.
#
# @param channel [Object] key under which the subscription is stored
# @param options [Hash] passed through unchanged to Subscription.new
# @yield handler block, passed through unchanged to Subscription.new
# @return [Array] the list of subscriptions registered for +channel+
def notify(channel, options = {}, &block)
  # Create the per-channel list on first use, then append to it.
  subscriptions = (subscriptions_by_channels[channel] ||= [])
  subscriptions << Subscription.new(channel, options, &block)
end
#run ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/pg_notifier/manager.rb', line 37
# Starts a background thread that issues LISTEN for every registered channel
# and then dispatches incoming notifications to the matching subscriptions
# until @finish becomes true.
#
# The thread raises ChannelNotLaunched when a LISTEN command does not report
# PG::PGRES_COMMAND_OK.
#
# @return [Thread] the listener thread
def run
  logger.info "Starting pg_notifier for #{channels.count} channels: [ #{channels.join(' ')} ]"

  Thread.new do
    channels.each do |channel|
      pg_result = @connection_mutex.synchronize do
        # Quote the channel as an SQL identifier so names that are not bare
        # lower-case identifiers (hyphens, upper case, reserved words) are
        # listened to verbatim. to_s keeps Symbol channels working.
        connection.exec "LISTEN #{connection.quote_ident(channel.to_s)};"
      end

      next if pg_result.result_status.eql?(PG::PGRES_COMMAND_OK)

      raise ChannelNotLaunched, "Channel ##{channel} not launched"
    end

    until @finish
      # wait_notification is defined outside this view. The format string
      # below implies a three-element array (channel, pid, payload) or a
      # falsy value when nothing arrived — TODO confirm.
      notification = wait_notification
      next unless notification

      logger.info "Notifying channel: %s, pid: %s, payload: %s" % notification

      subscriptions_by_channels.fetch(notification.first, []).each do |subscription|
        subscription.notify(*notification)
      end
    end
  end
end
#shutdown ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/pg_notifier/manager.rb', line 60
# Stops the notification loop, closes the database connection if one was
# opened, and terminates the process with status 0.
#
# @return [void] does not return; exit(0) raises SystemExit
def shutdown
  logger.info 'Shutting down'

  # Read by the loop in #run as its stop condition.
  @finish = true

  @connection_mutex.synchronize do
    # Check the ivar rather than calling #connection, which would lazily open
    # a brand-new connection during shutdown only to close it again.
    if @connection && !@connection.finished?
      @connection.async_exec "UNLISTEN *;"
      @connection.finish
    end
  end

  exit(0)
end
#subscriptions_by_channels ⇒ Object
25 26 27 |
# File 'lib/pg_notifier/manager.rb', line 25
# Returns the memoized registry of subscriptions, creating an empty Hash the
# first time it is requested.
#
# @return [Hash] the registry stored in @subscriptions_by_channels
def subscriptions_by_channels
  return @subscriptions_by_channels if @subscriptions_by_channels

  @subscriptions_by_channels = {}
end