Class: Qless::Subscriber
- Inherits:
-
Object
- Object
- Qless::Subscriber
- Defined in:
- lib/qless/subscriber.rb
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#client ⇒ Object
readonly
Returns the value of attribute client.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(client, channel, &message_received_callback) ⇒ Subscriber
constructor
A new instance of Subscriber.
- #start_pub_sub_listener ⇒ Object
- #wait_until_thread_listening ⇒ Object
Constructor Details
#initialize(client, channel, &message_received_callback) ⇒ Subscriber
Returns a new instance of Subscriber.
12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/qless/subscriber.rb', line 12 def initialize(client, channel, &) @client = client @channel = channel @message_received_callback = # pub/sub blocks the connection so we must use a different redis connection @client_redis = client.redis @listener_redis = client.new_redis_connection @my_channel = Qless.generate_jid end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
10 11 12 |
# File 'lib/qless/subscriber.rb', line 10 def channel @channel end |
#client ⇒ Object (readonly)
Returns the value of attribute client.
10 11 12 |
# File 'lib/qless/subscriber.rb', line 10 def client @client end |
Class Method Details
.start(*args, &block) ⇒ Object
6 7 8 |
# File 'lib/qless/subscriber.rb', line 6 def self.start(*args, &block) new(*args, &block).start_pub_sub_listener end |
Instance Method Details
#start_pub_sub_listener ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/qless/subscriber.rb', line 24 def start_pub_sub_listener @thread = ::Thread.start do @listener_redis.subscribe(channel, @my_channel) do |on| on. do |_channel, | if _channel == @my_channel @listener_redis.unsubscribe(@my_channel) else @message_received_callback.call(self, JSON.parse()) end end end end wait_until_thread_listening end |
#wait_until_thread_listening ⇒ Object
40 41 42 43 44 |
# File 'lib/qless/subscriber.rb', line 40 def wait_until_thread_listening Qless::WaitUntil.wait_until(10) do @client_redis.publish(@my_channel, 'disconnect') == 1 end end |