Class: PgNotifier::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/pg_notifier/manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(attrs = {}) ⇒ Manager

Returns a new instance of Manager.



9
10
11
12
13
14
15
16
17
18
# File 'lib/pg_notifier/manager.rb', line 9

def initialize(attrs = {})
  @logger = attrs.fetch :logger, Logger.new(STDOUT)
  @db_config = attrs.fetch :db_config , {}
  @timeout = attrs.fetch :timeout, 0.1

  @finish = false
  Thread.abort_on_exception = true

  @connection_mutex = Mutex.new
end

Instance Attribute Details

#db_configObject

Returns the value of attribute db_config.



7
8
9
# File 'lib/pg_notifier/manager.rb', line 7

def db_config
  @db_config
end

#loggerObject

Returns the value of attribute logger.



7
8
9
# File 'lib/pg_notifier/manager.rb', line 7

def logger
  @logger
end

#timeoutObject

Returns the value of attribute timeout.



7
8
9
# File 'lib/pg_notifier/manager.rb', line 7

def timeout
  @timeout
end

Instance Method Details

#channelsObject



29
30
31
# File 'lib/pg_notifier/manager.rb', line 29

def channels
  subscriptions_by_channels.keys
end

#connectionObject



33
34
35
# File 'lib/pg_notifier/manager.rb', line 33

def connection
  @connection ||= PG::Connection.open db_config
end

#notify(channel, options = {}, &block) ⇒ Object



20
21
22
23
# File 'lib/pg_notifier/manager.rb', line 20

def notify(channel, options = {}, &block)
  subscriptions_by_channels[channel] ||= []
  subscriptions_by_channels[channel] << Subscription.new(channel, options, &block)
end

#runObject



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/pg_notifier/manager.rb', line 37

def run
  logger.info "Starting pg_notifier for #{channels.count} channels: [ #{channels.join(' ')} ]"

  Thread.new do
    channels.each do |channel|
      pg_result = @connection_mutex.synchronize { connection.exec "LISTEN #{channel};" }

      unless pg_result.result_status.eql? PG::PGRES_COMMAND_OK
        raise ChannelNotLaunched, "Channel ##{channel} not launched"
      end
    end

    until @finish do
      if notification = wait_notification()
        logger.info "Notifying channel: %s, pid: %s, payload: %s" % notification

        subscriptions = subscriptions_by_channels.fetch notification.first, []
        subscriptions.each { |subscription| subscription.notify(*notification) }
      end
    end
  end
end

#shutdownObject



60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/pg_notifier/manager.rb', line 60

def shutdown
  logger.info 'Shutting down'

  @finish = true

  @connection_mutex.synchronize do
    unless connection.finished?
      connection.async_exec "UNLISTEN *;"
      connection.finish
    end
  end

  exit(0)
end

#subscriptions_by_channelsObject



25
26
27
# File 'lib/pg_notifier/manager.rb', line 25

def subscriptions_by_channels
  @subscriptions_by_channels ||= {}
end