Class: Qless::Subscriber
- Inherits:
-
Object
- Object
- Qless::Subscriber
- Defined in:
- lib/qless/subscriber.rb
Overview
A class used for subscribing to messages in a thread
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(client, channel, options = {}, &message_received_callback) ⇒ Subscriber
constructor
A new instance of Subscriber.
-
#start ⇒ Object
Start a thread listening.
- #stop ⇒ Object
Constructor Details
#initialize(client, channel, options = {}, &message_received_callback) ⇒ Subscriber
Returns a new instance of Subscriber.
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/qless/subscriber.rb', line 14 def initialize(client, channel, = {}, &) @channel = channel @message_received_callback = @log = .fetch(:log) { ::Logger.new($stderr) } # pub/sub blocks the connection so we must use a different redis # connection @client_redis = client.redis @listener_redis = client.new_redis_connection @my_channel = Qless.generate_jid end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
12 13 14 |
# File 'lib/qless/subscriber.rb', line 12 def channel @channel end |
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
12 13 14 |
# File 'lib/qless/subscriber.rb', line 12 def redis @redis end |
Class Method Details
.start(*args, &block) ⇒ Object
8 9 10 |
# File 'lib/qless/subscriber.rb', line 8 def self.start(*args, &block) new(*args, &block).tap(&:start) end |
Instance Method Details
#start ⇒ Object
Start a thread listening
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/qless/subscriber.rb', line 28 def start queue = ::Queue.new @thread = Thread.start do @listener_redis.subscribe(@channel, @my_channel) do |on| on.subscribe do |channel| queue.push(:subscribed) if channel == @channel end on. do |channel, | (channel, ) end end end queue.pop end |
#stop ⇒ Object
46 47 48 49 |
# File 'lib/qless/subscriber.rb', line 46 def stop @client_redis.publish(@my_channel, 'disconnect') @thread.join end |