Class: PgNotifier::Manager
- Inherits:
-
Object
- Object
- PgNotifier::Manager
- Defined in:
- lib/pg_notifier/manager.rb
Instance Attribute Summary collapse
-
#db_config ⇒ Object
Returns the value of attribute db_config.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
Instance Method Summary collapse
- #channels ⇒ Object
- #connection ⇒ Object
- #graceful_shutdown ⇒ Object
- #handle_signal(sig) ⇒ Object
-
#initialize(attrs = {}) ⇒ Manager
constructor
A new instance of Manager.
- #notify(channel, options = {}, &block) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #subscriptions_by_channels ⇒ Object
Constructor Details
#initialize(attrs = {}) ⇒ Manager
9 10 11 12 13 14 15 16 17 |
# File 'lib/pg_notifier/manager.rb', line 9

# Builds a manager from an optional settings hash.
#
# @param attrs [Hash] optional settings
# @option attrs [Object] :logger logger to use; defaults to Logger.new(STDOUT)
# @option attrs [Hash] :db_config connection settings; defaults to {}
# @option attrs [Numeric] :timeout wait interval used by the listener loop
#   in #run; defaults to 1
def initialize(attrs = {})
  # Block form of fetch: the default objects are only built when the key is
  # absent, so a caller-supplied logger does not cost a throwaway Logger.
  @logger = attrs.fetch(:logger) { Logger.new(STDOUT) }
  @db_config = attrs.fetch(:db_config) { {} }
  @timeout = attrs.fetch(:timeout, 1)

  # Shutdown coordination state shared with the listener thread.
  @finish = false
  @mutex = Mutex.new
  @resource = ConditionVariable.new
end
Instance Attribute Details
#db_config ⇒ Object
Returns the value of attribute db_config.
7 8 9 |
# File 'lib/pg_notifier/manager.rb', line 7

# Reader for the db_config attribute.
#
# @return [Object] the current value of @db_config
def db_config
  @db_config
end
#logger ⇒ Object
Returns the value of attribute logger.
7 8 9 |
# File 'lib/pg_notifier/manager.rb', line 7

# Reader for the logger attribute.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
#timeout ⇒ Object
Returns the value of attribute timeout.
7 8 9 |
# File 'lib/pg_notifier/manager.rb', line 7

# Reader for the timeout attribute.
#
# @return [Object] the current value of @timeout
def timeout
  @timeout
end
Instance Method Details
#channels ⇒ Object
28 29 30 |
# File 'lib/pg_notifier/manager.rb', line 28

# Lists the channels that currently have an entry in the subscription map.
#
# @return [Array] the keys of #subscriptions_by_channels
def channels
  registry = subscriptions_by_channels
  registry.keys
end
#connection ⇒ Object
32 33 34 |
# File 'lib/pg_notifier/manager.rb', line 32

# Returns the memoised PG connection, opening it with #db_config on first use.
#
# @return [Object] the value cached in @connection
def connection
  return @connection if @connection

  @connection = PG::Connection.open(db_config)
end
#graceful_shutdown ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/pg_notifier/manager.rb', line 92

# Stops the listener loop cooperatively and exits the process with status 0.
#
# Sets the finish flag, then, while holding the mutex, closes the PG
# connection if one is open and signals the condition variable. Finally
# calls exit(0).
#
# @return [void] does not return normally; exit(0) raises SystemExit
def graceful_shutdown
  logger.info 'Gracefully shutting down'

  @finish = true

  @mutex.synchronize do
    # Look at @connection directly: going through the memoising #connection
    # reader would open a brand-new connection just to close it, and a
    # failure to connect would prevent the exit below.
    @connection.finish if @connection && !@connection.finished?
    @resource.signal
  end

  exit(0)
end
#handle_signal(sig) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/pg_notifier/manager.rb', line 70

# Dispatches a received signal name to the matching shutdown routine.
#
# 'INT' triggers #shutdown; 'TERM' and 'HUP' both trigger
# #graceful_shutdown. Any other name is only logged.
#
# @param sig [String] signal name without the SIG prefix
def handle_signal(sig)
  logger.debug "Got #{sig} signal"

  case sig
  when 'INT' then shutdown
  when 'TERM', 'HUP' then graceful_shutdown
  end
end
#notify(channel, options = {}, &block) ⇒ Object
19 20 21 22 |
# File 'lib/pg_notifier/manager.rb', line 19

# Registers a subscription for a channel.
#
# @param channel [Object] channel key under which the subscription is stored
# @param options [Hash] passed through unchanged to Subscription.new
# @param block [Proc] passed through unchanged to Subscription.new
# @return [Array] the list of subscriptions now registered for the channel
def notify(channel, options = {}, &block)
  # Create the per-channel list on first use, then append to it.
  (subscriptions_by_channels[channel] ||= []) << Subscription.new(channel, options, &block)
end
#run ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/pg_notifier/manager.rb', line 36

# Starts the notifier: installs signal traps, spawns the listener thread and
# then blocks the calling thread in a loop that dispatches received signals
# to #handle_signal.
#
# The listener thread issues LISTEN for every registered channel and, until
# the finish flag is set, waits for notifications and forwards each one to
# the subscriptions registered for its channel.
def run
  logger.info "Starting pg_notifier for #{channels.count} channels: [ #{channels.join(' ')} ]"

  # Trap handlers only write the signal name into a pipe; the real handling
  # happens in the select loop at the bottom of this method.
  sig_read, sig_write = IO.pipe

  (%w[INT TERM HUP] & Signal.list.keys).each do |sig|
    trap(sig) { sig_write.puts(sig) }
  end

  Thread.new do
    # NOTE(review): the channel name is interpolated into SQL unquoted, so it
    # must be a valid identifier supplied by trusted code.
    channels.each { |channel| connection.exec "LISTEN #{channel};" }

    @mutex.synchronize do
      until @finish
        # Bound the wait: this call holds @mutex, and #graceful_shutdown
        # needs that mutex. Without a timeout a quiet channel would block
        # shutdown until the next notification arrived.
        connection.wait_for_notify(timeout) do |channel, pid, payload|
          subscriptions_by_channels.fetch(channel, []).each do |subscription|
            subscription.notify(channel, pid, payload)
          end
        end

        # Releases @mutex for up to `timeout`, giving #graceful_shutdown a
        # window to close the connection and signal.
        @resource.wait @mutex, timeout
      end
    end
  end

  while (ready = IO.select([sig_read]))
    handle_signal(ready.first[0].gets.chomp)
  end
end
#shutdown ⇒ Object
83 84 85 86 87 88 89 90 |
# File 'lib/pg_notifier/manager.rb', line 83

# Stops immediately: sets the finish flag, closes the PG connection if one
# is open, and exits the process with status 0.
#
# @return [void] does not return normally; exit(0) raises SystemExit
def shutdown
  logger.info 'Shutting down'

  @finish = true

  # Look at @connection directly: going through the memoising #connection
  # reader would open a brand-new connection just to close it, and a failure
  # to connect would prevent the exit below.
  @connection.finish if @connection && !@connection.finished?

  exit(0)
end
#subscriptions_by_channels ⇒ Object
24 25 26 |
# File 'lib/pg_notifier/manager.rb', line 24

# Returns the memoised hash that maps each channel to its subscription list,
# creating an empty hash on first access.
#
# @return [Hash] the value cached in @subscriptions_by_channels
def subscriptions_by_channels
  return @subscriptions_by_channels if @subscriptions_by_channels

  @subscriptions_by_channels = {}
end