Module: CelluloidIOPGListener::Client

Defined in:
lib/celluloid-io-pg-listener/client.rb

Defined Under Namespace

Classes: InvalidClient

Class Method Summary

Instance Method Summary

Class Method Details

.included(base) ⇒ Object



# File 'lib/celluloid-io-pg-listener/client.rb', line 7

def self.included(base)
  base.send(:include, Celluloid::IO)
  base.send(:include, Celluloid::Internals::Logger)
  # order of prepended modules is critical if they are enhancing
  #   the same method(s), and they are.
  base.send(:prepend, CelluloidIOPGListener::Initialization::AsyncListener)
  base.send(:prepend, CelluloidIOPGListener::Initialization::ArgumentExtraction)
end
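The comment about prepend order can be illustrated with a minimal sketch. The modules `ExtractArgs` and `StartAsync` below are hypothetical stand-ins, not the gem's real `Initialization` modules; they only record the order in which `initialize` runs. Under Ruby's rules, the module prepended last sits first in the ancestor chain, so its method runs first and wraps the others.

```ruby
# Hypothetical stand-ins for two modules that both enhance #initialize.
module ExtractArgs
  def initialize(*args)
    (@trace ||= []) << :extract_args_before
    super
    @trace << :extract_args_after
  end
end

module StartAsync
  def initialize(*args)
    (@trace ||= []) << :start_async_before
    super
    @trace << :start_async_after
  end
end

class Listener
  prepend StartAsync   # prepended first, so it sits closer to the class
  prepend ExtractArgs  # prepended last, so it is first in the ancestor chain

  attr_reader :trace

  def initialize(*args)
    (@trace ||= []) << :listener
  end
end

Listener.ancestors.first(3) # => [ExtractArgs, StartAsync, Listener]
Listener.new.trace
# => [:extract_args_before, :start_async_before, :listener,
#     :start_async_after, :extract_args_after]
```

Swapping the two `prepend` lines reverses the wrapping, which is why the order matters when both modules enhance the same method.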

Instance Method Details

#actions ⇒ Object



# File 'lib/celluloid-io-pg-listener/client.rb', line 28

def actions
  @actions ||= {}
end

#initialize(*args) ⇒ Object

Defining initialize in a class that includes this module is optional, unless you have custom arguments to handle besides those used by CelluloidIOPGListener::Client.

If you do define it, use a splat. Either a hash splat or an array splat should work, depending on your signature needs.

With either splat, pass only the splat params to super, and handle all your custom params locally.


# File 'lib/celluloid-io-pg-listener/client.rb', line 25

def initialize(*args)
end
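As a rough sketch of the advice above: the module `ClientStandIn` below is a hypothetical replacement for CelluloidIOPGListener::Client so the example runs without the gem, and `table_name:` and `dbname:` are illustrative parameter names only. The class keeps its custom argument locally and forwards only the hash splat to super.

```ruby
# Hypothetical stand-in for the real module; it only records what super receives.
module ClientStandIn
  attr_reader :super_args

  def initialize(*args)
    @super_args = args
  end
end

class InsertWatcher
  include ClientStandIn

  attr_reader :table_name

  # table_name: is a custom parameter handled locally;
  # everything else is collected by the hash splat.
  def initialize(table_name:, **listener_options)
    @table_name = table_name
    super(**listener_options) # only the splat params go to super
  end
end

watcher = InsertWatcher.new(table_name: "users", dbname: "app_development")
watcher.table_name # => "users"
watcher.super_args # => [{:dbname=>"app_development"}]
```

An array splat (`def initialize(custom, *rest)` followed by `super(*rest)`) would follow the same pattern for positional signatures.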

#listen(channel, action) ⇒ Object

A channel name can be any delimited (double-quoted) identifier: we supply the double quotes, you supply the contents. If you want Unicode escape support, submit a pull request that makes the quote style U&" a config setting.

See: http://www.postgresql.org/docs/9.4/static/sql-syntax-lexical.html


# File 'lib/celluloid-io-pg-listener/client.rb', line 40

def listen(channel, action)
  actions[channel] = action
  pg_connection.exec(%[LISTEN "#{channel}";])
end
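Because the channel is interpolated between double quotes, a name that itself contains a double quote would end the identifier early. PostgreSQL's lexical rules allow a double quote inside a delimited identifier when it is written twice. The helper below is a hypothetical sketch of that escaping and is not part of the gem; the pg gem's `PG::Connection.quote_ident` may be a better fit when a connection library is at hand.

```ruby
# Hypothetical helper, not part of the gem: builds the same statement as
# #listen, but doubles any embedded double quotes in the channel name first.
def listen_statement(channel)
  escaped = channel.to_s.gsub('"', '""')
  %[LISTEN "#{escaped}";]
end

listen_statement("users_insert") # => "LISTEN \"users_insert\";"
listen_statement('odd"name')     # => "LISTEN \"odd\"\"name\";"
```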

#pg_connection ⇒ Object



# File 'lib/celluloid-io-pg-listener/client.rb', line 32

def pg_connection
  @pg_connection ||= PG.connect(conninfo_hash)
end

#start_listening ⇒ Object



# File 'lib/celluloid-io-pg-listener/client.rb', line 45

def start_listening
  @listening = true
  wait_for_notify do |channel, pid, payload|
    info "Received notification: #{[channel, pid, payload].inspect}"
    send(actions[channel], channel, payload)
  end
end
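The dispatch step relies on `actions` mapping a channel name to a method name, which is then invoked with `send`. A minimal sketch of that lookup, with no database involved, might look like this. `DispatchSketch`, `dispatch`, and `on_insert` are hypothetical names used only for illustration.

```ruby
class DispatchSketch
  attr_reader :received

  def initialize
    @received = []
  end

  # Same lazy hash as #actions in the module.
  def actions
    @actions ||= {}
  end

  # Hypothetical callback; a real client would define its own.
  def on_insert(channel, payload)
    @received << [channel, payload]
  end

  # Mirrors the body of the block passed to wait_for_notify.
  def dispatch(channel, payload)
    send(actions[channel], channel, payload)
  end
end

sketch = DispatchSketch.new
sketch.actions["users_insert"] = :on_insert
sketch.dispatch("users_insert", '{"id":1}')
sketch.received # => [["users_insert", "{\"id\":1}"]]
```

The callback receives the channel and the payload, but not the notifying backend's pid, matching the `send` call above.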

#stop_listening ⇒ Object



# File 'lib/celluloid-io-pg-listener/client.rb', line 53

def stop_listening
  @listening = false
end

#wait_for_notify(&block) ⇒ Object



# File 'lib/celluloid-io-pg-listener/client.rb', line 57

def wait_for_notify(&block)
  io = pg_connection.socket_io

  while @listening do
    Celluloid::IO.wait_readable(io) # blocks execution, but unblocks this actor
    pg_connection.consume_input # fetch any input on this connection

    while notification = pg_connection.notifies do
      block.call(*[
                     notification[:relname], # channel
                     notification[:be_pid], # pid
                     notification[:extra] # payload
                 ])
    end
  end
end
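The inner loop drains every pending notification before waiting on the socket again. The sketch below isolates that drain step using a hypothetical `FakeConnection` in place of a real PG connection; only `notifies` is modeled, and it returns hashes with the same `:relname`, `:be_pid`, and `:extra` keys that the code above reads.

```ruby
# Hypothetical fake standing in for a PG connection.
class FakeConnection
  def initialize(queue)
    @queue = queue
  end

  # Returns the next pending notification hash, or nil when none remain.
  def notifies
    @queue.shift
  end
end

# Yields channel, pid, and payload for each pending notification.
def drain_notifications(connection)
  while (notification = connection.notifies)
    yield notification[:relname], notification[:be_pid], notification[:extra]
  end
end

conn = FakeConnection.new([
  { relname: "users_insert", be_pid: 4242, extra: "1" },
  { relname: "users_insert", be_pid: 4242, extra: "2" }
])

seen = []
drain_notifications(conn) { |channel, pid, payload| seen << [channel, pid, payload] }
seen # => [["users_insert", 4242, "1"], ["users_insert", 4242, "2"]]
```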