Module: CelluloidIOPGListener::Client

Included in:
Examples::Client
Defined in:
lib/celluloid-io-pg-listener/client.rb

Defined Under Namespace

Classes: InvalidClient

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



7
8
9
10
11
12
13
14
# File 'lib/celluloid-io-pg-listener/client.rb', line 7

def self.included(base)
  base.send(:include, Celluloid::IO)
  base.send(:include, Celluloid::Internals::Logger)
  # order of prepended modules is critical if they are enhancing
  #   the same method(s), and they are.
  base.prepend CelluloidIOPGListener::Initialization::AsyncListener
  base.prepend CelluloidIOPGListener::Initialization::ArgumentExtraction
end

Instance Method Details

#actionsObject



31
32
33
# File 'lib/celluloid-io-pg-listener/client.rb', line 31

def actions
  @actions ||= {}
end

#listen(channel, action) ⇒ Object



43
44
45
46
# File 'lib/celluloid-io-pg-listener/client.rb', line 43

def listen(channel, action)
  actions[channel] = action
  pg_connection.exec("LISTEN #{channel}")
end

#notify(channel, value) ⇒ Object



39
40
41
# File 'lib/celluloid-io-pg-listener/client.rb', line 39

def notify(channel, value)
  pg_connection.exec("NOTIFY #{channel}, '#{value}';")
end

#pg_connectionObject



35
36
37
# File 'lib/celluloid-io-pg-listener/client.rb', line 35

def pg_connection
  @pg_connection ||= PG.connect(conninfo_hash)
end

#start_listeningObject



48
49
50
51
52
53
54
# File 'lib/celluloid-io-pg-listener/client.rb', line 48

def start_listening
  @listening = true
  wait_for_notify do |channel, pid, payload|
    info "Received notification: #{[channel, pid, payload].inspect}"
    send(actions[channel], channel, payload)
  end
end

#stop_listeningObject



56
57
58
# File 'lib/celluloid-io-pg-listener/client.rb', line 56

def stop_listening
  @listening = false
end

#unlisten(channel) ⇒ Object



77
78
79
80
81
# File 'lib/celluloid-io-pg-listener/client.rb', line 77

def unlisten(channel)
  # (@listening ||= {})[channel] = false
  stop_listening # Not sure if there is a way to stop listening to a single channel without affecting the others.
  pg_connection.exec("UNLISTEN #{channel}")
end

#unlisten_wrapper(channel, payload, &block) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/celluloid-io-pg-listener/client.rb', line 16

def unlisten_wrapper(channel, payload, &block)
  if block_given?
    debug "Acting on payload: #{payload} on #{channel}"
    instance_eval(&block)
  else
    info "Not acting on payload: #{payload} on #{channel}"
  end
rescue => e
  info "#{self.class}##{callback_method} disconnected from #{channel} via #{e.class} #{e.message}"
  unlisten(channel)
  terminate
  # Rescue the error in a daemon-error-reporter to send to Airbrake or other reporting service?
  raise
end

#wait_for_notify(&block) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/celluloid-io-pg-listener/client.rb', line 60

def wait_for_notify(&block)
  io = pg_connection.socket_io

  while @listening do
    Celluloid::IO.wait_readable(io) # blocks execution, but unblocks this actor
    pg_connection.consume_input # fetch any input on this connection

    while notification = pg_connection.notifies do
      block.call(*[
                     notification[:relname], # channel
                     notification[:be_pid], # pid
                     notification[:extra] # payload
                 ])
    end
  end
end