Class: Reqless::Subscriber
- Inherits:
- Object
  - Object
  - Reqless::Subscriber
- Defined in:
- lib/reqless/subscriber.rb
Overview
A class used for subscribing to messages on a Redis pub/sub channel in a separate thread.
Instance Attribute Summary
- #channel ⇒ Object (readonly): Returns the value of attribute channel.
- #redis ⇒ Object (readonly): Returns the value of attribute redis.
Class Method Summary
- .start(*args, &block) ⇒ Object
Instance Method Summary
- #initialize(client, channel, options = {}, &message_received_callback) ⇒ Subscriber (constructor): A new instance of Subscriber.
- #start ⇒ Object: Start a thread listening.
- #stop ⇒ Object
Constructor Details
#initialize(client, channel, options = {}, &message_received_callback) ⇒ Subscriber
Returns a new instance of Subscriber.
    # File 'lib/reqless/subscriber.rb', line 15
    def initialize(client, channel, options = {}, &message_received_callback)
      @channel = channel
      @message_received_callback = message_received_callback
      @log = options.fetch(:log) { ::Logger.new($stderr) }

      # pub/sub blocks the connection so we must use a different redis
      # connection
      @client_redis = client.redis
      @listener_redis = client.new_redis_connection
      @my_channel = Reqless.generate_jid
    end
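The constructor reads its logger from `options` with `Hash#fetch` and a block, so the default logger is only built when no `:log` key is passed. A minimal sketch of that idiom follows; `pick_logger` is a hypothetical helper used for illustration and is not part of Reqless.

```ruby
require 'logger'

# Hypothetical helper mirroring the constructor's option handling:
# use the caller's logger when given, otherwise log to $stderr.
def pick_logger(options = {})
  # The block runs only when the :log key is absent.
  options.fetch(:log) { ::Logger.new($stderr) }
end

custom_logger = ::Logger.new(File::NULL) # discards all output
chosen = pick_logger(log: custom_logger) # returns custom_logger itself
fallback = pick_logger                   # builds a logger writing to $stderr
```

Because `fetch` checks for the key rather than the value, passing `log: nil` would return `nil` instead of falling back to the default.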
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
    # File 'lib/reqless/subscriber.rb', line 13
    def channel
      @channel
    end
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
    # File 'lib/reqless/subscriber.rb', line 13
    def redis
      @redis
    end
Class Method Details
.start(*args, &block) ⇒ Object
    # File 'lib/reqless/subscriber.rb', line 9
    def self.start(*args, &block)
      new(*args, &block).tap(&:start)
    end
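`.start` combines construction and startup: it forwards all arguments and the block to `new`, calls `#start` on the new object, and returns the object itself rather than the return value of `#start`. The sketch below shows the same `tap(&:start)` idiom on a hypothetical stand-in class (`DemoWorker` is not part of Reqless), so it runs without Redis.

```ruby
# Hypothetical stand-in class used only to illustrate the idiom.
class DemoWorker
  attr_reader :name, :started

  def self.start(*args, &block)
    # tap yields the new instance to #start, then returns the instance.
    new(*args, &block).tap(&:start)
  end

  def initialize(name, &callback)
    @name = name
    @callback = callback
    @started = false
  end

  def start
    @started = true
    :return_value_of_start # discarded by tap
  end
end

worker = DemoWorker.start('demo') { |message| puts message }
```

Here `worker` is the `DemoWorker` instance, already started, which is what lets a caller later invoke `stop` on the object returned by `.start`.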
Instance Method Details
#start ⇒ Object
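Starts a thread that listens on the channel. Blocks until the subscription is confirmed, and re-raises any exception raised while subscribing.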
    # File 'lib/reqless/subscriber.rb', line 29
    def start
      queue = ::Queue.new

      @thread = Thread.start do
        begin
          @listener_redis.subscribe(@channel, @my_channel) do |on|
            on.subscribe do |channel|
              # insert nil into the queue to indicate we've
              # successfully subscribed
              queue << nil if channel == @channel
            end

            on.message do |channel, message|
              handle_message(channel, message)
            end
          end
        # Watch for any exceptions so we don't block forever if
        # subscribing to the channel fails
        rescue Exception => e
          queue << e
        end
      end

      if (exception = queue.pop)
        raise exception
      end
    end
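`#start` uses a `Queue` as a one-shot handshake between the listener thread and the caller: the thread pushes `nil` once it is subscribed, or the exception if subscribing fails, and the caller blocks on `queue.pop` and re-raises anything non-nil. The following sketch isolates that handshake without Redis; `start_and_wait` and its `ready` callback are hypothetical names chosen for illustration.

```ruby
# Hypothetical helper: runs the given block in a new thread and blocks
# until the block calls `ready`, or re-raises whatever the block raised.
def start_and_wait
  queue = ::Queue.new

  thread = Thread.start do
    begin
      # The block calls ready.call once setup has succeeded.
      yield -> { queue << nil }
    rescue Exception => e
      # Hand the failure to the waiting caller instead of blocking it forever.
      queue << e
    end
  end

  if (exception = queue.pop)
    raise exception
  end

  thread
end

# Success: returns as soon as the thread signals readiness.
thread = start_and_wait { |ready| ready.call }
thread.join

# Failure: the exception raised inside the thread surfaces in the caller.
begin
  start_and_wait { |_ready| raise ArgumentError, 'subscribe failed' }
rescue ArgumentError => e
  failure_message = e.message
end
```

As in the original method, if the block neither signals readiness nor raises, the caller stays blocked on `queue.pop`.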
#stop ⇒ Object
    # File 'lib/reqless/subscriber.rb', line 57
    def stop
      @client_redis.publish(@my_channel, 'disconnect')
      @thread.join
    end
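`#stop` does not kill the listener thread. It publishes a `'disconnect'` message on the subscriber's private channel and then joins the thread, so the listener is expected to exit on its own when it sees that message. The sketch below imitates this sentinel-based shutdown with an in-memory `Queue` standing in for Redis pub/sub; `QueueListener` and its methods are hypothetical and only illustrate the pattern.

```ruby
# Hypothetical in-memory listener: a Queue plays the role of the channels.
class QueueListener
  attr_reader :received

  def initialize
    @inbox = ::Queue.new
    @received = []
  end

  def start
    @thread = Thread.start do
      loop do
        message = @inbox.pop
        # The sentinel asks the listener to shut itself down.
        break if message == 'disconnect'
        @received << message
      end
    end
  end

  def publish(message)
    @inbox << message
  end

  def stop
    publish('disconnect')
    # Wait for the listener to process the sentinel and exit.
    @thread.join
  end
end

listener = QueueListener.new
listener.start
listener.publish('hello')
listener.stop
```

Because the queue is first-in, first-out, every message published before `stop` is handled before the thread exits.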