Class: EventMachine::Hiredis::PubsubClient
- Inherits:
-
BaseClient
- Object
- BaseClient
- EventMachine::Hiredis::PubsubClient
- Defined in:
- lib/em-hiredis/pubsub_client.rb
Constant Summary collapse
- PUBSUB_MESSAGES =
%w{message pmessage subscribe unsubscribe psubscribe punsubscribe}.freeze
- PING_CHANNEL =
'__em-hiredis-ping'
Instance Attribute Summary
Attributes inherited from BaseClient
Instance Method Summary collapse
- #connect ⇒ Object
-
#initialize(host = 'localhost', port = '6379', password = nil, db = nil, tls = false) ⇒ PubsubClient
constructor
A new instance of PubsubClient.
-
#ping ⇒ Object
Pubsub connections to not support even the PING command, but it is useful, especially with read-only connections like pubsub, to be able to check that the TCP connection is still usefully alive.
-
#psubscribe(pattern, proc = nil, &block) ⇒ Deferrable
Pattern subscribe to a pubsub channel.
-
#punsubscribe(pattern) ⇒ Deferrable
Pattern unsubscribe all callbacks for a given pattern.
-
#punsubscribe_proc(pattern, proc) ⇒ Deferrable
Unsubscribe a given callback from a pattern.
-
#subscribe(channel, proc = nil, &block) ⇒ Deferrable
Subscribe to a pubsub channel.
-
#unsubscribe(channel) ⇒ Deferrable
Unsubscribe all callbacks for a given channel.
-
#unsubscribe_proc(channel, proc) ⇒ Deferrable
Unsubscribe a given callback from a channel.
Methods inherited from BaseClient
#auth, #close_connection, #configure, #configure_inactivity_check, #connected?, #pending_commands?, #reconnect!, #reconnect_connection, #select
Methods included from EventEmitter
#emit, #listeners, #on, #remove_all_listeners, #remove_listener
Constructor Details
#initialize(host = 'localhost', port = '6379', password = nil, db = nil, tls = false) ⇒ PubsubClient
Returns a new instance of PubsubClient.
7 8 9 10 11 12 |
# File 'lib/em-hiredis/pubsub_client.rb', line 7 def initialize(host='localhost', port='6379', password=nil, db=nil, tls=false) @subs, @psubs = [], [] @tls = tls @pubsub_defs = Hash.new { |h,k| h[k] = [] } super end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method in the class EventMachine::Hiredis::BaseClient
Instance Method Details
#connect ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/em-hiredis/pubsub_client.rb', line 14 def connect @sub_callbacks = Hash.new { |h, k| h[k] = [] } @psub_callbacks = Hash.new { |h, k| h[k] = [] } # Resubsubscribe to channels on reconnect on(:reconnected) { raw_send_command(:subscribe, @subs) if @subs.any? raw_send_command(:psubscribe, @psubs) if @psubs.any? } super end |
#ping ⇒ Object
Pubsub connections to not support even the PING command, but it is useful, especially with read-only connections like pubsub, to be able to check that the TCP connection is still usefully alive.
This is not particularly elegant, but it’s probably the best we can do for now. Ping support for pubsub connections is being considerred: github.com/antirez/redis/issues/420
139 140 141 142 143 |
# File 'lib/em-hiredis/pubsub_client.rb', line 139 def ping subscribe(PING_CHANNEL).callback { unsubscribe(PING_CHANNEL) } end |
#psubscribe(pattern, proc = nil, &block) ⇒ Deferrable
Pattern subscribe to a pubsub channel
If an optional proc / block is provided then it will be called (with the channel name and message) when a message is received on a matching channel
87 88 89 90 91 92 93 94 |
# File 'lib/em-hiredis/pubsub_client.rb', line 87 def psubscribe(pattern, proc = nil, &block) if cb = proc || block @psub_callbacks[pattern] << cb end @psubs << pattern raw_send_command(:psubscribe, [pattern]) return pubsub_deferrable(pattern) end |
#punsubscribe(pattern) ⇒ Deferrable
Pattern unsubscribe all callbacks for a given pattern
100 101 102 103 104 105 |
# File 'lib/em-hiredis/pubsub_client.rb', line 100 def punsubscribe(pattern) @psub_callbacks.delete(pattern) @psubs.delete(pattern) raw_send_command(:punsubscribe, [pattern]) return pubsub_deferrable(pattern) end |
#punsubscribe_proc(pattern, proc) ⇒ Deferrable
Unsubscribe a given callback from a pattern. Will unsubscribe from redis if there are no remaining subscriptions on this pattern
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/em-hiredis/pubsub_client.rb', line 115 def punsubscribe_proc(pattern, proc) df = EM::DefaultDeferrable.new if @psub_callbacks[pattern].delete(proc) if @psub_callbacks[pattern].any? # Succeed deferrable immediately - no need to punsubscribe df.succeed else punsubscribe(pattern).callback { |_| df.succeed } end else df.fail end return df end |
#subscribe(channel, proc = nil, &block) ⇒ Deferrable
Subscribe to a pubsub channel
If an optional proc / block is provided then it will be called when a message is received on this channel
34 35 36 37 38 39 40 41 |
# File 'lib/em-hiredis/pubsub_client.rb', line 34 def subscribe(channel, proc = nil, &block) if cb = proc || block @sub_callbacks[channel] << cb end @subs << channel raw_send_command(:subscribe, [channel]) return pubsub_deferrable(channel) end |
#unsubscribe(channel) ⇒ Deferrable
Unsubscribe all callbacks for a given channel
47 48 49 50 51 52 |
# File 'lib/em-hiredis/pubsub_client.rb', line 47 def unsubscribe(channel) @sub_callbacks.delete(channel) @subs.delete(channel) raw_send_command(:unsubscribe, [channel]) return pubsub_deferrable(channel) end |
#unsubscribe_proc(channel, proc) ⇒ Deferrable
Unsubscribe a given callback from a channel. Will unsubscribe from redis if there are no remaining subscriptions on this channel
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/em-hiredis/pubsub_client.rb', line 62 def unsubscribe_proc(channel, proc) df = EM::DefaultDeferrable.new if @sub_callbacks[channel].delete(proc) if @sub_callbacks[channel].any? # Succeed deferrable immediately - no need to unsubscribe df.succeed else unsubscribe(channel).callback { |_| df.succeed } end else df.fail end return df end |