Module: CelluloidIOPGListener::Client

Included in:
Listener
Defined in:
lib/celluloid-io-pg-listener/client.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



5
6
7
8
9
# File 'lib/celluloid-io-pg-listener/client.rb', line 5

# Hook invoked with the class or module that includes this module. Mixes
# Celluloid, Celluloid::IO and Celluloid::Internals::Logger into +base+,
# one after another and in that order.
#
# @param base [Module] the class or module receiving the mixins
# @return [Module] +base+
def self.included(base)
  [Celluloid, Celluloid::IO, Celluloid::Internals::Logger].each do |mixin|
    # Separate include calls keep the same ancestor order as including
    # each module on its own line.
    base.send(:include, mixin)
  end
  base
end

Instance Method Details

#actions ⇒ Object



20
21
22
# File 'lib/celluloid-io-pg-listener/client.rb', line 20

# Lazily created registry stored in +@actions+. +listen+ writes
# channel => action pairs into it and +start_listening+ reads them back.
#
# @return [Hash] the same Hash instance on every call
def actions
  return @actions if @actions

  @actions = {}
end

#listen(channel, action) ⇒ Object



32
33
34
35
# File 'lib/celluloid-io-pg-listener/client.rb', line 32

# Registers +action+ for +channel+ in +actions+ and then issues a LISTEN
# statement for that channel on +pg_connection+.
#
# NOTE(review): +channel+ is interpolated into the SQL text unquoted, so it
# presumably must already be a valid identifier; confirm with callers.
#
# @param channel [#to_s] channel name; also used as the key in +actions+
# @param action [Object] value stored for the channel (+start_listening+
#   passes it to +send+ as a method name)
# @return [Object] whatever +pg_connection.exec+ returns
def listen(channel, action)
  actions.store(channel, action)
  statement = "LISTEN #{channel}"
  pg_connection.exec(statement)
end

#notify(channel, value) ⇒ Object



28
29
30
# File 'lib/celluloid-io-pg-listener/client.rb', line 28

# Issues a NOTIFY statement on +pg_connection+ for +channel+ with +value+
# as the payload.
#
# The payload is converted with +to_s+ and escaped by the connection before
# being placed inside the single-quoted SQL literal, so a value containing
# a quote character no longer produces a malformed statement.
#
# NOTE(review): +channel+ is still interpolated unquoted, matching how
# +listen+ builds its statement; it presumably must be a valid identifier.
#
# @param channel [#to_s] channel name to notify
# @param value [#to_s] payload to deliver
# @return [Object] whatever +pg_connection.exec+ returns
def notify(channel, value)
  payload = pg_connection.escape_string(value.to_s)
  pg_connection.exec("NOTIFY #{channel}, '#{payload}';")
end

#pg_connection ⇒ Object



24
25
26
# File 'lib/celluloid-io-pg-listener/client.rb', line 24

# Memoized database connection. The first call opens it with
# +PG.connect+ using +@dbname+ as the database name; later calls return
# the stored connection.
#
# @return [Object] the value returned by +PG.connect+
def pg_connection
  return @pg_connection if @pg_connection

  @pg_connection = PG.connect( dbname: @dbname )
end

#start_listening ⇒ Object



37
38
39
40
41
42
43
44
45
46
# File 'lib/celluloid-io-pg-listener/client.rb', line 37

# Sets the +@listening+ flag and runs +wait_for_notify+, dispatching every
# notification it yields to the action registered for that channel in
# +actions+. The action is invoked as +send(action, channel, payload)+.
#
# A notification whose channel has no registered action is logged and
# skipped instead of raising from +send(nil, ...)+, which would abort the
# whole listening loop.
#
# @return [Object] whatever +wait_for_notify+ returns
def start_listening
  info "Starting Listening"

  @listening = true

  wait_for_notify do |channel, pid, payload|
    info "Received notification: #{[channel, pid, payload].inspect}"

    # Look the channel up as yielded first, then as a Symbol, so a handler
    # registered with listen(:name, ...) is still found when the yielded
    # channel is a String (presumably always the case; confirm against pg).
    action = actions[channel] || actions[channel.to_s.to_sym]

    if action
      send(action, channel, payload)
    else
      info "No action registered for channel #{channel.inspect}; ignoring notification"
    end
  end
end

#stop_listening ⇒ Object



48
49
50
# File 'lib/celluloid-io-pg-listener/client.rb', line 48

# Clears the +@listening+ flag. The outer loop in +wait_for_notify+ tests
# this flag at the top of each iteration, so the loop exits the next time
# that test runs rather than immediately.
#
# @return [false]
def stop_listening
  @listening = false
end

#unlisten(channel) ⇒ Object



69
70
71
72
73
# File 'lib/celluloid-io-pg-listener/client.rb', line 69

# Stops the notification loop and issues an UNLISTEN statement for
# +channel+ on +pg_connection+.
#
# Calling +stop_listening+ clears the single +@listening+ flag, so this
# halts dispatch for every channel, not only the one given. The original
# author was unsure whether one channel can be stopped on its own without
# affecting the others.
#
# @param channel [#to_s] channel name, interpolated into the SQL as-is
# @return [Object] whatever +pg_connection.exec+ returns
def unlisten(channel)
  stop_listening
  statement = "UNLISTEN #{channel}"
  pg_connection.exec(statement)
end

#unlisten_wrapper(channel, payload, &block) ⇒ Object



11
12
13
14
15
16
17
18
# File 'lib/celluloid-io-pg-listener/client.rb', line 11

# Logs the payload and channel, then evaluates +block+ in the context of
# this object via +instance_eval+.
#
# If logging or the block raises a StandardError, the error is logged, the
# channel is unlistened and +terminate+ is called on this object.
#
# @param channel [#to_s] channel the payload arrived on
# @param payload [#to_s] notification payload (only used in log messages)
# @param block [Proc] work to run with +self+ as the receiver
# @return [Object] the block's value, or the result of +terminate+ after
#   an error
def unlisten_wrapper(channel, payload, &block)
  begin
    info "Doing Something with Payload: #{payload} on #{channel}"
    instance_eval(&block)
  rescue StandardError => error
    info "#{self.class} disconnected from #{channel} via #{error.class} #{error.message}"
    unlisten(channel)
    terminate
  end
end

#wait_for_notify(&block) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/celluloid-io-pg-listener/client.rb', line 52

# Runs the notification loop for as long as +@listening+ is truthy.
#
# Each pass waits for the connection's socket to become readable, has the
# connection consume the pending input, and then calls +block+ once for
# every queued notification with +(channel, pid, payload)+ taken from the
# notification's +:relname+, +:be_pid+ and +:extra+ entries.
#
# @param block [Proc] called as +block.call(channel, pid, payload)+
# @return [nil] once the loop ends
def wait_for_notify(&block)
  socket = pg_connection.socket_io

  while @listening
    # Per the original author: blocks execution here, but unblocks this actor.
    Celluloid::IO.wait_readable(socket)
    # Pull whatever input is waiting on the connection.
    pg_connection.consume_input

    # Drain every queued notification before waiting on the socket again.
    while (message = pg_connection.notifies)
      channel = message[:relname]
      pid     = message[:be_pid]
      payload = message[:extra]
      block.call(channel, pid, payload)
    end
  end
end