Class: EventMachine::Hiredis::PubsubClient

Inherits:
BaseClient
  • Object
show all
Defined in:
lib/em-hiredis/pubsub_client.rb

Constant Summary collapse

PUBSUB_MESSAGES =
%w{message pmessage subscribe unsubscribe psubscribe punsubscribe}.freeze
PING_CHANNEL =
'__em-hiredis-ping'

Instance Attribute Summary

Attributes inherited from BaseClient

#db, #host, #password, #port

Instance Method Summary collapse

Methods inherited from BaseClient

#auth, #close_connection, #configure, #configure_inactivity_check, #connected?, #pending_commands?, #reconnect!, #reconnect_connection, #select

Methods included from EventEmitter

#emit, #listeners, #on, #remove_all_listeners, #remove_listener

Constructor Details

#initialize(host = 'localhost', port = '6379', password = nil, db = nil, tls = false) ⇒ PubsubClient

Returns a new instance of PubsubClient.



7
8
9
10
11
12
# File 'lib/em-hiredis/pubsub_client.rb', line 7

# Builds a pubsub client: sets up the subscription bookkeeping, then hands
# all five arguments on to the parent constructor via the bare `super`.
def initialize(host='localhost', port='6379', password=nil, db=nil, tls=false)
  @tls = tls
  # Channels and patterns currently subscribed to; #connect's :reconnected
  # handler re-sends them.
  @subs = []
  @psubs = []
  # Hash whose missing keys are populated with a fresh empty array on access.
  @pubsub_defs = Hash.new { |hash, key| hash[key] = [] }
  super
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class EventMachine::Hiredis::BaseClient

Instance Method Details

#connect ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/em-hiredis/pubsub_client.rb', line 14

# Resets both callback registries, registers a handler that resubscribes
# after a reconnect, and then defers to the parent implementation.
def connect
  @psub_callbacks = Hash.new { |hash, key| hash[key] = [] }
  @sub_callbacks = Hash.new { |hash, key| hash[key] = [] }

  # On :reconnected, re-send every tracked channel and pattern in a single
  # command each; nothing is sent for an empty list.
  on(:reconnected) do
    [[:subscribe, @subs], [:psubscribe, @psubs]].each do |command, targets|
      raw_send_command(command, targets) if targets.any?
    end
  end

  super
end

#ping ⇒ Object

Pubsub connections do not support even the PING command, but it is useful, especially with read-only connections like pubsub, to be able to check that the TCP connection is still usefully alive.

This is not particularly elegant, but it’s probably the best we can do for now. Ping support for pubsub connections is being considered: github.com/antirez/redis/issues/420



139
140
141
142
143
# File 'lib/em-hiredis/pubsub_client.rb', line 139

# Emulates a ping by subscribing to PING_CHANNEL and unsubscribing again as
# soon as the subscribe call succeeds.
#
# Returns whatever `callback` returns on the subscribe deferrable.
def ping
  subscription = subscribe(PING_CHANNEL)
  subscription.callback { unsubscribe(PING_CHANNEL) }
end

#psubscribe(pattern, proc = nil, &block) ⇒ Deferrable

Pattern subscribe to a pubsub channel

If an optional proc / block is provided then it will be called (with the channel name and message) when a message is received on a matching channel

Returns:

  • (Deferrable)

    Redis psubscribe call



87
88
89
90
91
92
93
94
# File 'lib/em-hiredis/pubsub_client.rb', line 87

# Pattern subscribe to a pubsub channel.
#
# @param pattern the pattern to subscribe to
# @param proc [Proc, nil] optional callback stored for this pattern
# @param block optional callback, used only when no proc is given
# @return [Deferrable] the result of pubsub_deferrable(pattern)
def psubscribe(pattern, proc = nil, &block)
  if cb = proc || block
    @psub_callbacks[pattern] << cb
  end
  # Track each pattern only once: repeated calls for the same pattern would
  # otherwise grow @psubs without bound and make the :reconnected handler
  # re-send the same pattern several times.
  @psubs << pattern unless @psubs.include?(pattern)
  # The command is still sent on every call so the returned deferrable has a
  # matching request.
  raw_send_command(:psubscribe, [pattern])
  return pubsub_deferrable(pattern)
end

#punsubscribe(pattern) ⇒ Deferrable

Pattern unsubscribe all callbacks for a given pattern

Returns:

  • (Deferrable)

    Redis punsubscribe call



100
101
102
103
104
105
# File 'lib/em-hiredis/pubsub_client.rb', line 100

# Drops every callback registered for +pattern+, stops tracking the pattern
# and sends PUNSUBSCRIBE for it.
#
# @param pattern the pattern to unsubscribe from
# @return [Deferrable] the result of pubsub_deferrable(pattern)
def punsubscribe(pattern)
  # Clear local bookkeeping before talking to the server.
  @psubs.delete(pattern)
  @psub_callbacks.delete(pattern)

  raw_send_command(:punsubscribe, [pattern])
  pubsub_deferrable(pattern)
end

#punsubscribe_proc(pattern, proc) ⇒ Deferrable

Unsubscribe a given callback from a pattern. Will unsubscribe from redis if there are no remaining subscriptions on this pattern

Returns:

  • (Deferrable)

    Succeeds when the punsubscribe has completed or fails if callback could not be found. Note that success may happen immediately in the case that there are other callbacks for the same pattern (and therefore no punsubscription from redis is necessary)



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/em-hiredis/pubsub_client.rb', line 115

# Unsubscribe a single callback from a pattern, punsubscribing from redis
# when it was the last callback registered for that pattern.
#
# @param pattern the pattern the callback was registered for
# @param proc the callback object to remove
# @return [Deferrable] succeeds immediately when other callbacks remain,
#   succeeds once the punsubscribe deferrable succeeds when none remain, and
#   fails when the callback is not registered for the pattern
def punsubscribe_proc(pattern, proc)
  df = EM::DefaultDeferrable.new
  # fetch bypasses the hash's default block, so asking about an unknown
  # pattern no longer leaves a stray empty entry in @psub_callbacks.
  callbacks = @psub_callbacks.fetch(pattern, nil)
  unless callbacks && callbacks.delete(proc)
    df.fail
    return df
  end

  if callbacks.any?
    # Succeed deferrable immediately - no need to punsubscribe
    df.succeed
  else
    punsubscribe(pattern).callback { |_|
      df.succeed
    }
  end
  return df
end

#subscribe(channel, proc = nil, &block) ⇒ Deferrable

Subscribe to a pubsub channel

If an optional proc / block is provided then it will be called when a message is received on this channel

Returns:

  • (Deferrable)

    Redis subscribe call



34
35
36
37
38
39
40
41
# File 'lib/em-hiredis/pubsub_client.rb', line 34

# Subscribe to a pubsub channel.
#
# @param channel the channel to subscribe to
# @param proc [Proc, nil] optional callback stored for this channel
# @param block optional callback, used only when no proc is given
# @return [Deferrable] the result of pubsub_deferrable(channel)
def subscribe(channel, proc = nil, &block)
  if cb = proc || block
    @sub_callbacks[channel] << cb
  end
  # Track each channel only once: repeated calls for the same channel would
  # otherwise grow @subs without bound and make the :reconnected handler
  # re-send the same channel several times.
  @subs << channel unless @subs.include?(channel)
  # The command is still sent on every call so the returned deferrable has a
  # matching request.
  raw_send_command(:subscribe, [channel])
  return pubsub_deferrable(channel)
end

#unsubscribe(channel) ⇒ Deferrable

Unsubscribe all callbacks for a given channel

Returns:

  • (Deferrable)

    Redis unsubscribe call



47
48
49
50
51
52
# File 'lib/em-hiredis/pubsub_client.rb', line 47

# Drops every callback registered for +channel+, stops tracking the channel
# and sends UNSUBSCRIBE for it.
#
# @param channel the channel to unsubscribe from
# @return [Deferrable] the result of pubsub_deferrable(channel)
def unsubscribe(channel)
  # Clear local bookkeeping before talking to the server.
  @subs.delete(channel)
  @sub_callbacks.delete(channel)

  raw_send_command(:unsubscribe, [channel])
  pubsub_deferrable(channel)
end

#unsubscribe_proc(channel, proc) ⇒ Deferrable

Unsubscribe a given callback from a channel. Will unsubscribe from redis if there are no remaining subscriptions on this channel

Returns:

  • (Deferrable)

    Succeeds when the unsubscribe has completed or fails if callback could not be found. Note that success may happen immediately in the case that there are other callbacks for the same channel (and therefore no unsubscription from redis is necessary)



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/em-hiredis/pubsub_client.rb', line 62

# Unsubscribe a single callback from a channel, unsubscribing from redis
# when it was the last callback registered for that channel.
#
# @param channel the channel the callback was registered for
# @param proc the callback object to remove
# @return [Deferrable] succeeds immediately when other callbacks remain,
#   succeeds once the unsubscribe deferrable succeeds when none remain, and
#   fails when the callback is not registered for the channel
def unsubscribe_proc(channel, proc)
  df = EM::DefaultDeferrable.new
  # fetch bypasses the hash's default block, so asking about an unknown
  # channel no longer leaves a stray empty entry in @sub_callbacks.
  callbacks = @sub_callbacks.fetch(channel, nil)
  unless callbacks && callbacks.delete(proc)
    df.fail
    return df
  end

  if callbacks.any?
    # Succeed deferrable immediately - no need to unsubscribe
    df.succeed
  else
    unsubscribe(channel).callback { |_|
      df.succeed
    }
  end
  return df
end