Module: CelluloidIOPGListener::Client
- Included in:
- Examples::Client
- Defined in:
- lib/celluloid-io-pg-listener/client.rb
Defined Under Namespace
Classes: InvalidClient
Class Method Summary collapse
- .included(base) ⇒ Object
Instance Method Summary collapse
- #actions ⇒ Object
- #listen(channel, action) ⇒ Object
- #notify(channel, value) ⇒ Object
- #pg_connection ⇒ Object
- #start_listening ⇒ Object
- #stop_listening ⇒ Object
- #unlisten(channel) ⇒ Object
- #unlisten_wrapper(channel, payload, &block) ⇒ Object
- #wait_for_notify(&block) ⇒ Object
Class Method Details
.included(base) ⇒ Object
7 8 9 10 11 12 13 14 |
# File 'lib/celluloid-io-pg-listener/client.rb', line 7

# Hook Ruby calls when this module is included into +base+.
#
# Mixes Celluloid::IO and Celluloid::Internals::Logger into +base+, then
# prepends the two Initialization modules.
#
# @param base [Module] the class or module including this one
def self.included(base)
  base.include(Celluloid::IO)
  base.include(Celluloid::Internals::Logger)

  # Prepend order is critical: both modules enhance the same method(s).
  # The module prepended last (ArgumentExtraction) sits first in method
  # lookup, ahead of AsyncListener.
  base.prepend(CelluloidIOPGListener::Initialization::AsyncListener)
  base.prepend(CelluloidIOPGListener::Initialization::ArgumentExtraction)
end
Instance Method Details
#actions ⇒ Object
31 32 33 |
# File 'lib/celluloid-io-pg-listener/client.rb', line 31 def actions @actions ||= {} end |
#listen(channel, action) ⇒ Object
43 44 45 46 |
# File 'lib/celluloid-io-pg-listener/client.rb', line 43

# Registers +action+ for +channel+ and issues LISTEN on the connection.
#
# @param channel [#to_s] channel name; interpolated verbatim into the SQL
# @param action [Object] stored in #actions; #start_listening later passes
#   it to +send+ together with the channel and payload
# @return [Object] whatever +pg_connection.exec+ returns
def listen(channel, action)
  actions.store(channel, action)
  listen_sql = "LISTEN #{channel}"
  pg_connection.exec(listen_sql)
end
#notify(channel, value) ⇒ Object
39 40 41 |
# File 'lib/celluloid-io-pg-listener/client.rb', line 39

# Sends a NOTIFY on +channel+ carrying +value+ as the payload.
#
# The payload is embedded in a single-quoted SQL literal, so it is escaped
# through the connection first; otherwise a value containing a single quote
# would terminate the literal early and break (or alter) the statement.
#
# @param channel [#to_s] channel name; interpolated verbatim into the SQL
# @param value [#to_s] payload; converted with +to_s+ and escaped
# @return [Object] whatever +pg_connection.exec+ returns
def notify(channel, value)
  payload = pg_connection.escape_string(value.to_s)
  pg_connection.exec("NOTIFY #{channel}, '#{payload}';")
end
#pg_connection ⇒ Object
35 36 37 |
# File 'lib/celluloid-io-pg-listener/client.rb', line 35 def pg_connection @pg_connection ||= PG.connect(conninfo_hash) end |
#start_listening ⇒ Object
48 49 50 51 52 53 54 |
# File 'lib/celluloid-io-pg-listener/client.rb', line 48

# Marks the client as listening and dispatches every notification yielded by
# #wait_for_notify to the action registered for its channel.
#
# The action is invoked with +send+, so it is called as
# +action(channel, payload)+ on this object. A notification whose channel has
# no registered action is logged and skipped; previously +send(nil, ...)+
# raised a TypeError that aborted the whole loop.
#
# @return [Object] whatever #wait_for_notify returns
def start_listening
  @listening = true
  wait_for_notify do |channel, pid, payload|
    info "Received notification: #{[channel, pid, payload].inspect}"
    action = actions[channel]
    if action
      send(action, channel, payload)
    else
      info "No action registered for channel #{channel.inspect}; ignoring notification"
    end
  end
end
#stop_listening ⇒ Object
56 57 58 |
# File 'lib/celluloid-io-pg-listener/client.rb', line 56 def stop_listening @listening = false end |
#unlisten(channel) ⇒ Object
77 78 79 80 81 |
# File 'lib/celluloid-io-pg-listener/client.rb', line 77

# Stops the notification loop and issues UNLISTEN for +channel+.
#
# NOTE: #stop_listening clears the single @listening flag, so this halts
# listening on every channel, not only +channel+. A per-channel flag (a hash
# keyed by channel) was sketched but never implemented; whether one channel
# can be stopped without affecting the others is still an open question.
#
# @param channel [#to_s] channel name; interpolated verbatim into the SQL
# @return [Object] whatever +pg_connection.exec+ returns
def unlisten(channel)
  stop_listening
  unlisten_sql = "UNLISTEN #{channel}"
  pg_connection.exec(unlisten_sql)
end
#unlisten_wrapper(channel, payload, &block) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/celluloid-io-pg-listener/client.rb', line 16

# Runs +block+ in the context of this object (via +instance_eval+) to act on
# a notification. If anything raises, the failure is logged, the channel is
# unlistened, the object is terminated, and the error is re-raised.
#
# NOTE(review): +callback_method+ and +terminate+ are not defined in this
# listing; presumably they come from the prepended Initialization modules
# and Celluloid respectively. Confirm.
#
# @param channel [#to_s] channel the payload arrived on
# @param payload [#to_s] notification payload (used only for logging here)
# @yield the work to perform for this notification; evaluated with +self+
#   set to this object
# @return [Object] the block's result, or the result of +info+ when no block
#   is given
# @raise [StandardError] re-raises whatever the block raised
def unlisten_wrapper(channel, payload, &block)
  if block_given?
    debug "Acting on payload: #{payload} on #{channel}"
    instance_eval(&block)
  else
    info "Not acting on payload: #{payload} on #{channel}"
  end
rescue => e
  info "#{self.class}##{callback_method} disconnected from #{channel} via #{e.class} #{e.message}"
  unlisten(channel)
  terminate
  # Rescue the error in a daemon-error-reporter to send to Airbrake or other reporting service?
  raise
end
#wait_for_notify(&block) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/celluloid-io-pg-listener/client.rb', line 60

# Loops while @listening is truthy: waits for the connection's socket to
# become readable, consumes the pending input, then hands every queued
# notification to +block+.
#
# @yieldparam channel [Object] the notification's +:relname+
# @yieldparam pid [Object] the notification's +:be_pid+
# @yieldparam payload [Object] the notification's +:extra+
# @return [nil]
def wait_for_notify(&block)
  socket = pg_connection.socket_io
  while @listening
    # Blocks execution here, but unblocks this actor.
    Celluloid::IO.wait_readable(socket)
    # Fetch any input waiting on this connection.
    pg_connection.consume_input
    # Drain every notification that has been queued so far.
    while (notice = pg_connection.notifies)
      block.call(notice[:relname], notice[:be_pid], notice[:extra])
    end
  end
end